aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java89
-rw-r--r--src/main/java/org/traccar/config/Keys.java7
2 files changed, 85 insertions, 11 deletions
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index 64396de03..193efd230 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -27,9 +27,11 @@ import org.traccar.model.Position;
import org.traccar.model.Group;
import javax.inject.Inject;
+import javax.ws.rs.core.Response;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.InvocationCallback;
import java.util.HashMap;
import java.util.Map;
import java.io.UnsupportedEncodingException;
@@ -39,6 +41,14 @@ import java.util.Calendar;
import java.util.Formatter;
import java.util.Locale;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public class WebDataHandler extends BaseDataHandler {
@@ -46,6 +56,8 @@ public class WebDataHandler extends BaseDataHandler {
private static final String KEY_POSITION = "position";
private static final String KEY_DEVICE = "device";
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
+
private final IdentityManager identityManager;
private final ObjectMapper objectMapper;
private final Client client;
@@ -54,6 +66,13 @@ public class WebDataHandler extends BaseDataHandler {
private final String header;
private final boolean json;
+ private final Integer queueSize;
+
+ private Buffer positionsQueue = null;
+ private AtomicBoolean sendInProgress;
+
+ private Invocation.Builder requestBuilder;
+
@Inject
public WebDataHandler(
Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) {
@@ -63,6 +82,12 @@ public class WebDataHandler extends BaseDataHandler {
this.url = config.getString(Keys.FORWARD_URL);
this.header = config.getString(Keys.FORWARD_HEADER);
this.json = config.getBoolean(Keys.FORWARD_JSON);
+ Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0));
+ this.queueSize = (queueSize > 0) ? queueSize : 0;
+ if (this.queueSize > 0) {
+ this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize));
+ }
+ this.sendInProgress = new AtomicBoolean(false);
}
private static String formatSentence(Position position) {
@@ -152,9 +177,7 @@ public class WebDataHandler extends BaseDataHandler {
return request;
}
- @Override
- protected Position handlePosition(Position position) {
-
+ protected void sendPosition(Position position) {
String url;
if (json) {
url = this.url;
@@ -166,19 +189,63 @@ public class WebDataHandler extends BaseDataHandler {
}
}
- Invocation.Builder requestBuilder = client.target(url).request();
-
- if (header != null && !header.isEmpty()) {
- for (String line: header.split("\\r?\\n")) {
- String[] values = line.split(":", 2);
- requestBuilder.header(values[0].trim(), values[1].trim());
+ if (!json || requestBuilder == null) {
+ requestBuilder = client.target(url).request();
+ if (header != null && !header.isEmpty()) {
+ for (String line: header.split("\\r?\\n")) {
+ String[] values = line.split(":", 2);
+ requestBuilder.header(values[0].trim(), values[1].trim());
+ }
}
}
+ InvocationCallback<Response> callback = new InvocationCallback<Response>() {
+ public void completed(Response response) {
+ if (positionsQueue != null) {
+ if (response.getStatus() == 200) {
+ positionsQueue.remove();
+ }
+ logQueueStatus();
+ sendInProgress.set(false);
+ sendQueuedPositions();
+ }
+ }
+ public void failed(Throwable throwable) {
+ if (positionsQueue != null) {
+ logQueueStatus();
+ sendInProgress.set(false);
+ sendQueuedPositions();
+ }
+ }
+ };
+
if (json) {
- requestBuilder.async().post(Entity.json(prepareJsonPayload(position)));
+ requestBuilder.async().post(Entity.json(prepareJsonPayload(position)), callback);
+ } else {
+ requestBuilder.async().get(callback);
+ }
+ }
+
+ protected void logQueueStatus() {
+ LOGGER.info(String.format("Position forwarding queue: %d/%d",
+ positionsQueue.size(), queueSize));
+ }
+
+ protected void sendQueuedPositions() {
+ if (!positionsQueue.isEmpty() && !sendInProgress.get()) {
+ sendInProgress.set(true);
+ sendPosition((Position) positionsQueue.get());
+ }
+ }
+
+ @Override
+ protected Position handlePosition(Position position) {
+
+ if (positionsQueue == null) {
+ sendPosition(position);
} else {
- requestBuilder.async().get();
+ positionsQueue.add(position);
+ sendQueuedPositions();
}
return position;
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 2c5dcefd5..10eda01e2 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -101,6 +101,13 @@ public final class Keys {
"forward.json", Boolean.class);
/**
+ * Position forwarding queue size. A ring buffer of the specified size is used to queue positions.
+ * If negative, zero, or not specified, queueing is disabled. (Legacy behaviour)
+ */
+ public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey(
+ "forward.queue.size", Integer.class);
+
+ /**
* Boolean flag to enable or disable position filtering.
*/
public static final ConfigKey FILTER_ENABLE = new ConfigKey(