From 169c40cc7207a962f027aea85bd8f9c04b33738f Mon Sep 17 00:00:00 2001 From: Edward Valley Date: Sat, 31 Aug 2019 18:27:13 -0400 Subject: Refactor code to have concurrency --- src/main/java/org/traccar/WebDataHandler.java | 141 ++++++++++++++++---------- src/main/java/org/traccar/config/Keys.java | 7 ++ 2 files changed, 93 insertions(+), 55 deletions(-) (limited to 'src/main/java/org') diff --git a/src/main/java/org/traccar/WebDataHandler.java b/src/main/java/org/traccar/WebDataHandler.java index 193efd230..dfd6f0e5b 100644 --- a/src/main/java/org/traccar/WebDataHandler.java +++ b/src/main/java/org/traccar/WebDataHandler.java @@ -41,7 +41,7 @@ import java.util.Calendar; import java.util.Formatter; import java.util.Locale; import java.util.TimeZone; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.collections.Buffer; import org.apache.commons.collections.BufferUtils; @@ -67,27 +67,33 @@ public class WebDataHandler extends BaseDataHandler { private final boolean json; private final Integer queueSize; + private final Integer queueConcurrency; private Buffer positionsQueue = null; - private AtomicBoolean sendInProgress; - - private Invocation.Builder requestBuilder; + private AtomicInteger pendingRequests; @Inject public WebDataHandler( Config config, IdentityManager identityManager, ObjectMapper objectMapper, Client client) { + this.identityManager = identityManager; this.objectMapper = objectMapper; this.client = client; this.url = config.getString(Keys.FORWARD_URL); this.header = config.getString(Keys.FORWARD_HEADER); this.json = config.getBoolean(Keys.FORWARD_JSON); + Integer queueSize = (config.getInteger(Keys.FORWARD_QUEUE_SIZE, 0)); this.queueSize = (queueSize > 0) ? queueSize : 0; + + Integer queueConcurrency = (config.getInteger(Keys.FORWARD_QUEUE_CONCURRENCY, 1)); + this.queueConcurrency = (queueConcurrency > 0) ? queueConcurrency : 1; + if (this.queueSize > 0) { this.positionsQueue = BufferUtils.synchronizedBuffer(new CircularFifoBuffer(this.queueSize)); } - this.sendInProgress = new AtomicBoolean(false); + + this.pendingRequests = new AtomicInteger(0); } private static String formatSentence(Position position) { @@ -177,80 +183,105 @@ public class WebDataHandler extends BaseDataHandler { return request; } - protected void sendPosition(Position position) { - String url; - if (json) { - url = this.url; - } else { - try { - url = formatRequest(position); - } catch (UnsupportedEncodingException | JsonProcessingException e) { - throw new RuntimeException("Forwarding formatting error", e); + @Override + protected Position handlePosition(Position position) { + + class AsyncRequestAndCallback implements InvocationCallback { + + private Map jsonPayload; + private Invocation.Builder requestBuilder; + + AsyncRequestAndCallback(Position position) { + + String formattedUrl; + try { + formattedUrl = (json) ? url : formatRequest(position); + } catch (UnsupportedEncodingException | JsonProcessingException e) { + throw new RuntimeException("Forwarding formatting error", e); + } + + requestBuilder = client.target(formattedUrl).request(); + if (header != null && !header.isEmpty()) { + for (String line: header.split("\\r?\\n")) { + String[] values = line.split(":", 2); + requestBuilder.header(values[0].trim(), values[1].trim()); + } + } + + if (json) { + jsonPayload = prepareJsonPayload(position); + } + + pendingRequests.incrementAndGet(); + + send(); } - } - if (!json || requestBuilder == null) { - requestBuilder = client.target(url).request(); - if (header != null && !header.isEmpty()) { - for (String line: header.split("\\r?\\n")) { - String[] values = line.split(":", 2); - requestBuilder.header(values[0].trim(), values[1].trim()); + private void send() { + if (json) { + requestBuilder.async().post(Entity.json(jsonPayload), this); + } else { + requestBuilder.async().get(this); + } + } + + private void retry() { + try { + Thread.sleep(1000); + send(); + } catch (Exception e) { + } + } + + private void next() { + if (!positionsQueue.isEmpty()) { + new AsyncRequestAndCallback((Position) positionsQueue.remove()); } } - } - InvocationCallback callback = new InvocationCallback() { public void completed(Response response) { if (positionsQueue != null) { - if (response.getStatus() == 200) { - positionsQueue.remove(); + boolean ok = (response.getStatus() == 200); + boolean retry = (positionsQueue.size() < queueSize); + if (!ok && retry) { + retry(); + } else { + pendingRequests.decrementAndGet(); + while (!positionsQueue.isEmpty() && pendingRequests.get() < queueConcurrency) { + next(); + } } - logQueueStatus(); - sendInProgress.set(false); - sendQueuedPositions(); } } + public void failed(Throwable throwable) { if (positionsQueue != null) { - logQueueStatus(); - sendInProgress.set(false); - sendQueuedPositions(); + if (positionsQueue.size() < queueSize) { + retry(); + } else if (pendingRequests.decrementAndGet() == 0) { + next(); + } } } - }; - - if (json) { - requestBuilder.async().post(Entity.json(prepareJsonPayload(position)), callback); - } else { - requestBuilder.async().get(callback); - } - } - - protected void logQueueStatus() { - LOGGER.info(String.format("Position forwarding queue: %d/%d", - positionsQueue.size(), queueSize)); - } - protected void sendQueuedPositions() { - if (!positionsQueue.isEmpty() && !sendInProgress.get()) { - sendInProgress.set(true); - sendPosition((Position) positionsQueue.get()); } - } - @Override - protected Position handlePosition(Position position) { - - if (positionsQueue == null) { - sendPosition(position); + if (positionsQueue == null || pendingRequests.get() == 0) { + new AsyncRequestAndCallback(position); } else { positionsQueue.add(position); - sendQueuedPositions(); } + logQueueStatus(); + return position; } + private void logQueueStatus() { + LOGGER.info(String.format("Position forwarding queue: %d/%d/%d", + pendingRequests.get(), positionsQueue.size(), queueSize)); + } + private Map prepareJsonPayload(Position position) { Map data = new HashMap<>(); diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 10eda01e2..6102f7604 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -107,6 +107,13 @@ public final class Keys { public static final ConfigKey FORWARD_QUEUE_SIZE = new ConfigKey( "forward.queue.size", Integer.class); + /** + * Position forwarding queue concurrency. Defines how many HTTP requests can be pending at any time. + * If queueing is enabled, and this value is less than one, it's ignored and forced to be one. + */ + public static final ConfigKey FORWARD_QUEUE_CONCURRENCY = new ConfigKey( + "forward.queue.concurrency", Integer.class); + /** * Boolean flag to enable or disable position filtering. */ -- cgit v1.2.3