aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2020-02-25 17:40:26 -0800
committerGitHub <noreply@github.com>2020-02-25 17:40:26 -0800
commit9662f1d3b813bbdead7613f6919e23b6fe76c6db (patch)
tree577ac39d17687a07531194e40bbbc23764a98d1a
parent7db2926c38edafb4cbd9b2cda1a98b5567836bc1 (diff)
parent1049b01a22e28acb69ea530de824608b72c8222c (diff)
downloadtraccar-server-9662f1d3b813bbdead7613f6919e23b6fe76c6db.tar.gz
traccar-server-9662f1d3b813bbdead7613f6919e23b6fe76c6db.tar.bz2
traccar-server-9662f1d3b813bbdead7613f6919e23b6fe76c6db.zip
Merge pull request #4389 from edvalley/proposed
Add an optional queue to web data handler
-rw-r--r--src/main/java/org/traccar/MainModule.java7
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java125
-rw-r--r--src/main/java/org/traccar/config/Keys.java31
3 files changed, 146 insertions, 17 deletions
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java
index 0957d9fe3..c3c0182d7 100644
--- a/src/main/java/org/traccar/MainModule.java
+++ b/src/main/java/org/traccar/MainModule.java
@@ -73,6 +73,7 @@ import org.traccar.reports.model.TripsConfig;
import javax.annotation.Nullable;
import javax.ws.rs.client.Client;
+import io.netty.util.Timer;
public class MainModule extends AbstractModule {
@@ -375,6 +376,12 @@ public class MainModule extends AbstractModule {
return new DriverEventHandler(identityManager);
}
+ @Singleton
+ @Provides
+ public static Timer provideTimer() {
+ return GlobalTimer.getTimer();
+ }
+
@Override
protected void configure() {
binder().requireExplicitBindings();
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index 64396de03..39e54616b 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015 - 2019 Anton Tananaev (anton@traccar.org)
+ * Copyright 2015 - 2020 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,12 @@ package org.traccar;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelHandler;
+import io.netty.util.Timer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.IdentityManager;
@@ -27,9 +33,11 @@ import org.traccar.model.Position;
import org.traccar.model.Group;
import javax.inject.Inject;
+import javax.ws.rs.core.Response;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.InvocationCallback;
import java.util.HashMap;
import java.util.Map;
import java.io.UnsupportedEncodingException;
@@ -39,10 +47,14 @@ import java.util.Calendar;
import java.util.Formatter;
import java.util.Locale;
import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
@ChannelHandler.Sharable
public class WebDataHandler extends BaseDataHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
+
private static final String KEY_POSITION = "position";
private static final String KEY_DEVICE = "device";
@@ -54,15 +66,30 @@ public class WebDataHandler extends BaseDataHandler {
private final String header;
private final boolean json;
+ private final boolean retryEnabled;
+ private final int retryDelay;
+ private final int retryCount;
+ private final int retryLimit;
+
+ private AtomicInteger deliveryPending;
+
@Inject
public WebDataHandler(
Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) {
+
this.identityManager = identityManager;
this.objectMapper = objectMapper;
this.client = client;
this.url = config.getString(Keys.FORWARD_URL);
this.header = config.getString(Keys.FORWARD_HEADER);
this.json = config.getBoolean(Keys.FORWARD_JSON);
+
+ this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE);
+ this.retryDelay = config.getInteger(Keys.FORWARD_RETRY_DELAY, 100);
+ this.retryCount = config.getInteger(Keys.FORWARD_RETRY_COUNT, 10);
+ this.retryLimit = config.getInteger(Keys.FORWARD_RETRY_LIMIT, 100);
+
+ this.deliveryPending = new AtomicInteger(0);
}
private static String formatSentence(Position position) {
@@ -152,35 +179,99 @@ public class WebDataHandler extends BaseDataHandler {
return request;
}
- @Override
- protected Position handlePosition(Position position) {
+ class AsyncRequestAndCallback implements InvocationCallback<Response>, TimerTask {
- String url;
- if (json) {
- url = this.url;
- } else {
+ private int retries = 0;
+ private Map<String, Object> payload;
+ private Invocation.Builder requestBuilder;
+
+ AsyncRequestAndCallback(Position position) {
+
+ String formattedUrl;
try {
- url = formatRequest(position);
+ formattedUrl = json ? url : formatRequest(position);
} catch (UnsupportedEncodingException | JsonProcessingException e) {
throw new RuntimeException("Forwarding formatting error", e);
}
+
+ requestBuilder = client.target(formattedUrl).request();
+ if (header != null && !header.isEmpty()) {
+ for (String line: header.split("\\r?\\n")) {
+ String[] values = line.split(":", 2);
+ requestBuilder.header(values[0].trim(), values[1].trim());
+ }
+ }
+
+ if (json) {
+ payload = prepareJsonPayload(position);
+ }
+
+ deliveryPending.incrementAndGet();
+ }
+
+ private void send() {
+ if (json) {
+ requestBuilder.async().post(Entity.json(payload), this);
+ } else {
+ requestBuilder.async().get(this);
+ }
+ }
+
+ private void retry() {
+ boolean scheduled = false;
+ try {
+ if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) {
+ schedule();
+ scheduled = true;
+ }
+ } finally {
+ int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet();
+ LOGGER.warn("Position forwarding failed: " + pending + " pending");
+ }
}
- Invocation.Builder requestBuilder = client.target(url).request();
+ private void schedule() {
+ Main.getInjector().getInstance(Timer.class).newTimeout(
+ this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS);
+ }
- if (header != null && !header.isEmpty()) {
- for (String line: header.split("\\r?\\n")) {
- String[] values = line.split(":", 2);
- requestBuilder.header(values[0].trim(), values[1].trim());
+ @Override
+ public void completed(Response response) {
+ if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) {
+ deliveryPending.decrementAndGet();
+ } else {
+ retry();
}
}
- if (json) {
- requestBuilder.async().post(Entity.json(prepareJsonPayload(position)));
- } else {
- requestBuilder.async().get();
+ @Override
+ public void failed(Throwable throwable) {
+ retry();
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ boolean sent = false;
+ try {
+ if (!timeout.isCancelled()) {
+ send();
+ sent = true;
+ }
+ } finally {
+ if (!sent) {
+ deliveryPending.decrementAndGet();
+ }
+ }
}
+ }
+
+ @Override
+ protected Position handlePosition(Position position) {
+
+ AsyncRequestAndCallback request = new AsyncRequestAndCallback(position);
+ request.send();
+
return position;
}
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 2c5dcefd5..d88b36d28 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -101,6 +101,37 @@ public final class Keys {
"forward.json", Boolean.class);
/**
+ * Position forwarding retrying enable. When enabled, additional attempts are made to deliver positions. If initial
+ * delivery fails, because of an unreachable server or an HTTP response different from '2xx', the software waits
+ * for 'forward.retry.delay' milliseconds to retry delivery. On subsequent failures, this delay is duplicated.
+ * If forwarding is retried for 'forward.retry.count', retrying is canceled and the position is dropped. Positions
+ * pending to be delivered are limited to 'forward.retry.limit'. If this limit is reached, positions get discarded.
+ */
+ public static final ConfigKey FORWARD_RETRY_ENABLE = new ConfigKey(
+ "forward.retry.enable", Boolean.class);
+
+ /**
+ * Position forwarding retry first delay in milliseconds.
+ * Can be set to anything greater than 0. Defaults to 100 milliseconds.
+ */
+ public static final ConfigKey FORWARD_RETRY_DELAY = new ConfigKey(
+ "forward.retry.delay", Integer.class);
+
+ /**
+ * Position forwarding retry maximum retries.
+ * Can be set to anything greater than 0. Defaults to 10 retries.
+ */
+ public static final ConfigKey FORWARD_RETRY_COUNT = new ConfigKey(
+ "forward.retry.count", Integer.class);
+
+ /**
+ * Position forwarding retry pending positions limit.
+ * Can be set to anything greater than 0. Defaults to 100 positions.
+ */
+ public static final ConfigKey FORWARD_RETRY_LIMIT = new ConfigKey(
+ "forward.retry.limit", Integer.class);
+
+ /**
* Boolean flag to enable or disable position filtering.
*/
public static final ConfigKey FILTER_ENABLE = new ConfigKey(