From 72e8ab6966cb70dd58dcdf671dab11a7673aeabf Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Mon, 26 Aug 2019 06:08:50 -0400 Subject: Add a ring buffer and response check --- src/main/java/org/traccar/WebDataHandler.java | 89 +++++++++++++++++++++++---- src/main/java/org/traccar/config/Keys.java | 7 +++ 2 files changed, 85 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 64396de03..193efd230 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -27,9 +27,11 @@ import org.traccar.model.Position; import org.traccar.model.Group; import javax.inject.Inject; +import javax.ws.rs.core.Response; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.InvocationCallback; import java.util.HashMap; import java.util.Map; import java.io.UnsupportedEncodingException; @@ -39,6 +41,14 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.collections.Buffer; +import org.apache.commons.collections.BufferUtils; +import org.apache.commons.collections.buffer.CircularFifoBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { @@ -46,6 +56,8 @@ public class WebDataHandler extends BaseDataHandler { private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -54,6 +66,13 @@ public class WebDataHandler extends BaseDataHandler { private final String header; private final boolean json; + private final Integer queueSize; + + private Buffer positionsQueue = null; + private AtomicBoolean sendInProgress; + + private Invocation.Builder requestBuilder; + @Inject public WebDataHandler( Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) { @@ -63,6 +82,12 @@ public class WebDataHandler extends BaseDataHandler { this.url = config.getString(Keys.FORWARD_URL); this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); + Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0)); + this.queueSize = (queueSize > 0) ? queueSize : 0; + if (this.queueSize > 0) { + this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize)); + } + this.sendInProgress = new AtomicBoolean(false); } private static String formatSentence(Position position) { @@ -152,9 +177,7 @@ public class WebDataHandler extends BaseDataHandler { return request; } - @Override - protected Position handlePosition(Position position) { - + protected void sendPosition(Position position) { String url; if (json) { url = this.url; @@ -166,19 +189,63 @@ public class WebDataHandler extends BaseDataHandler { } } - Invocation.Builder requestBuilder = client.target(url).request(); - - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); + if (!json || requestBuilder == null) { + requestBuilder = client.target(url).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); + } } } + InvocationCallback callback = new InvocationCallback() { + public void completed(Response response) { + if (positionsQueue != null) { + if (response.getStatus() == 200) { + positionsQueue.remove(); + } + logQueueStatus(); + sendInProgress.set(false); + sendQueuedPositions(); + } + } + public void failed(Throwable throwable) { + if (positionsQueue != null) { + logQueueStatus(); + sendInProgress.set(false); + sendQueuedPositions(); + } + } + }; + if (json) { - requestBuilder.async().post(Entity.json(prepareJsonPayload(position))); + requestBuilder.async().post(Entity.json(prepareJsonPayload(position)), callback); + } else { + requestBuilder.async().get(callback); + } + } + + protected void logQueueStatus() { + LOGGER.info(String.format("Position forwarding queue: %d/%d", + positionsQueue.size(), queueSize)); + } + + protected void sendQueuedPositions() { + if (!positionsQueue.isEmpty() && !sendInProgress.get()) { + sendInProgress.set(true); + sendPosition((Position) positionsQueue.get()); + } + } + + @Override + protected Position handlePosition(Position position) { + + if (positionsQueue == null) { + sendPosition(position); } else { - requestBuilder.async().get(); + positionsQueue.add(position); + sendQueuedPositions(); } return position; diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 2c5dcefd5..10eda01e2 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -100,6 +100,13 @@ public final class Keys { public static final ConfigKey FORWARD_JSON = new ConfigKey( "forward.json", Boolean.class); + /** + * Position forwarding queue size. A ring buffer of the specified size is used to queue positions. + * If negative, zero, or not specified, queueing is disabled. (Legacy behaviour) + */ + public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey( + "forward.queue.size", Integer.class); + /** * Boolean flag to enable or disable position filtering. */ -- cgit v1.2.3 From 169c40cc7207a962f027aea85bd8f9c04b33738f Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Sat, 31 Aug 2019 18:27:13 -0400 Subject: Refactor code to have concurrency --- src/main/java/org/traccar/WebDataHandler.java | 141 ++++++++++++++++---------- src/main/java/org/traccar/config/Keys.java | 7 ++ 2 files changed, 93 insertions(+), 55 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 193efd230..dfd6f0e5b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -41,7 +41,7 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.collections.Buffer; import org.apache.commons.collections.BufferUtils; @@ -67,27 +67,33 @@ public class WebDataHandler extends BaseDataHandler { private final boolean json; private final Integer queueSize; + private final Integer queueConcurrency; private Buffer positionsQueue = null; - private AtomicBoolean sendInProgress; - - private Invocation.Builder requestBuilder; + private AtomicInteger pendingRequests; @Inject public WebDataHandler( Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) { + this.identityManager = identityManager; this.objectMapper = objectMapper; this.client = client; this.url = config.getString(Keys.FORWARD_URL); this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); + Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0)); this.queueSize = (queueSize > 0) ? queueSize : 0; + + Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1)); + this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1; + if (this.queueSize > 0) { this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize)); } - this.sendInProgress = new AtomicBoolean(false); + + this.pendingRequests = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -177,80 +183,105 @@ public class WebDataHandler extends BaseDataHandler { return request; } - protected void sendPosition(Position position) { - String url; - if (json) { - url = this.url; - } else { - try { - url = formatRequest(position); - } catch (UnsupportedEncodingException | JsonProcessingException e) { - throw new RuntimeException("Forwarding formatting error", e); + @Override + protected Position handlePosition(Position position) { + + class AsyncRequestAndCallback implements InvocationCallback { + + private Map jsonPayload; + private Invocation.Builder requestBuilder; + + AsyncRequestAndCallback(Position position) { + + String formattedUrl; + try { + formattedUrl = (json) ? url : formatRequest(position); + } catch (UnsupportedEncodingException | JsonProcessingException e) { + throw new RuntimeException("Forwarding formatting error", e); + } + + requestBuilder = client.target(formattedUrl).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); + } + } + + if (json) { + jsonPayload = prepareJsonPayload(position); + } + + pendingRequests.incrementAndGet(); + + send(); } - } - if (!json || requestBuilder == null) { - requestBuilder = client.target(url).request(); - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); + private void send() { + if (json) { + requestBuilder.async().post(Entity.json(jsonPayload), this); + } else { + requestBuilder.async().get(this); + } + } + + private void retry() { + try { + Thread.sleep(1000); + send(); + } catch (Exception e) { + } + } + + private void next() { + if (!positionsQueue.isEmpty()) { + new AsyncRequestAndCallback((Position) positionsQueue.remove()); } } - } - InvocationCallback callback = new InvocationCallback() { public void completed(Response response) { if (positionsQueue != null) { - if (response.getStatus() == 200) { - positionsQueue.remove(); + boolean ok = (response.getStatus() == 200); + boolean retry = (positionsQueue.size() < queueSize); + if (!ok && retry) { + retry(); + } else { + pendingRequests.decrementAndGet(); + while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) { + next(); + } } - logQueueStatus(); - sendInProgress.set(false); - sendQueuedPositions(); } } + public void failed(Throwable throwable) { if (positionsQueue != null) { - logQueueStatus(); - sendInProgress.set(false); - sendQueuedPositions(); + if (positionsQueue.size() < queueSize) { + retry(); + } else if (pendingRequests.decrementAndGet() == 0) { + next(); + } } } - }; - - if (json) { - requestBuilder.async().post(Entity.json(prepareJsonPayload(position)), callback); - } else { - requestBuilder.async().get(callback); - } - } - - protected void logQueueStatus() { - LOGGER.info(String.format("Position forwarding queue: %d/%d", - positionsQueue.size(), queueSize)); - } - protected void sendQueuedPositions() { - if (!positionsQueue.isEmpty() && !sendInProgress.get()) { - sendInProgress.set(true); - sendPosition((Position) positionsQueue.get()); } - } - @Override - protected Position handlePosition(Position position) { - - if (positionsQueue == null) { - sendPosition(position); + if (positionsQueue == null || pendingRequests.get() == 0) { + new AsyncRequestAndCallback(position); } else { positionsQueue.add(position); - sendQueuedPositions(); } + logQueueStatus(); + return position; } + private void logQueueStatus() { + LOGGER.info(String.format("Position forwarding queue: %d/%d/%d", + pendingRequests.get(), positionsQueue.size(), queueSize)); + } + private Map prepareJsonPayload(Position position) { Map data = new HashMap<>(); diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 10eda01e2..6102f7604 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -107,6 +107,13 @@ public final class Keys { public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey( "forward.queue.size", Integer.class); + /** + * Position forwarding queue concurrency. Defines how many HTTP requests can be pending at any time. + * If queueing is enabled, and this value is less than one, it's ignored and forced to be one. + */ + public static final ConfigKey FORWARD_QUEUE_CONCURRENCY = new ConfigKey( + "forward.queue.concurrency", Integer.class); + /** * Boolean flag to enable or disable position filtering. */ -- cgit v1.2.3 From 229c042edf74979d5893c047d2b5de7fd8cc8df0 Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Sun, 1 Dec 2019 16:33:08 -0500 Subject: Implement wait and retry logic --- src/main/java/org/traccar/WebDataHandler.java | 102 +++++++++++--------------- src/main/java/org/traccar/config/Keys.java | 34 +++++++-- 2 files changed, 68 insertions(+), 68 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index dfd6f0e5b..19c5a58a8 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -43,21 +43,12 @@ import java.util.Locale; import java.util.TimeZone; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.collections.Buffer; -import org.apache.commons.collections.BufferUtils; -import org.apache.commons.collections.buffer.CircularFifoBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; - private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); - private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -66,11 +57,13 @@ public class WebDataHandler extends BaseDataHandler { private final String header; private final boolean json; - private final Integer queueSize; - private final Integer queueConcurrency; + private final Boolean retryEnabled; + private final Integer retryDelayMin; + private final Integer retryDelayMax; + private final Integer deliveryPendingLimit; - private Buffer positionsQueue = null; - private AtomicInteger pendingRequests; + private AtomicInteger deliveryFaliuresInRow; + private AtomicInteger deliveryPendingCurrent; @Inject public WebDataHandler( @@ -83,17 +76,19 @@ public class WebDataHandler extends BaseDataHandler { this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); - Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0)); - this.queueSize = (queueSize > 0) ? queueSize : 0; + this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE); - Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1)); - this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1; + Integer retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1); + this.retryDelayMin = (retryDelayMin > 0 && retryDelayMin < 3600) ? retryDelayMin : 1; - if (this.queueSize > 0) { - this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize)); - } + Integer retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10); + this.retryDelayMax = (retryDelayMax > retryDelayMin && retryDelayMax < 3600) ? retryDelayMax : retryDelayMin; + + Integer deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100); + this.deliveryPendingLimit = (retryDelayMax > 0) ? deliveryPendingLimit : 100; - this.pendingRequests = new AtomicInteger(0); + this.deliveryFaliuresInRow = new AtomicInteger(0); + this.deliveryPendingCurrent = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -188,6 +183,7 @@ public class WebDataHandler extends BaseDataHandler { class AsyncRequestAndCallback implements InvocationCallback { + private Integer delay = retryDelayMin; private Map jsonPayload; private Invocation.Builder requestBuilder; @@ -212,7 +208,7 @@ public class WebDataHandler extends BaseDataHandler { jsonPayload = prepareJsonPayload(position); } - pendingRequests.incrementAndGet(); + deliveryPendingCurrent.incrementAndGet(); send(); } @@ -227,61 +223,47 @@ public class WebDataHandler extends BaseDataHandler { private void retry() { try { - Thread.sleep(1000); - send(); + deliveryFaliuresInRow.incrementAndGet(); + if (!retryEnabled || deliveryPendingCurrent.get() > deliveryPendingLimit) { + deliveryPendingCurrent.decrementAndGet(); + } else { + Integer i = 0; + for ( ; i < delay; i++) { + Thread.sleep(1000); + if (deliveryFaliuresInRow.get() == 0) { + delay = retryDelayMin; + break; + } + } + if (i >= delay && delay < retryDelayMax) { + delay++; + } + send(); + } } catch (Exception e) { } } - private void next() { - if (!positionsQueue.isEmpty()) { - new AsyncRequestAndCallback((Position) positionsQueue.remove()); - } - } - public void completed(Response response) { - if (positionsQueue != null) { - boolean ok = (response.getStatus() == 200); - boolean retry = (positionsQueue.size() < queueSize); - if (!ok && retry) { - retry(); - } else { - pendingRequests.decrementAndGet(); - while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) { - next(); - } - } + if (response.getStatus() == 200) { + deliveryFaliuresInRow.set(0); + deliveryPendingCurrent.decrementAndGet(); + } else { + retry(); } } public void failed(Throwable throwable) { - if (positionsQueue != null) { - if (positionsQueue.size() < queueSize) { - retry(); - } else if (pendingRequests.decrementAndGet() == 0) { - next(); - } - } + retry(); } } - if (positionsQueue == null || pendingRequests.get() == 0) { - new AsyncRequestAndCallback(position); - } else { - positionsQueue.add(position); - } - - logQueueStatus(); + new AsyncRequestAndCallback(position); return position; } - private void logQueueStatus() { - LOGGER.info(String.format("Position forwarding queue: %d/%d/%d", - pendingRequests.get(), positionsQueue.size(), queueSize)); - } - private Map prepareJsonPayload(Position position) { Map data = new HashMap<>(); diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 6102f7604..029316142 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -101,18 +101,36 @@ public final class Keys { "forward.json", Boolean.class); /** - * Position forwarding queue size. A ring buffer of the specified size is used to queue positions. - * If negative, zero, or not specified, queueing is disabled. (Legacy behaviour) + * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. + * If initial delivery fails, because of an unreachable server or an HTTP response different from '200 OK', + * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsecuent failures, this + * delay is incremented by 1 second up to 'forward.retry.delay.max'. On successful delivery, the delay is reset + * to 'forward.retry.delay.min'. Pending positions to be delivered are limited to 'forward.retry.pending.limit'. + * If this limit is reached, positions are discarded before next retry. */ - public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey( - "forward.queue.size", Integer.class); + public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey( + "forward.retry.enable", Boolean.class); /** - * Position forwarding queue concurrency. Defines how many HTTP requests can be pending at any time. - * If queueing is enabled, and this value is less than one, it's ignored and forced to be one. + * Position forwarding retry minimum delay in seconds. + * Can be set to anything between 1 and 3600 seconds. Defaults to 1 second. */ - public static final ConfigKey FORWARD_QUEUE_CONCURRENCY = new ConfigKey( - "forward.queue.concurrency", Integer.class); + public static final ConfigKey FORWARD_RETRY_DELAY_MIN = new ConfigKey( + "forward.retry.delay.min", Integer.class); + + /** + * Position forwarding retry maximum delay in seconds. + * Can be set to anything between 1 and 3600 seconds. Defaults to 10 seconds. + */ + public static final ConfigKey FORWARD_RETRY_DELAY_MAX = new ConfigKey( + "forward.retry.delay.max", Integer.class); + + /** + * Position forwarding retry pending limit. + * Can be set to anything greater than 0. Defaults to 100 positions. + */ + public static final ConfigKey FORWARD_RETRY_PENDING_LIMIT = new ConfigKey( + "forward.retry.pending.limit", Integer.class); /** * Boolean flag to enable or disable position filtering. -- cgit v1.2.3 From 628a80f49f286dcb51034c0e5fbc2f59fe295a08 Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Sun, 1 Dec 2019 16:39:21 -0500 Subject: Fix orthographic typo. --- src/main/java/org/traccar/config/Keys.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 029316142..ec4eb1801 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -103,7 +103,7 @@ public final class Keys { /** * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. * If initial delivery fails, because of an unreachable server or an HTTP response different from '200 OK', - * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsecuent failures, this + * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsequent failures, this * delay is incremented by 1 second up to 'forward.retry.delay.max'. On successful delivery, the delay is reset * to 'forward.retry.delay.min'. Pending positions to be delivered are limited to 'forward.retry.pending.limit'. * If this limit is reached, positions are discarded before next retry. -- cgit v1.2.3 From 7113e04b9a5cabb509ff0219da18098f4b55924f Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Thu, 30 Jan 2020 03:49:55 -0500 Subject: Changes after review --- src/main/java/org/traccar/WebDataHandler.java | 69 +++++++++++++-------------- src/main/java/org/traccar/config/Keys.java | 9 ++-- 2 files changed, 36 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 19c5a58a8..2d6d98f5d 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 - 2019 Anton Tananaev (anton@traccar.org) + * Copyright 2015 - 2020 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,9 @@ package org.traccar; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelHandler; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.database.IdentityManager; @@ -41,6 +44,7 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable @@ -57,12 +61,11 @@ public class WebDataHandler extends BaseDataHandler { private final String header; private final boolean json; - private final Boolean retryEnabled; - private final Integer retryDelayMin; - private final Integer retryDelayMax; - private final Integer deliveryPendingLimit; + private final boolean retryEnabled; + private final int retryDelayMin; + private final int retryDelayMax; + private final int deliveryPendingLimit; - private AtomicInteger deliveryFaliuresInRow; private AtomicInteger deliveryPendingCurrent; @Inject @@ -77,17 +80,10 @@ public class WebDataHandler extends BaseDataHandler { this.json = config.getBoolean(Keys.FORWARD_JSON); this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE); + this.retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1); + this.retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10); + this.deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100); - Integer retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1); - this.retryDelayMin = (retryDelayMin > 0 && retryDelayMin < 3600) ? retryDelayMin : 1; - - Integer retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10); - this.retryDelayMax = (retryDelayMax > retryDelayMin && retryDelayMax < 3600) ? retryDelayMax : retryDelayMin; - - Integer deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100); - this.deliveryPendingLimit = (retryDelayMax > 0) ? deliveryPendingLimit : 100; - - this.deliveryFaliuresInRow = new AtomicInteger(0); this.deliveryPendingCurrent = new AtomicInteger(0); } @@ -183,15 +179,15 @@ public class WebDataHandler extends BaseDataHandler { class AsyncRequestAndCallback implements InvocationCallback { - private Integer delay = retryDelayMin; - private Map jsonPayload; + private int delay = retryDelayMin; + private Map payload; private Invocation.Builder requestBuilder; AsyncRequestAndCallback(Position position) { String formattedUrl; try { - formattedUrl = (json) ? url : formatRequest(position); + formattedUrl = json ? url : formatRequest(position); } catch (UnsupportedEncodingException | JsonProcessingException e) { throw new RuntimeException("Forwarding formatting error", e); } @@ -205,7 +201,7 @@ public class WebDataHandler extends BaseDataHandler { } if (json) { - jsonPayload = prepareJsonPayload(position); + payload = prepareJsonPayload(position); } deliveryPendingCurrent.incrementAndGet(); @@ -215,7 +211,7 @@ public class WebDataHandler extends BaseDataHandler { private void send() { if (json) { - requestBuilder.async().post(Entity.json(jsonPayload), this); + requestBuilder.async().post(Entity.json(payload), this); } else { requestBuilder.async().get(this); } @@ -223,36 +219,35 @@ public class WebDataHandler extends BaseDataHandler { private void retry() { try { - deliveryFaliuresInRow.incrementAndGet(); - if (!retryEnabled || deliveryPendingCurrent.get() > deliveryPendingLimit) { - deliveryPendingCurrent.decrementAndGet(); - } else { - Integer i = 0; - for ( ; i < delay; i++) { - Thread.sleep(1000); - if (deliveryFaliuresInRow.get() == 0) { - delay = retryDelayMin; - break; + if (retryEnabled && deliveryPendingCurrent.get() <= deliveryPendingLimit) { + GlobalTimer.getTimer().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) { + if (!timeout.isCancelled()) { + if (delay < retryDelayMax) { + delay++; + } + send(); + } } - } - if (i >= delay && delay < retryDelayMax) { - delay++; - } - send(); + }, delay, TimeUnit.SECONDS); + return; } } catch (Exception e) { } + deliveryPendingCurrent.decrementAndGet(); } + @Override public void completed(Response response) { if (response.getStatus() == 200) { - deliveryFaliuresInRow.set(0); deliveryPendingCurrent.decrementAndGet(); } else { retry(); } } + @Override public void failed(Throwable throwable) { retry(); } diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index ec4eb1801..999a1d6df 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -104,23 +104,22 @@ public final class Keys { * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. * If initial delivery fails, because of an unreachable server or an HTTP response different from '200 OK', * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsequent failures, this - * delay is incremented by 1 second up to 'forward.retry.delay.max'. On successful delivery, the delay is reset - * to 'forward.retry.delay.min'. Pending positions to be delivered are limited to 'forward.retry.pending.limit'. - * If this limit is reached, positions are discarded before next retry. + * delay is incremented by 1 second up to 'forward.retry.delay.max'. Positions pending to be delivered + * are limited to 'forward.retry.pending.limit'. If this limit is reached, positions get discarded. */ public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey( "forward.retry.enable", Boolean.class); /** * Position forwarding retry minimum delay in seconds. - * Can be set to anything between 1 and 3600 seconds. Defaults to 1 second. + * Can be set to anything greater than 0. Defaults to 1 second. */ public static final ConfigKey FORWARD_RETRY_DELAY_MIN = new ConfigKey( "forward.retry.delay.min", Integer.class); /** * Position forwarding retry maximum delay in seconds. - * Can be set to anything between 1 and 3600 seconds. Defaults to 10 seconds. + * Can be set to anything greater than 0. Defaults to 10 seconds. */ public static final ConfigKey FORWARD_RETRY_DELAY_MAX = new ConfigKey( "forward.retry.delay.max", Integer.class); -- cgit v1.2.3 From 3c71ac47b4515fb799e67a18b18d722bea4d2241 Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Thu, 30 Jan 2020 05:33:24 -0500 Subject: Log position forwarding failures --- src/main/java/org/traccar/WebDataHandler.java | 36 +++++++++++++++++++-------- 1 file changed, 25 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 2d6d98f5d..e2f7c1e8b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -20,6 +20,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelHandler; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.traccar.config.Config; import org.traccar.config.Keys; @@ -53,6 +55,8 @@ public class WebDataHandler extends BaseDataHandler { private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -219,19 +223,29 @@ public class WebDataHandler extends BaseDataHandler { private void retry() { try { - if (retryEnabled && deliveryPendingCurrent.get() <= deliveryPendingLimit) { - GlobalTimer.getTimer().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) { - if (!timeout.isCancelled()) { - if (delay < retryDelayMax) { - delay++; + String message = "Position forwarding failed."; + if (!retryEnabled) { + LOGGER.warn(message); + } else { + int pending = deliveryPendingCurrent.get(); + if (pending <= deliveryPendingLimit) { + LOGGER.warn(message + " Pending: " + pending + + ". Retrying in " + delay + " seconds."); + GlobalTimer.getTimer().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) { + if (!timeout.isCancelled()) { + if (delay < retryDelayMax) { + delay++; + } + send(); } - send(); } - } - }, delay, TimeUnit.SECONDS); - return; + }, delay, TimeUnit.SECONDS); + return; + } + LOGGER.warn(message + " Pending: " + pending + + ". Delivery will not be retried."); } } catch (Exception e) { } -- cgit v1.2.3 From f8c83c4f7f517b7482bdf7f29a3704903b876855 Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Sat, 15 Feb 2020 20:23:08 -0500 Subject: Changes after review --- src/main/java/org/traccar/MainModule.java | 7 +++ src/main/java/org/traccar/WebDataHandler.java | 64 +++++++++++++-------------- src/main/java/org/traccar/config/Keys.java | 32 +++++++------- 3 files changed, 54 insertions(+), 49 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java index 3acd19b6a..24448ef51 100644 --- a/src/main/java/org/traccar/MainModule.java +++ b/src/main/java/org/traccar/MainModule.java @@ -73,6 +73,7 @@ import org.traccar.reports.model.TripsConfig; import javax.annotation.Nullable; import javax.ws.rs.client.Client; +import io.netty.util.Timer; public class MainModule extends AbstractModule { @@ -375,6 +376,12 @@ public class MainModule extends AbstractModule { return new DriverEventHandler(identityManager); } + @Singleton + @Provides + public static Timer provideGlobalTimer() { + return GlobalTimer.getTimer(); + } + @Override protected void configure() { binder().requireExplicitBindings(); diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index e2f7c1e8b..58b9ca73b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -18,6 +18,7 @@ package org.traccar; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelHandler; +import io.netty.util.Timer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import org.slf4j.Logger; @@ -66,11 +67,11 @@ public class WebDataHandler extends BaseDataHandler { private final boolean json; private final boolean retryEnabled; - private final int retryDelayMin; - private final int retryDelayMax; - private final int deliveryPendingLimit; + private final int retryDelay; + private final int retryCount; + private final int retryLimit; - private AtomicInteger deliveryPendingCurrent; + private AtomicInteger deliveryPending; @Inject public WebDataHandler( @@ -84,11 +85,11 @@ public class WebDataHandler extends BaseDataHandler { this.json = config.getBoolean(Keys.FORWARD_JSON); this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE); - this.retryDelayMin = config.getInteger(Keys.FORWARD_RETRY_DELAY_MIN, 1); - this.retryDelayMax = config.getInteger(Keys.FORWARD_RETRY_DELAY_MAX, 10); - this.deliveryPendingLimit = config.getInteger(Keys.FORWARD_RETRY_PENDING_LIMIT, 100); + this.retryDelay = config.getInteger(Keys.FORWARD_RETRY_DELAY, 100); + this.retryCount = config.getInteger(Keys.FORWARD_RETRY_COUNT, 10); + this.retryLimit = config.getInteger(Keys.FORWARD_RETRY_LIMIT, 100); - this.deliveryPendingCurrent = new AtomicInteger(0); + this.deliveryPending = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -183,7 +184,7 @@ public class WebDataHandler extends BaseDataHandler { class AsyncRequestAndCallback implements InvocationCallback { - private int delay = retryDelayMin; + private int retries = 0; private Map payload; private Invocation.Builder requestBuilder; @@ -208,7 +209,7 @@ public class WebDataHandler extends BaseDataHandler { payload = prepareJsonPayload(position); } - deliveryPendingCurrent.incrementAndGet(); + deliveryPending.incrementAndGet(); send(); } @@ -222,40 +223,37 @@ public class WebDataHandler extends BaseDataHandler { } private void retry() { + boolean ok = false; try { - String message = "Position forwarding failed."; - if (!retryEnabled) { - LOGGER.warn(message); - } else { - int pending = deliveryPendingCurrent.get(); - if (pending <= deliveryPendingLimit) { - LOGGER.warn(message + " Pending: " + pending - + ". Retrying in " + delay + " seconds."); - GlobalTimer.getTimer().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) { + if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { + Main.getInjector().getInstance(Timer.class).newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) { + boolean ok = false; + try { if (!timeout.isCancelled()) { - if (delay < retryDelayMax) { - delay++; - } send(); + ok = true; + } + } finally { + if (!ok) { + deliveryPending.decrementAndGet(); } } - }, delay, TimeUnit.SECONDS); - return; - } - LOGGER.warn(message + " Pending: " + pending - + ". Delivery will not be retried."); + } + }, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); + ok = true; } - } catch (Exception e) { + } finally { + int pending = ok ? deliveryPending.get() : deliveryPending.decrementAndGet(); + LOGGER.warn("Position forwarding failed: " + pending + " pending"); } - deliveryPendingCurrent.decrementAndGet(); } @Override public void completed(Response response) { - if (response.getStatus() == 200) { - deliveryPendingCurrent.decrementAndGet(); + if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + deliveryPending.decrementAndGet(); } else { retry(); } diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 999a1d6df..d88b36d28 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -101,35 +101,35 @@ public final class Keys { "forward.json", Boolean.class); /** - * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. - * If initial delivery fails, because of an unreachable server or an HTTP response different from '200 OK', - * the software waits for 'forward.retry.delay.min' seconds to retry delivery. On subsequent failures, this - * delay is incremented by 1 second up to 'forward.retry.delay.max'. Positions pending to be delivered - * are limited to 'forward.retry.pending.limit'. If this limit is reached, positions get discarded. + * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. If initial + * delivery fails, because of an unreachable server or an HTTP response different from '2xx', the software waits + * for 'forward.retry.delay' milliseconds to retry delivery. On subsequent failures, this delay is duplicated. + * If forwarding is retried for 'forward.retry.count', retrying is canceled and the position is dropped. Positions + * pending to be delivered are limited to 'forward.retry.limit'. If this limit is reached, positions get discarded. */ public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey( "forward.retry.enable", Boolean.class); /** - * Position forwarding retry minimum delay in seconds. - * Can be set to anything greater than 0. Defaults to 1 second. + * Position forwarding retry first delay in milliseconds. + * Can be set to anything greater than 0. Defaults to 100 milliseconds. */ - public static final ConfigKey FORWARD_RETRY_DELAY_MIN = new ConfigKey( - "forward.retry.delay.min", Integer.class); + public static final ConfigKey FORWARD_RETRY_DELAY = new ConfigKey( + "forward.retry.delay", Integer.class); /** - * Position forwarding retry maximum delay in seconds. - * Can be set to anything greater than 0. Defaults to 10 seconds. + * Position forwarding retry maximum retries. + * Can be set to anything greater than 0. Defaults to 10 retries. */ - public static final ConfigKey FORWARD_RETRY_DELAY_MAX = new ConfigKey( - "forward.retry.delay.max", Integer.class); + public static final ConfigKey FORWARD_RETRY_COUNT = new ConfigKey( + "forward.retry.count", Integer.class); /** - * Position forwarding retry pending limit. + * Position forwarding retry pending positions limit. * Can be set to anything greater than 0. Defaults to 100 positions. */ - public static final ConfigKey FORWARD_RETRY_PENDING_LIMIT = new ConfigKey( - "forward.retry.pending.limit", Integer.class); + public static final ConfigKey FORWARD_RETRY_LIMIT = new ConfigKey( + "forward.retry.limit", Integer.class); /** * Boolean flag to enable or disable position filtering. -- cgit v1.2.3 From 1049b01a22e28acb69ea530de824608b72c8222c Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Tue, 25 Feb 2020 18:27:26 -0500 Subject: Changes after review --- src/main/java/org/traccar/MainModule.java | 2 +- src/main/java/org/traccar/WebDataHandler.java | 142 +++++++++++++------------- 2 files changed, 74 insertions(+), 70 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java index 24448ef51..1d5508f5a 100644 --- a/src/main/java/org/traccar/MainModule.java +++ b/src/main/java/org/traccar/MainModule.java @@ -378,7 +378,7 @@ public class MainModule extends AbstractModule { @Singleton @Provides - public static Timer provideGlobalTimer() { + public static Timer provideTimer() { return GlobalTimer.getTimer(); } diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 58b9ca73b..39e54616b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -53,11 +53,11 @@ import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; - private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); - private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -179,94 +179,98 @@ public class WebDataHandler extends BaseDataHandler { return request; } - @Override - protected Position handlePosition(Position position) { + class AsyncRequestAndCallback implements InvocationCallback, TimerTask { - class AsyncRequestAndCallback implements InvocationCallback { + private int retries = 0; + private Map payload; + private Invocation.Builder requestBuilder; - private int retries = 0; - private Map payload; - private Invocation.Builder requestBuilder; + AsyncRequestAndCallback(Position position) { - AsyncRequestAndCallback(Position position) { - - String formattedUrl; - try { - formattedUrl = json ? url : formatRequest(position); - } catch (UnsupportedEncodingException | JsonProcessingException e) { - throw new RuntimeException("Forwarding formatting error", e); - } + String formattedUrl; + try { + formattedUrl = json ? url : formatRequest(position); + } catch (UnsupportedEncodingException | JsonProcessingException e) { + throw new RuntimeException("Forwarding formatting error", e); + } - requestBuilder = client.target(formattedUrl).request(); - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); - } + requestBuilder = client.target(formattedUrl).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); } + } - if (json) { - payload = prepareJsonPayload(position); - } + if (json) { + payload = prepareJsonPayload(position); + } - deliveryPending.incrementAndGet(); + deliveryPending.incrementAndGet(); + } - send(); + private void send() { + if (json) { + requestBuilder.async().post(Entity.json(payload), this); + } else { + requestBuilder.async().get(this); } + } - private void send() { - if (json) { - requestBuilder.async().post(Entity.json(payload), this); - } else { - requestBuilder.async().get(this); + private void retry() { + boolean scheduled = false; + try { + if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { + schedule(); + scheduled = true; } + } finally { + int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet(); + LOGGER.warn("Position forwarding failed: " + pending + " pending"); } + } - private void retry() { - boolean ok = false; - try { - if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) { - Main.getInjector().getInstance(Timer.class).newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) { - boolean ok = false; - try { - if (!timeout.isCancelled()) { - send(); - ok = true; - } - } finally { - if (!ok) { - deliveryPending.decrementAndGet(); - } - } - } - }, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); - ok = true; - } - } finally { - int pending = ok ? deliveryPending.get() : deliveryPending.decrementAndGet(); - LOGGER.warn("Position forwarding failed: " + pending + " pending"); - } + private void schedule() { + Main.getInjector().getInstance(Timer.class).newTimeout( + this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS); + } + + @Override + public void completed(Response response) { + if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + deliveryPending.decrementAndGet(); + } else { + retry(); } + } + + @Override + public void failed(Throwable throwable) { + retry(); + } - @Override - public void completed(Response response) { - if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + @Override + public void run(Timeout timeout) { + boolean sent = false; + try { + if (!timeout.isCancelled()) { + send(); + sent = true; + } + } finally { + if (!sent) { deliveryPending.decrementAndGet(); - } else { - retry(); } } + } - @Override - public void failed(Throwable throwable) { - retry(); - } + } - } + @Override + protected Position handlePosition(Position position) { - new AsyncRequestAndCallback(position); + AsyncRequestAndCallback request = new AsyncRequestAndCallback(position); + request.send(); return position; } -- cgit v1.2.3