diff options
Diffstat (limited to 'src/main/java/org/traccar/WebDataHandler.java')
-rw-r--r-- | src/main/java/org/traccar/WebDataHandler.java | 141 |
1 files changed, 124 insertions, 17 deletions
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 64396de03..d6bfb126b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 - 2019 Anton Tananaev (anton@traccar.org) + * Copyright 2015 - 2020 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,12 @@ package org.traccar; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelHandler; +import io.netty.util.Timer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.database.IdentityManager; @@ -27,9 +33,13 @@ import org.traccar.model.Position; import org.traccar.model.Group; import javax.inject.Inject; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.InvocationCallback; import java.util.HashMap; import java.util.Map; import java.io.UnsupportedEncodingException; @@ -39,10 +49,14 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; @@ -53,16 +67,33 @@ public class WebDataHandler extends BaseDataHandler { private final String url; private final String header; private final boolean json; + private final boolean urlVariables; + + private final boolean retryEnabled; + private final int retryDelay; + private final int retryCount; + private final int retryLimit; + + private AtomicInteger deliveryPending; @Inject public WebDataHandler( Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) { + this.identityManager = identityManager; this.objectMapper = objectMapper; this.client = client; this.url = config.getString(Keys.FORWARD_URL); this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); + this.urlVariables = config.getBoolean(Keys.FORWARD_URL_VARIABLES); + + this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE); + this.retryDelay = config.getInteger(Keys.FORWARD_RETRY_DELAY, 100); + this.retryCount = config.getInteger(Keys.FORWARD_RETRY_COUNT, 10); + this.retryLimit = config.getInteger(Keys.FORWARD_RETRY_LIMIT, 100); + + this.deliveryPending = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -152,35 +183,111 @@ public class WebDataHandler extends BaseDataHandler { return request; } - @Override - protected Position handlePosition(Position position) { + class AsyncRequestAndCallback implements InvocationCallback<Response>, TimerTask { - String url; - if (json) { - url = this.url; - } else { + private int retries = 0; + private Map<String, Object> payload; + private Invocation.Builder requestBuilder; + private MediaType mediaType = MediaType.APPLICATION_JSON_TYPE; + + AsyncRequestAndCallback(Position position) { + + String formattedUrl; try { - url = formatRequest(position); + formattedUrl = (json && !urlVariables) ? url : formatRequest(position); } catch (UnsupportedEncodingException | JsonProcessingException e) { throw new RuntimeException("Forwarding formatting error", e); } + + requestBuilder = client.target(formattedUrl).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + String headerName = values[0].trim(); + String headerValue = values[1].trim(); + if (headerName.equals(HttpHeaders.CONTENT_TYPE)) { + mediaType = MediaType.valueOf(headerValue); + } else { + requestBuilder.header(headerName, headerValue); + } + } + } + + if (json) { + payload = prepareJsonPayload(position); + } + + deliveryPending.incrementAndGet(); } - Invocation.Builder requestBuilder = client.target(url).request(); + private void send() { + if (json) { + try { + Entity<String> entity = Entity.entity(objectMapper.writeValueAsString(payload), mediaType); + requestBuilder.async().post(entity, this); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize location to json", e); + } + } else { + requestBuilder.async().get(this); + } + } - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); + private void retry() { + boolean scheduled = false; + try { + if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { + schedule(); + scheduled = true; + } + } finally { + int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet(); + LOGGER.warn("Position forwarding failed: " + pending + " pending"); } } - if (json) { - requestBuilder.async().post(Entity.json(prepareJsonPayload(position))); - } else { - requestBuilder.async().get(); + private void schedule() { + Main.getInjector().getInstance(Timer.class).newTimeout( + this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); + } + + @Override + public void completed(Response response) { + if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + deliveryPending.decrementAndGet(); + } else { + retry(); + } + } + + @Override + public void failed(Throwable throwable) { + retry(); + } + + @Override + public void run(Timeout timeout) { + boolean sent = false; + try { + if (!timeout.isCancelled()) { + send(); + sent = true; + } + } finally { + if (!sent) { + deliveryPending.decrementAndGet(); + } + } } + } + + @Override + protected Position handlePosition(Position position) { + + AsyncRequestAndCallback request = new AsyncRequestAndCallback(position); + request.send(); + return position; } |