aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/WebDataHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/traccar/WebDataHandler.java')
-rw-r--r--src/main/java/org/traccar/WebDataHandler.java141
1 files changed, 124 insertions, 17 deletions
diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java
index 64396de03..d6bfb126b 100644
--- a/src/main/java/org/traccar/WebDataHandler.java
+++ b/src/main/java/org/traccar/WebDataHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015 - 2019 Anton Tananaev (anton@traccar.org)
+ * Copyright 2015 - 2020 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,12 @@ package org.traccar;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelHandler;
+import io.netty.util.Timer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.IdentityManager;
@@ -27,9 +33,13 @@ import org.traccar.model.Position;
import org.traccar.model.Group;
import javax.inject.Inject;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.InvocationCallback;
import java.util.HashMap;
import java.util.Map;
import java.io.UnsupportedEncodingException;
@@ -39,10 +49,14 @@ import java.util.Calendar;
import java.util.Formatter;
import java.util.Locale;
import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
@ChannelHandler.Sharable
public class WebDataHandler extends BaseDataHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebDataHandler.class);
+
private static final String KEY_POSITION = "position";
private static final String KEY_DEVICE = "device";
@@ -53,16 +67,33 @@ public class WebDataHandler extends BaseDataHandler {
private final String url;
private final String header;
private final boolean json;
+ private final boolean urlVariables;
+
+ private final boolean retryEnabled;
+ private final int retryDelay;
+ private final int retryCount;
+ private final int retryLimit;
+
+ private AtomicInteger deliveryPending;
@Inject
public WebDataHandler(
Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) {
+
this.identityManager = identityManager;
this.objectMapper = objectMapper;
this.client = client;
this.url = config.getString(Keys.FORWARD_URL);
this.header = config.getString(Keys.FORWARD_HEADER);
this.json = config.getBoolean(Keys.FORWARD_JSON);
+ this.urlVariables = config.getBoolean(Keys.FORWARD_URL_VARIABLES);
+
+ this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE);
+ this.retryDelay = config.getInteger(Keys.FORWARD_RETRY_DELAY, 100);
+ this.retryCount = config.getInteger(Keys.FORWARD_RETRY_COUNT, 10);
+ this.retryLimit = config.getInteger(Keys.FORWARD_RETRY_LIMIT, 100);
+
+ this.deliveryPending = new AtomicInteger(0);
}
private static String formatSentence(Position position) {
@@ -152,35 +183,111 @@ public class WebDataHandler extends BaseDataHandler {
return request;
}
- @Override
- protected Position handlePosition(Position position) {
+ class AsyncRequestAndCallback implements InvocationCallback<Response>, TimerTask {
- String url;
- if (json) {
- url = this.url;
- } else {
+ private int retries = 0;
+ private Map<String, Object> payload;
+ private Invocation.Builder requestBuilder;
+ private MediaType mediaType = MediaType.APPLICATION_JSON_TYPE;
+
+ AsyncRequestAndCallback(Position position) {
+
+ String formattedUrl;
try {
- url = formatRequest(position);
+ formattedUrl = (json && !urlVariables) ? url : formatRequest(position);
} catch (UnsupportedEncodingException | JsonProcessingException e) {
throw new RuntimeException("Forwarding formatting error", e);
}
+
+ requestBuilder = client.target(formattedUrl).request();
+ if (header != null && !header.isEmpty()) {
+ for (String line: header.split("\\r?\\n")) {
+ String[] values = line.split(":", 2);
+ String headerName = values[0].trim();
+ String headerValue = values[1].trim();
+ if (headerName.equals(HttpHeaders.CONTENT_TYPE)) {
+ mediaType = MediaType.valueOf(headerValue);
+ } else {
+ requestBuilder.header(headerName, headerValue);
+ }
+ }
+ }
+
+ if (json) {
+ payload = prepareJsonPayload(position);
+ }
+
+ deliveryPending.incrementAndGet();
}
- Invocation.Builder requestBuilder = client.target(url).request();
+ private void send() {
+ if (json) {
+ try {
+ Entity<String> entity = Entity.entity(objectMapper.writeValueAsString(payload), mediaType);
+ requestBuilder.async().post(entity, this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize location to json", e);
+ }
+ } else {
+ requestBuilder.async().get(this);
+ }
+ }
- if (header != null && !header.isEmpty()) {
- for (String line: header.split("\\r?\\n")) {
- String[] values = line.split(":", 2);
- requestBuilder.header(values[0].trim(), values[1].trim());
+ private void retry() {
+ boolean scheduled = false;
+ try {
+ if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) {
+ schedule();
+ scheduled = true;
+ }
+ } finally {
+ int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet();
+ LOGGER.warn("Position forwarding failed: " + pending + " pending");
}
}
- if (json) {
- requestBuilder.async().post(Entity.json(prepareJsonPayload(position)));
- } else {
- requestBuilder.async().get();
+ private void schedule() {
+ Main.getInjector().getInstance(Timer.class).newTimeout(
+ this, retryDelay * (int) Math.pow(2, retries++), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void completed(Response response) {
+ if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) {
+ deliveryPending.decrementAndGet();
+ } else {
+ retry();
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ retry();
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ boolean sent = false;
+ try {
+ if (!timeout.isCancelled()) {
+ send();
+ sent = true;
+ }
+ } finally {
+ if (!sent) {
+ deliveryPending.decrementAndGet();
+ }
+ }
}
+ }
+
+ @Override
+ protected Position handlePosition(Position position) {
+
+ AsyncRequestAndCallback request = new AsyncRequestAndCallback(position);
+ request.send();
+
return position;
}