aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/ProcessingHandler.java
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2024-03-31 13:39:13 -0700
committerAnton Tananaev <anton@traccar.org>2024-03-31 13:39:13 -0700
commite4134911f17cbe6f556a8bc4d1b3f5073474a940 (patch)
tree560049793b49548a2340c9739113a25039c237e2 /src/main/java/org/traccar/ProcessingHandler.java
parent7de30c41131c76e6491c641cd46c76a1a3513957 (diff)
downloadtrackermap-server-e4134911f17cbe6f556a8bc4d1b3f5073474a940.tar.gz
trackermap-server-e4134911f17cbe6f556a8bc4d1b3f5073474a940.tar.bz2
trackermap-server-e4134911f17cbe6f556a8bc4d1b3f5073474a940.zip
Queue positions for processing
Diffstat (limited to 'src/main/java/org/traccar/ProcessingHandler.java')
-rw-r--r--src/main/java/org/traccar/ProcessingHandler.java29
1 files changed, 28 insertions, 1 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java
index cbb375a72..6a97b9dea 100644
--- a/src/main/java/org/traccar/ProcessingHandler.java
+++ b/src/main/java/org/traccar/ProcessingHandler.java
@@ -56,9 +56,12 @@ import org.traccar.handler.network.AcknowledgementHandler;
import org.traccar.helper.PositionLogger;
import org.traccar.model.Position;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -73,6 +76,12 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B
private final List<BaseEventHandler> eventHandlers;
private final PostProcessHandler postProcessHandler;
+ private final Map<Long, Queue<Position>> queues = new HashMap<>();
+
+ private synchronized Queue<Position> getQueue(long deviceId) {
+ return queues.computeIfAbsent(deviceId, k -> new LinkedList<>());
+ }
+
@Inject
public ProcessingHandler(
Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) {
@@ -129,7 +138,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B
@Override
public void onReleased(ChannelHandlerContext context, Position position) {
- processPositionHandlers(context, position);
+ Queue<Position> queue = getQueue(position.getDeviceId());
+ boolean queued;
+ synchronized (queue) {
+ queued = !queue.isEmpty();
+ queue.offer(position);
+ }
+ if (!queued) {
+ processPositionHandlers(context, position);
+ }
}
private void processPositionHandlers(ChannelHandlerContext ctx, Position position) {
@@ -160,6 +177,16 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B
postProcessHandler.handlePosition(position, p -> {
positionLogger.log(ctx, p);
ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p));
+
+ Queue<Position> queue = getQueue(position.getDeviceId());
+ Position nextPosition;
+ synchronized (queue) {
+ queue.poll(); // remove current position
+ nextPosition = queue.peek();
+ }
+ if (nextPosition != null) {
+ processPositionHandlers(ctx, nextPosition);
+ }
});
}