aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/traccar/MainModule.java2
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java142
2 files changed, 74 insertions, 70 deletions
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java
index 24448ef51..1d5508f5a 100644
--- a/src/main/java/org/traccar/MainModule.java
+++ b/src/main/java/org/traccar/MainModule.java
@@ -378,7 +378,7 @@ public class MainModule extends AbstractModule {
@Singleton
@Provides
- public static Timer provideGlobalTimer() {
+ public static Timer provideTimer() {
return GlobalTimer.getTimer();
}
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index 58b9ca73b..39e54616b 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -53,11 +53,11 @@ import java.util.concurrent.atomic.AtomicInteger;
@ChannelHandler.Sharable
public class WebDataHandler extends BaseDataHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
+
private static final String KEY_POSITION = "position";
private static final String KEY_DEVICE = "device";
- private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
-
private final IdentityManager identityManager;
private final ObjectMapper objectMapper;
private final Client client;
@@ -179,94 +179,98 @@ public class WebDataHandler extends BaseDataHandler {
return request;
}
- @Override
- protected Position handlePosition(Position position) {
+ class AsyncRequestAndCallback implements InvocationCallback<Response>, TimerTask {
- class AsyncRequestAndCallback implements InvocationCallback<Response> {
+ private int retries = 0;
+ private Map<String, Object> payload;
+ private Invocation.Builder requestBuilder;
- private int retries = 0;
- private Map<String, Object> payload;
- private Invocation.Builder requestBuilder;
+ AsyncRequestAndCallback(Position position) {
- AsyncRequestAndCallback(Position position) {
-
- String formattedUrl;
- try {
- formattedUrl = json ? url : formatRequest(position);
- } catch (UnsupportedEncodingException | JsonProcessingException e) {
- throw new RuntimeException("Forwarding formatting error", e);
- }
+ String formattedUrl;
+ try {
+ formattedUrl = json ? url : formatRequest(position);
+ } catch (UnsupportedEncodingException | JsonProcessingException e) {
+ throw new RuntimeException("Forwarding formatting error", e);
+ }
- requestBuilder = client.target(formattedUrl).request();
- if (header != null && !header.isEmpty()) {
- for (String line: header.split("\\r?\\n")) {
- String[] values = line.split(":", 2);
- requestBuilder.header(values[0].trim(), values[1].trim());
- }
+ requestBuilder = client.target(formattedUrl).request();
+ if (header != null && !header.isEmpty()) {
+ for (String line: header.split("\\r?\\n")) {
+ String[] values = line.split(":", 2);
+ requestBuilder.header(values[0].trim(), values[1].trim());
}
+ }
- if (json) {
- payload = prepareJsonPayload(position);
- }
+ if (json) {
+ payload = prepareJsonPayload(position);
+ }
- deliveryPending.incrementAndGet();
+ deliveryPending.incrementAndGet();
+ }
- send();
+ private void send() {
+ if (json) {
+ requestBuilder.async().post(Entity.json(payload), this);
+ } else {
+ requestBuilder.async().get(this);
}
+ }
- private void send() {
- if (json) {
- requestBuilder.async().post(Entity.json(payload), this);
- } else {
- requestBuilder.async().get(this);
+ private void retry() {
+ boolean scheduled = false;
+ try {
+ if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) {
+ schedule();
+ scheduled = true;
}
+ } finally {
+ int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet();
+ LOGGER.warn("Position forwarding failed: " + pending + " pending");
}
+ }
- private void retry() {
- boolean ok = false;
- try {
- if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) {
- Main.getInjector().getInstance(Timer.class).newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) {
- boolean ok = false;
- try {
- if (!timeout.isCancelled()) {
- send();
- ok = true;
- }
- } finally {
- if (!ok) {
- deliveryPending.decrementAndGet();
- }
- }
- }
- }, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS);
- ok = true;
- }
- } finally {
- int pending = ok ? deliveryPending.get() : deliveryPending.decrementAndGet();
- LOGGER.warn("Position forwarding failed: " + pending + " pending");
- }
+ private void schedule() {
+ Main.getInjector().getInstance(Timer.class).newTimeout(
+ this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void completed(Response response) {
+ if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) {
+ deliveryPending.decrementAndGet();
+ } else {
+ retry();
}
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ retry();
+ }
- @Override
- public void completed(Response response) {
- if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) {
+ @Override
+ public void run(Timeout timeout) {
+ boolean sent = false;
+ try {
+ if (!timeout.isCancelled()) {
+ send();
+ sent = true;
+ }
+ } finally {
+ if (!sent) {
deliveryPending.decrementAndGet();
- } else {
- retry();
}
}
+ }
- @Override
- public void failed(Throwable throwable) {
- retry();
- }
+ }
- }
+ @Override
+ protected Position handlePosition(Position position) {
- new AsyncRequestAndCallback(position);
+ AsyncRequestAndCallback request = new AsyncRequestAndCallback(position);
+ request.send();
return position;
}