aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/WebDataHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/traccar/WebDataHandler.java')
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java102
1 files changed, 42 insertions, 60 deletions
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index dfd6f0e5b..19c5a58a8 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -43,21 +43,12 @@ import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.CircularFifoBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
@ChannelHandler.Sharable
public class WebDataHandler extends BaseDataHandler {
private static final String KEY_POSITION = "position";
private static final String KEY_DEVICE = "device";
- private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
-
private final IdentityManager identityManager;
private final ObjectMapper objectMapper;
private final Client client;
@@ -66,11 +57,13 @@ public class WebDataHandler extends BaseDataHandler {
private final String header;
private final boolean json;
- private final Integer queueSize;
- private final Integer queueConcurrency;
+ private final Boolean retryEnabled;
+ private final Integer retryDelayMin;
+ private final Integer retryDelayMax;
+ private final Integer deliveryPendingLimit;
- private Buffer positionsQueue = null;
- private AtomicInteger pendingRequests;
+ private AtomicInteger deliveryFaliuresInRow;
+ private AtomicInteger deliveryPendingCurrent;
@Inject
public WebDataHandler(
@@ -83,17 +76,19 @@ public class WebDataHandler extends BaseDataHandler {
this.header = config.getString(Keys.FORWARD_HEADER);
this.json = config.getBoolean(Keys.FORWARD_JSON);
- Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0));
- this.queueSize = (queueSize > 0) ? queueSize : 0;
+ this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE);
- Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1));
- this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1;
+ Integer retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1);
+ this.retryDelayMin = (retryDelayMin > 0 && retryDelayMin < 3600) ? retryDelayMin : 1;
- if (this.queueSize > 0) {
- this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize));
- }
+ Integer retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10);
+ this.retryDelayMax = (retryDelayMax > retryDelayMin && retryDelayMax < 3600) ? retryDelayMax : retryDelayMin;
+
+ Integer deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100);
+ this.deliveryPendingLimit = (retryDelayMax > 0) ? deliveryPendingLimit : 100;
- this.pendingRequests = new AtomicInteger(0);
+ this.deliveryFaliuresInRow = new AtomicInteger(0);
+ this.deliveryPendingCurrent = new AtomicInteger(0);
}
private static String formatSentence(Position position) {
@@ -188,6 +183,7 @@ public class WebDataHandler extends BaseDataHandler {
class AsyncRequestAndCallback implements InvocationCallback<Response> {
+ private Integer delay = retryDelayMin;
private Map<String, Object> jsonPayload;
private Invocation.Builder requestBuilder;
@@ -212,7 +208,7 @@ public class WebDataHandler extends BaseDataHandler {
jsonPayload = prepareJsonPayload(position);
}
- pendingRequests.incrementAndGet();
+ deliveryPendingCurrent.incrementAndGet();
send();
}
@@ -227,61 +223,47 @@ public class WebDataHandler extends BaseDataHandler {
private void retry() {
try {
- Thread.sleep(1000);
- send();
+ deliveryFaliuresInRow.incrementAndGet();
+ if (!retryEnabled || deliveryPendingCurrent.get() > deliveryPendingLimit) {
+ deliveryPendingCurrent.decrementAndGet();
+ } else {
+ Integer i = 0;
+ for ( ; i < delay; i++) {
+ Thread.sleep(1000);
+ if (deliveryFaliuresInRow.get() == 0) {
+ delay = retryDelayMin;
+ break;
+ }
+ }
+ if (i >= delay && delay < retryDelayMax) {
+ delay++;
+ }
+ send();
+ }
} catch (Exception e) {
}
}
- private void next() {
- if (!positionsQueue.isEmpty()) {
- new AsyncRequestAndCallback((Position) positionsQueue.remove());
- }
- }
-
public void completed(Response response) {
- if (positionsQueue != null) {
- boolean ok = (response.getStatus() == 200);
- boolean retry = (positionsQueue.size() < queueSize);
- if (!ok && retry) {
- retry();
- } else {
- pendingRequests.decrementAndGet();
- while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) {
- next();
- }
- }
+ if (response.getStatus() == 200) {
+ deliveryFaliuresInRow.set(0);
+ deliveryPendingCurrent.decrementAndGet();
+ } else {
+ retry();
}
}
public void failed(Throwable throwable) {
- if (positionsQueue != null) {
- if (positionsQueue.size() < queueSize) {
- retry();
- } else if (pendingRequests.decrementAndGet() == 0) {
- next();
- }
- }
+ retry();
}
}
- if (positionsQueue == null || pendingRequests.get() == 0) {
- new AsyncRequestAndCallback(position);
- } else {
- positionsQueue.add(position);
- }
-
- logQueueStatus();
+ new AsyncRequestAndCallback(position);
return position;
}
- private void logQueueStatus() {
- LOGGER.info(String.format("Position forwarding queue: %d/%d/%d",
- pendingRequests.get(), positionsQueue.size(), queueSize));
- }
-
private Map<String, Object> prepareJsonPayload(Position position) {
Map<String, Object> data = new HashMap<>();