From 229c042edf74979d5893c047d2b5de7fd8cc8df0 Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Sun, 1 Dec 2019 16:33:08 -0500 Subject: Implement wait and retry logic --- src/main/java/org/traccar/WebDataHandler.java | 102 +++++++++++--------------- src/main/java/org/traccar/config/Keys.java | 34 +++++++-- 2 files changed, 68 insertions(+), 68 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index dfd6f0e5b..19c5a58a8 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -43,21 +43,12 @@ import java.util.Locale; import java.util.TimeZone; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.collections.Buffer; -import org.apache.commons.collections.BufferUtils; -import org.apache.commons.collections.buffer.CircularFifoBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; - private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); - private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -66,11 +57,13 @@ public class WebDataHandler extends BaseDataHandler { private final String header; private final boolean json; - private final Integer queueSize; - private final Integer queueConcurrency; + private final Boolean retryEnabled; + private final Integer retryDelayMin; + private final Integer retryDelayMax; + private final Integer deliveryPendingLimit; - private Buffer positionsQueue = null; - private AtomicInteger pendingRequests; + private AtomicInteger deliveryFaliuresInRow; + private AtomicInteger deliveryPendingCurrent; @Inject public WebDataHandler( @@ -83,17 +76,19 @@ public class WebDataHandler extends BaseDataHandler { this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); - Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0)); - this.queueSize = (queueSize > 0) ? queueSize : 0; + this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE); - Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1)); - this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1; + Integer retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1); + this.retryDelayMin = (retryDelayMin > 0 && retryDelayMin < 3600) ? retryDelayMin : 1; - if (this.queueSize > 0) { - this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize)); - } + Integer retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10); + this.retryDelayMax = (retryDelayMax > retryDelayMin && retryDelayMax < 3600) ? retryDelayMax : retryDelayMin; + + Integer deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100); + this.deliveryPendingLimit = (retryDelayMax > 0) ? deliveryPendingLimit : 100; - this.pendingRequests = new AtomicInteger(0); + this.deliveryFaliuresInRow = new AtomicInteger(0); + this.deliveryPendingCurrent = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -188,6 +183,7 @@ public class WebDataHandler extends BaseDataHandler { class AsyncRequestAndCallback implements InvocationCallback { + private Integer delay = retryDelayMin; private Map jsonPayload; private Invocation.Builder requestBuilder; @@ -212,7 +208,7 @@ public class WebDataHandler extends BaseDataHandler { jsonPayload = prepareJsonPayload(position); } - pendingRequests.incrementAndGet(); + deliveryPendingCurrent.incrementAndGet(); send(); } @@ -227,61 +223,47 @@ public class WebDataHandler extends BaseDataHandler { private void retry() { try { - Thread.sleep(1000); - send(); + deliveryFaliuresInRow.incrementAndGet(); + if (!retryEnabled || deliveryPendingCurrent.get() > deliveryPendingLimit) { + deliveryPendingCurrent.decrementAndGet(); + } else { + Integer i = 0; + for ( ; i < delay; i++) { + Thread.sleep(1000); + if (deliveryFaliuresInRow.get() == 0) { + delay = retryDelayMin; + break; + } + } + if (i >= delay && delay < retryDelayMax) { + delay++; + } + send(); + } } catch (Exception e) { } } - private void next() { - if (!positionsQueue.isEmpty()) { - new AsyncRequestAndCallback((Position) positionsQueue.remove()); - } - } - public void completed(Response response) { - if (positionsQueue != null) { - boolean ok = (response.getStatus() == 200); - boolean retry = (positionsQueue.size() < queueSize); - if (!ok && retry) { - retry(); - } else { - pendingRequests.decrementAndGet(); - while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) { - next(); - } - } + if (response.getStatus() == 200) { + deliveryFaliuresInRow.set(0); + deliveryPendingCurrent.decrementAndGet(); + } else { + retry(); } } public void failed(Throwable throwable) { - if (positionsQueue != null) { - if (positionsQueue.size() < queueSize) { - retry(); - } else if (pendingRequests.decrementAndGet() == 0) { - next(); - } - } + retry(); } } - if (positionsQueue == null || pendingRequests.get() == 0) { - new AsyncRequestAndCallback(position); - } else { - positionsQueue.add(position); - } - - logQueueStatus(); + new AsyncRequestAndCallback(position); return position; } - private void logQueueStatus() { - LOGGER.info(String.format("Position forwarding queue: %d/%d/%d", - pendingRequests.get(), positionsQueue.size(), queueSize)); - } - private Map prepareJsonPayload(Position position) { Map data = new HashMap<>(); diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 6102f7604..029316142 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -101,18 +101,36 @@ public final class Keys { "forward.json", Boolean.class); /** - * Position forwarding queue size. A ring buffer of the specified size is used to queue positions. - * If negative, zero, or not specified, queueing is disabled. (Legacy behaviour) + * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. + * If initial delivery fails, because of an unreachable server or an HTTP response different from '200 OK', + * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsecuent failures, this + * delay is incremented by 1 second up to 'forward.retry.delay.max'. On successful delivery, the delay is reset + * to 'forward.retry.delay.min'. Pending positions to be delivered are limited to 'forward.retry.pending.limit'. + * If this limit is reached, positions are discarded before next retry. */ - public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey( - "forward.queue.size", Integer.class); + public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey( + "forward.retry.enable", Boolean.class); /** - * Position forwarding queue concurrency. Defines how many HTTP requests can be pending at any time. - * If queueing is enabled, and this value is less than one, it's ignored and forced to be one. + * Position forwarding retry minimum delay in seconds. + * Can be set to anything between 1 and 3600 seconds. Defaults to 1 second. */ - public static final ConfigKey FORWARD_QUEUE_CONCURRENCY = new ConfigKey( - "forward.queue.concurrency", Integer.class); + public static final ConfigKey FORWARD_RETRY_DELAY_MIN = new ConfigKey( + "forward.retry.delay.min", Integer.class); + + /** + * Position forwarding retry maximum delay in seconds. + * Can be set to anything between 1 and 3600 seconds. Defaults to 10 seconds. + */ + public static final ConfigKey FORWARD_RETRY_DELAY_MAX = new ConfigKey( + "forward.retry.delay.max", Integer.class); + + /** + * Position forwarding retry pending limit. + * Can be set to anything greater than 0. Defaults to 100 positions. + */ + public static final ConfigKey FORWARD_RETRY_PENDING_LIMIT = new ConfigKey( + "forward.retry.pending.limit", Integer.class); /** * Boolean flag to enable or disable position filtering. -- cgit v1.2.3