aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java141
-rw-r--r--src/main/java/org/traccar/config/Keys.java7
2 files changed, 93 insertions, 55 deletions
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index 193efd230..dfd6f0e5b 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -41,7 +41,7 @@ import java.util.Calendar;
import java.util.Formatter;
import java.util.Locale;
import java.util.TimeZone;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferUtils;
@@ -67,27 +67,33 @@ public class WebDataHandler extends BaseDataHandler {
private final boolean json;
private final Integer queueSize;
+ private final Integer queueConcurrency;
private Buffer positionsQueue = null;
- private AtomicBoolean sendInProgress;
-
- private Invocation.Builder requestBuilder;
+ private AtomicInteger pendingRequests;
@Inject
public WebDataHandler(
Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) {
+
this.identityManager = identityManager;
this.objectMapper = objectMapper;
this.client = client;
this.url = config.getString(Keys.FORWARD_URL);
this.header = config.getString(Keys.FORWARD_HEADER);
this.json = config.getBoolean(Keys.FORWARD_JSON);
+
Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0));
this.queueSize = (queueSize > 0) ? queueSize : 0;
+
+ Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1));
+ this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1;
+
if (this.queueSize > 0) {
this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize));
}
- this.sendInProgress = new AtomicBoolean(false);
+
+ this.pendingRequests = new AtomicInteger(0);
}
private static String formatSentence(Position position) {
@@ -177,80 +183,105 @@ public class WebDataHandler extends BaseDataHandler {
return request;
}
- protected void sendPosition(Position position) {
- String url;
- if (json) {
- url = this.url;
- } else {
- try {
- url = formatRequest(position);
- } catch (UnsupportedEncodingException | JsonProcessingException e) {
- throw new RuntimeException("Forwarding formatting error", e);
+ @Override
+ protected Position handlePosition(Position position) {
+
+ class AsyncRequestAndCallback implements InvocationCallback<Response> {
+
+ private Map<String, Object> jsonPayload;
+ private Invocation.Builder requestBuilder;
+
+ AsyncRequestAndCallback(Position position) {
+
+ String formattedUrl;
+ try {
+ formattedUrl = (json) ? url : formatRequest(position);
+ } catch (UnsupportedEncodingException | JsonProcessingException e) {
+ throw new RuntimeException("Forwarding formatting error", e);
+ }
+
+ requestBuilder = client.target(formattedUrl).request();
+ if (header != null && !header.isEmpty()) {
+ for (String line: header.split("\\r?\\n")) {
+ String[] values = line.split(":", 2);
+ requestBuilder.header(values[0].trim(), values[1].trim());
+ }
+ }
+
+ if (json) {
+ jsonPayload = prepareJsonPayload(position);
+ }
+
+ pendingRequests.incrementAndGet();
+
+ send();
}
- }
- if (!json || requestBuilder == null) {
- requestBuilder = client.target(url).request();
- if (header != null && !header.isEmpty()) {
- for (String line: header.split("\\r?\\n")) {
- String[] values = line.split(":", 2);
- requestBuilder.header(values[0].trim(), values[1].trim());
+ private void send() {
+ if (json) {
+ requestBuilder.async().post(Entity.json(jsonPayload), this);
+ } else {
+ requestBuilder.async().get(this);
+ }
+ }
+
+ private void retry() {
+ try {
+ Thread.sleep(1000);
+ send();
+ } catch (Exception e) {
+ }
+ }
+
+ private void next() {
+ if (!positionsQueue.isEmpty()) {
+ new AsyncRequestAndCallback((Position) positionsQueue.remove());
}
}
- }
- InvocationCallback<Response> callback = new InvocationCallback<Response>() {
public void completed(Response response) {
if (positionsQueue != null) {
- if (response.getStatus() == 200) {
- positionsQueue.remove();
+ boolean ok = (response.getStatus() == 200);
+ boolean retry = (positionsQueue.size() < queueSize);
+ if (!ok && retry) {
+ retry();
+ } else {
+ pendingRequests.decrementAndGet();
+ while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) {
+ next();
+ }
}
- logQueueStatus();
- sendInProgress.set(false);
- sendQueuedPositions();
}
}
+
public void failed(Throwable throwable) {
if (positionsQueue != null) {
- logQueueStatus();
- sendInProgress.set(false);
- sendQueuedPositions();
+ if (positionsQueue.size() < queueSize) {
+ retry();
+ } else if (pendingRequests.decrementAndGet() == 0) {
+ next();
+ }
}
}
- };
-
- if (json) {
- requestBuilder.async().post(Entity.json(prepareJsonPayload(position)), callback);
- } else {
- requestBuilder.async().get(callback);
- }
- }
-
- protected void logQueueStatus() {
- LOGGER.info(String.format("Position forwarding queue: %d/%d",
- positionsQueue.size(), queueSize));
- }
- protected void sendQueuedPositions() {
- if (!positionsQueue.isEmpty() && !sendInProgress.get()) {
- sendInProgress.set(true);
- sendPosition((Position) positionsQueue.get());
}
- }
- @Override
- protected Position handlePosition(Position position) {
-
- if (positionsQueue == null) {
- sendPosition(position);
+ if (positionsQueue == null || pendingRequests.get() == 0) {
+ new AsyncRequestAndCallback(position);
} else {
positionsQueue.add(position);
- sendQueuedPositions();
}
+ logQueueStatus();
+
return position;
}
+ private void logQueueStatus() {
+ LOGGER.info(String.format("Position forwarding queue: %d/%d/%d",
+ pendingRequests.get(), positionsQueue.size(), queueSize));
+ }
+
private Map<String, Object> prepareJsonPayload(Position position) {
Map<String, Object> data = new HashMap<>();
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 10eda01e2..6102f7604 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -108,6 +108,13 @@ public final class Keys {
"forward.queue.size", Integer.class);
/**
+ * Position forwarding queue concurrency. Defines how many HTTP requests can be pending at any time.
+ * If queueing is enabled, and this value is less than one, it's ignored and forced to be one.
+ */
+ public static final ConfigKey FORWARD_QUEUE_CONCURRENCY = new ConfigKey(
+ "forward.queue.concurrency", Integer.class);
+
+ /**
* Boolean flag to enable or disable position filtering.
*/
public static final ConfigKey FILTER_ENABLE = new ConfigKey(