From 72e8ab6966cb70dd58dcdf671dab11a7673aeabf Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Mon, 26 Aug 2019 06:08:50 -0400 Subject: Add a ring buffer and response check --- src/main/java/org/traccar/WebDataHandler.java | 89 +++++++++++++++++++++++---- src/main/java/org/traccar/config/Keys.java | 7 +++ 2 files changed, 85 insertions(+), 11 deletions(-) (limited to 'src/main/java/org') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 64396de03..193efd230 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -27,9 +27,11 @@ import org.traccar.model.Position; import org.traccar.model.Group; import javax.inject.Inject; +import javax.ws.rs.core.Response; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.InvocationCallback; import java.util.HashMap; import java.util.Map; import java.io.UnsupportedEncodingException; @@ -39,6 +41,14 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.collections.Buffer; +import org.apache.commons.collections.BufferUtils; +import org.apache.commons.collections.buffer.CircularFifoBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ChannelHandler.Sharable public class WebDataHandler extends BaseDataHandler { @@ -46,6 +56,8 @@ public class WebDataHandler extends BaseDataHandler { private static final String KEY_POSITION = "position"; private static final String KEY_DEVICE = "device"; + private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class); + private final IdentityManager identityManager; private final ObjectMapper objectMapper; private final Client client; @@ -54,6 +66,13 @@ public class WebDataHandler extends BaseDataHandler { private final String header; private final boolean json; + private final Integer queueSize; + + private Buffer positionsQueue = null; + private AtomicBoolean sendInProgress; + + private Invocation.Builder requestBuilder; + @Inject public WebDataHandler( Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) { @@ -63,6 +82,12 @@ public class WebDataHandler extends BaseDataHandler { this.url = config.getString(Keys.FORWARD_URL); this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); + Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0)); + this.queueSize = (queueSize > 0) ? queueSize : 0; + if (this.queueSize > 0) { + this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize)); + } + this.sendInProgress = new AtomicBoolean(false); } private static String formatSentence(Position position) { @@ -152,9 +177,7 @@ public class WebDataHandler extends BaseDataHandler { return request; } - @Override - protected Position handlePosition(Position position) { - + protected void sendPosition(Position position) { String url; if (json) { url = this.url; @@ -166,19 +189,63 @@ public class WebDataHandler extends BaseDataHandler { } } - Invocation.Builder requestBuilder = client.target(url).request(); - - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); + if (!json || requestBuilder == null) { + requestBuilder = client.target(url).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); + } } } + InvocationCallback callback = new InvocationCallback() { + public void completed(Response response) { + if (positionsQueue != null) { + if (response.getStatus() == 200) { + positionsQueue.remove(); + } + logQueueStatus(); + sendInProgress.set(false); + sendQueuedPositions(); + } + } + public void failed(Throwable throwable) { + if (positionsQueue != null) { + logQueueStatus(); + sendInProgress.set(false); + sendQueuedPositions(); + } + } + }; + if (json) { - requestBuilder.async().post(Entity.json(prepareJsonPayload(position))); + requestBuilder.async().post(Entity.json(prepareJsonPayload(position)), callback); + } else { + requestBuilder.async().get(callback); + } + } + + protected void logQueueStatus() { + LOGGER.info(String.format("Position forwarding queue: %d/%d", + positionsQueue.size(), queueSize)); + } + + protected void sendQueuedPositions() { + if (!positionsQueue.isEmpty() && !sendInProgress.get()) { + sendInProgress.set(true); + sendPosition((Position) positionsQueue.get()); + } + } + + @Override + protected Position handlePosition(Position position) { + + if (positionsQueue == null) { + sendPosition(position); } else { - requestBuilder.async().get(); + positionsQueue.add(position); + sendQueuedPositions(); } return position; diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 2c5dcefd5..10eda01e2 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -100,6 +100,13 @@ public final class Keys { public static final ConfigKey FORWARD_JSON = new ConfigKey( "forward.json", Boolean.class); + /** + * Position forwarding queue size. A ring buffer of the specified size is used to queue positions. + * If negative, zero, or not specified, queueing is disabled. (Legacy behaviour) + */ + public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey( + "forward.queue.size", Integer.class); + /** * Boolean flag to enable or disable position filtering. */ -- cgit v1.2.3