diff options
author | Anton Tananaev <anton.tananaev@gmail.com> | 2020-02-25 17:40:26 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-25 17:40:26 -0800 |
commit | 9662f1d3b813bbdead7613f6919e23b6fe76c6db (patch) | |
tree | 577ac39d17687a07531194e40bbbc23764a98d1a | |
parent | 7db2926c38edafb4cbd9b2cda1a98b5567836bc1 (diff) | |
parent | 1049b01a22e28acb69ea530de824608b72c8222c (diff) | |
download | traccar-server-9662f1d3b813bbdead7613f6919e23b6fe76c6db.tar.gz traccar-server-9662f1d3b813bbdead7613f6919e23b6fe76c6db.tar.bz2 traccar-server-9662f1d3b813bbdead7613f6919e23b6fe76c6db.zip |
Merge pull request #4389 from edvalley/proposed
Add an optional queue to web data handler
-rw-r--r-- | src/main/java/org/traccar/MainModule.java | 7 | ||||
-rw-r--r-- | src/main/java/org/traccar/WebDataHandler.java | 125 | ||||
-rw-r--r-- | src/main/java/org/traccar/config/Keys.java | 31 |
3 files changed, 146 insertions, 17 deletions
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java index 0957d9fe3..c3c0182d7 100644 --- a/src/main/java/org/traccar/MainModule.java +++ b/src/main/java/org/traccar/MainModule.java @@ -73,6 +73,7 @@ import org.traccar.reports.model.TripsConfig; import javax.annotation.Nullable; import javax.ws.rs.client.Client; +import io.netty.util.Timer; public class MainModule extends AbstractModule { @@ -375,6 +376,12 @@ public class MainModule extends AbstractModule { return new DriverEventHandler(identityManager); } + @Singleton + @Provides + public static Timer provideTimer() { + return GlobalTimer.getTimer(); + } + @Override protected void configure() { binder().requireExplicitBindings(); diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 64396de03..39e54616b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 - 2019 Anton Tananaev (anton@traccar.org) + * Copyright 2015 - 2020 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,12 @@ package org.traccar; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelHandler; +import io.netty.util.Timer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.database.IdentityManager; @@ -27,9 +33,11 @@ import org.traccar.model.Position; import org.traccar.model.Group; import javax.inject.Inject; +import javax.ws.rs.core.Response; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.InvocationCallback; import java.util.HashMap; import java.util.Map; import java.io.UnsupportedEncodingException; @@ -39,10 +47,14 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; @@ -54,15 +66,30 @@ public class WebDataHandler extends BaseDataHandler { private final String header; private final boolean json; + private final boolean retryEnabled; + private final int retryDelay; + private final int retryCount; + private final int retryLimit; + + private AtomicInteger deliveryPending; + @Inject public WebDataHandler( Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) { + this.identityManager = identityManager; this.objectMapper = objectMapper; this.client = client; this.url = config.getString(Keys.FORWARD_URL); this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); + + this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE); + this.retryDelay = config.getInteger(Keys.FORWARD_RETRY_DELAY, 100); + this.retryCount = config.getInteger(Keys.FORWARD_RETRY_COUNT, 10); + this.retryLimit = config.getInteger(Keys.FORWARD_RETRY_LIMIT, 100); + + this.deliveryPending = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -152,35 +179,99 @@ public class WebDataHandler extends BaseDataHandler { return request; } - @Override - protected Position handlePosition(Position position) { + class AsyncRequestAndCallback implements InvocationCallback<Response>, TimerTask { - String url; - if (json) { - url = this.url; - } else { + private int retries = 0; + private Map<String, Object> payload; + private Invocation.Builder requestBuilder; + + AsyncRequestAndCallback(Position position) { + + String formattedUrl; try { - url = formatRequest(position); + formattedUrl = json ? url : formatRequest(position); } catch (UnsupportedEncodingException | JsonProcessingException e) { throw new RuntimeException("Forwarding formatting error", e); } + + requestBuilder = client.target(formattedUrl).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); + } + } + + if (json) { + payload = prepareJsonPayload(position); + } + + deliveryPending.incrementAndGet(); + } + + private void send() { + if (json) { + requestBuilder.async().post(Entity.json(payload), this); + } else { + requestBuilder.async().get(this); + } + } + + private void retry() { + boolean scheduled = false; + try { + if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { + schedule(); + scheduled = true; + } + } finally { + int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet(); + LOGGER.warn("Position forwarding failed: " + pending + " pending"); + } } - Invocation.Builder requestBuilder = client.target(url).request(); + private void schedule() { + Main.getInjector().getInstance(Timer.class).newTimeout( + this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); + } - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); + @Override + public void completed(Response response) { + if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + deliveryPending.decrementAndGet(); + } else { + retry(); } } - if (json) { - requestBuilder.async().post(Entity.json(prepareJsonPayload(position))); - } else { - requestBuilder.async().get(); + @Override + public void failed(Throwable throwable) { + retry(); + } + + @Override + public void run(Timeout timeout) { + boolean sent = false; + try { + if (!timeout.isCancelled()) { + send(); + sent = true; + } + } finally { + if (!sent) { + deliveryPending.decrementAndGet(); + } + } } + } + + @Override + protected Position handlePosition(Position position) { + + AsyncRequestAndCallback request = new AsyncRequestAndCallback(position); + request.send(); + return position; } diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 2c5dcefd5..d88b36d28 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -101,6 +101,37 @@ public final class Keys { "forward.json", Boolean.class); /** + * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. If initial + * delivery fails, because of an unreachable server or an HTTP response different from '2xx', the software waits + * for 'forward.retry.delay' milliseconds to retry delivery. On subsequent failures, this delay is duplicated. + * If forwarding is retried for 'forward.retry.count', retrying is canceled and the position is dropped. Positions + * pending to be delivered are limited to 'forward.retry.limit'. If this limit is reached, positions get discarded. + */ + public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey( + "forward.retry.enable", Boolean.class); + + /** + * Position forwarding retry first delay in milliseconds. + * Can be set to anything greater than 0. Defaults to 100 milliseconds. + */ + public static final ConfigKey FORWARD_RETRY_DELAY = new ConfigKey( + "forward.retry.delay", Integer.class); + + /** + * Position forwarding retry maximum retries. + * Can be set to anything greater than 0. Defaults to 10 retries. + */ + public static final ConfigKey FORWARD_RETRY_COUNT = new ConfigKey( + "forward.retry.count", Integer.class); + + /** + * Position forwarding retry pending positions limit. + * Can be set to anything greater than 0. Defaults to 100 positions. + */ + public static final ConfigKey FORWARD_RETRY_LIMIT = new ConfigKey( + "forward.retry.limit", Integer.class); + + /** * Boolean flag to enable or disable position filtering. */ public static final ConfigKey FILTER_ENABLE = new ConfigKey( |