diff options
-rw-r--r-- | src/main/java/org/traccar/MainModule.java | 2 | ||||
-rw-r--r-- | src/main/java/org/traccar/WebDataHandler.java | 142 |
2 files changed, 74 insertions, 70 deletions
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java index 24448ef51..1d5508f5a 100644 --- a/src/main/java/org/traccar/MainModule.java +++ b/src/main/java/org/traccar/MainModule.java @@ -378,7 +378,7 @@ public class MainModule extends AbstractModule { @Singleton @Provides - public static Timer provideGlobalTimer() { + public static Timer provideTimer() { return GlobalTimer.getTimer(); } diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 58b9ca73b..39e54616b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -53,11 +53,11 @@ import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; - private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); - private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -179,94 +179,98 @@ public class WebDataHandler extends BaseDataHandler { return request; } - @Override - protected Position handlePosition(Position position) { + class AsyncRequestAndCallback implements InvocationCallback<Response>, TimerTask { - class AsyncRequestAndCallback implements InvocationCallback<Response> { + private int retries = 0; + private Map<String, Object> payload; + private Invocation.Builder requestBuilder; - private int retries = 0; - private Map<String, Object> payload; - private Invocation.Builder requestBuilder; + AsyncRequestAndCallback(Position position) { - AsyncRequestAndCallback(Position position) { - - String formattedUrl; - try { - formattedUrl = json ? url : formatRequest(position); - } catch (UnsupportedEncodingException | JsonProcessingException e) { - throw new RuntimeException("Forwarding formatting error", e); - } + String formattedUrl; + try { + formattedUrl = json ? url : formatRequest(position); + } catch (UnsupportedEncodingException | JsonProcessingException e) { + throw new RuntimeException("Forwarding formatting error", e); + } - requestBuilder = client.target(formattedUrl).request(); - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); - } + requestBuilder = client.target(formattedUrl).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); } + } - if (json) { - payload = prepareJsonPayload(position); - } + if (json) { + payload = prepareJsonPayload(position); + } - deliveryPending.incrementAndGet(); + deliveryPending.incrementAndGet(); + } - send(); + private void send() { + if (json) { + requestBuilder.async().post(Entity.json(payload), this); + } else { + requestBuilder.async().get(this); } + } - private void send() { - if (json) { - requestBuilder.async().post(Entity.json(payload), this); - } else { - requestBuilder.async().get(this); + private void retry() { + boolean scheduled = false; + try { + if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { + schedule(); + scheduled = true; } + } finally { + int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet(); + LOGGER.warn("Position forwarding failed: " + pending + " pending"); } + } - private void retry() { - boolean ok = false; - try { - if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { - Main.getInjector().getInstance(Timer.class).newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) { - boolean ok = false; - try { - if (!timeout.isCancelled()) { - send(); - ok = true; - } - } finally { - if (!ok) { - deliveryPending.decrementAndGet(); - } - } - } - }, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); - ok = true; - } - } finally { - int pending = ok ? deliveryPending.get() : deliveryPending.decrementAndGet(); - LOGGER.warn("Position forwarding failed: " + pending + " pending"); - } + private void schedule() { + Main.getInjector().getInstance(Timer.class).newTimeout( + this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); + } + + @Override + public void completed(Response response) { + if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + deliveryPending.decrementAndGet(); + } else { + retry(); } + } + + @Override + public void failed(Throwable throwable) { + retry(); + } - @Override - public void completed(Response response) { - if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + @Override + public void run(Timeout timeout) { + boolean sent = false; + try { + if (!timeout.isCancelled()) { + send(); + sent = true; + } + } finally { + if (!sent) { deliveryPending.decrementAndGet(); - } else { - retry(); } } + } - @Override - public void failed(Throwable throwable) { - retry(); - } + } - } + @Override + protected Position handlePosition(Position position) { - new AsyncRequestAndCallback(position); + AsyncRequestAndCallback request = new AsyncRequestAndCallback(position); + request.send(); return position; } |