diff options
author | Anton Tananaev <anton@traccar.org> | 2024-03-31 13:39:13 -0700 |
---|---|---|
committer | Anton Tananaev <anton@traccar.org> | 2024-03-31 13:39:13 -0700 |
commit | e4134911f17cbe6f556a8bc4d1b3f5073474a940 (patch) | |
tree | 560049793b49548a2340c9739113a25039c237e2 /src/main/java | |
parent | 7de30c41131c76e6491c641cd46c76a1a3513957 (diff) | |
download | trackermap-server-e4134911f17cbe6f556a8bc4d1b3f5073474a940.tar.gz trackermap-server-e4134911f17cbe6f556a8bc4d1b3f5073474a940.tar.bz2 trackermap-server-e4134911f17cbe6f556a8bc4d1b3f5073474a940.zip |
Queue positions for processing
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/org/traccar/ProcessingHandler.java | 29 |
1 files changed, 28 insertions, 1 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index cbb375a72..6a97b9dea 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -56,9 +56,12 @@ import org.traccar.handler.network.AcknowledgementHandler; import org.traccar.helper.PositionLogger; import org.traccar.model.Position; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -73,6 +76,12 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B private final List<BaseEventHandler> eventHandlers; private final PostProcessHandler postProcessHandler; + private final Map<Long, Queue<Position>> queues = new HashMap<>(); + + private synchronized Queue<Position> getQueue(long deviceId) { + return queues.computeIfAbsent(deviceId, k -> new LinkedList<>()); + } + @Inject public ProcessingHandler( Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { @@ -129,7 +138,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B @Override public void onReleased(ChannelHandlerContext context, Position position) { - processPositionHandlers(context, position); + Queue<Position> queue = getQueue(position.getDeviceId()); + boolean queued; + synchronized (queue) { + queued = !queue.isEmpty(); + queue.offer(position); + } + if (!queued) { + processPositionHandlers(context, position); + } } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { @@ -160,6 +177,16 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B postProcessHandler.handlePosition(position, p -> { positionLogger.log(ctx, p); ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + + Queue<Position> queue = getQueue(position.getDeviceId()); + Position nextPosition; + synchronized (queue) { + queue.poll(); // remove current position + nextPosition = queue.peek(); + } + if (nextPosition != null) { + processPositionHandlers(ctx, nextPosition); + } }); } |