aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java102
-rw-r--r--src/main/java/org/traccar/config/Keys.java34
2 files changed, 68 insertions, 68 deletions
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index dfd6f0e5b..19c5a58a8 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -43,21 +43,12 @@ import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.CircularFifoBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
@ChannelHandler.Sharable
public class WebDataHandler extends BaseDataHandler {
private static final String KEY_POSITION = "position";
private static final String KEY_DEVICE = "device";
- private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
-
private final IdentityManager identityManager;
private final ObjectMapper objectMapper;
private final Client client;
@@ -66,11 +57,13 @@ public class WebDataHandler extends BaseDataHandler {
private final String header;
private final boolean json;
- private final Integer queueSize;
- private final Integer queueConcurrency;
+ private final Boolean retryEnabled;
+ private final Integer retryDelayMin;
+ private final Integer retryDelayMax;
+ private final Integer deliveryPendingLimit;
- private Buffer positionsQueue = null;
- private AtomicInteger pendingRequests;
+ private AtomicInteger deliveryFaliuresInRow;
+ private AtomicInteger deliveryPendingCurrent;
@Inject
public WebDataHandler(
@@ -83,17 +76,19 @@ public class WebDataHandler extends BaseDataHandler {
this.header = config.getString(Keys.FORWARD_HEADER);
this.json = config.getBoolean(Keys.FORWARD_JSON);
- Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0));
- this.queueSize = (queueSize > 0) ? queueSize : 0;
+ this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE);
- Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1));
- this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1;
+ Integer retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1);
+ this.retryDelayMin = (retryDelayMin > 0 && retryDelayMin < 3600) ? retryDelayMin : 1;
- if (this.queueSize > 0) {
- this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize));
- }
+ Integer retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10);
+ this.retryDelayMax = (retryDelayMax > retryDelayMin && retryDelayMax < 3600) ? retryDelayMax : retryDelayMin;
+
+ Integer deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100);
+ this.deliveryPendingLimit = (retryDelayMax > 0) ? deliveryPendingLimit : 100;
- this.pendingRequests = new AtomicInteger(0);
+ this.deliveryFaliuresInRow = new AtomicInteger(0);
+ this.deliveryPendingCurrent = new AtomicInteger(0);
}
private static String formatSentence(Position position) {
@@ -188,6 +183,7 @@ public class WebDataHandler extends BaseDataHandler {
class AsyncRequestAndCallback implements InvocationCallback<Response> {
+ private Integer delay = retryDelayMin;
private Map<String, Object> jsonPayload;
private Invocation.Builder requestBuilder;
@@ -212,7 +208,7 @@ public class WebDataHandler extends BaseDataHandler {
jsonPayload = prepareJsonPayload(position);
}
- pendingRequests.incrementAndGet();
+ deliveryPendingCurrent.incrementAndGet();
send();
}
@@ -227,61 +223,47 @@ public class WebDataHandler extends BaseDataHandler {
private void retry() {
try {
- Thread.sleep(1000);
- send();
+ deliveryFaliuresInRow.incrementAndGet();
+ if (!retryEnabled || deliveryPendingCurrent.get() > deliveryPendingLimit) {
+ deliveryPendingCurrent.decrementAndGet();
+ } else {
+ Integer i = 0;
+ for ( ; i < delay; i++) {
+ Thread.sleep(1000);
+ if (deliveryFaliuresInRow.get() == 0) {
+ delay = retryDelayMin;
+ break;
+ }
+ }
+ if (i >= delay && delay < retryDelayMax) {
+ delay++;
+ }
+ send();
+ }
} catch (Exception e) {
}
}
- private void next() {
- if (!positionsQueue.isEmpty()) {
- new AsyncRequestAndCallback((Position) positionsQueue.remove());
- }
- }
-
public void completed(Response response) {
- if (positionsQueue != null) {
- boolean ok = (response.getStatus() == 200);
- boolean retry = (positionsQueue.size() < queueSize);
- if (!ok && retry) {
- retry();
- } else {
- pendingRequests.decrementAndGet();
- while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) {
- next();
- }
- }
+ if (response.getStatus() == 200) {
+ deliveryFaliuresInRow.set(0);
+ deliveryPendingCurrent.decrementAndGet();
+ } else {
+ retry();
}
}
public void failed(Throwable throwable) {
- if (positionsQueue != null) {
- if (positionsQueue.size() < queueSize) {
- retry();
- } else if (pendingRequests.decrementAndGet() == 0) {
- next();
- }
- }
+ retry();
}
}
- if (positionsQueue == null || pendingRequests.get() == 0) {
- new AsyncRequestAndCallback(position);
- } else {
- positionsQueue.add(position);
- }
-
- logQueueStatus();
+ new AsyncRequestAndCallback(position);
return position;
}
- private void logQueueStatus() {
- LOGGER.info(String.format("Position forwarding queue: %d/%d/%d",
- pendingRequests.get(), positionsQueue.size(), queueSize));
- }
-
private Map<String, Object> prepareJsonPayload(Position position) {
Map<String, Object> data = new HashMap<>();
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 6102f7604..029316142 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -101,18 +101,36 @@ public final class Keys {
"forward.json", Boolean.class);
/**
- * Position forwarding queue size. A ring buffer of the specified size is used to queue positions.
- * If negative, zero, or not specified, queueing is disabled. (Legacy behaviour)
+ * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions.
+ * If initial delivery fails, because of an unreachable server or an HTTP response different from '200 OK',
+ * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsecuent failures, this
+ * delay is incremented by 1 second up to 'forward.retry.delay.max'. On successful delivery, the delay is reset
+ * to 'forward.retry.delay.min'. Pending positions to be delivered are limited to 'forward.retry.pending.limit'.
+ * If this limit is reached, positions are discarded before next retry.
*/
- public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey(
- "forward.queue.size", Integer.class);
+ public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey(
+ "forward.retry.enable", Boolean.class);
/**
- * Position forwarding queue concurrency. Defines how many HTTP requests can be pending at any time.
- * If queueing is enabled, and this value is less than one, it's ignored and forced to be one.
+ * Position forwarding retry minimum delay in seconds.
+ * Can be set to anything between 1 and 3600 seconds. Defaults to 1 second.
*/
- public static final ConfigKey FORWARD_QUEUE_CONCURRENCY = new ConfigKey(
- "forward.queue.concurrency", Integer.class);
+ public static final ConfigKey FORWARD_RETRY_DELAY_MIN = new ConfigKey(
+ "forward.retry.delay.min", Integer.class);
+
+ /**
+ * Position forwarding retry maximum delay in seconds.
+ * Can be set to anything between 1 and 3600 seconds. Defaults to 10 seconds.
+ */
+ public static final ConfigKey FORWARD_RETRY_DELAY_MAX = new ConfigKey(
+ "forward.retry.delay.max", Integer.class);
+
+ /**
+ * Position forwarding retry pending limit.
+ * Can be set to anything greater than 0. Defaults to 100 positions.
+ */
+ public static final ConfigKey FORWARD_RETRY_PENDING_LIMIT = new ConfigKey(
+ "forward.retry.pending.limit", Integer.class);
/**
* Boolean flag to enable or disable position filtering.