From e4134911f17cbe6f556a8bc4d1b3f5073474a940 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 31 Mar 2024 13:39:13 -0700 Subject: Queue positions for processing --- src/main/java/org/traccar/ProcessingHandler.java | 29 +++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) (limited to 'src/main/java/org') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index cbb375a72..6a97b9dea 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -56,9 +56,12 @@ import org.traccar.handler.network.AcknowledgementHandler; import org.traccar.helper.PositionLogger; import org.traccar.model.Position; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -73,6 +76,12 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B private final List eventHandlers; private final PostProcessHandler postProcessHandler; + private final Map> queues = new HashMap<>(); + + private synchronized Queue getQueue(long deviceId) { + return queues.computeIfAbsent(deviceId, k -> new LinkedList<>()); + } + @Inject public ProcessingHandler( Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { @@ -129,7 +138,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B @Override public void onReleased(ChannelHandlerContext context, Position position) { - processPositionHandlers(context, position); + Queue queue = getQueue(position.getDeviceId()); + boolean queued; + synchronized (queue) { + queued = !queue.isEmpty(); + queue.offer(position); + } + if (!queued) { + processPositionHandlers(context, position); + } } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { @@ -160,6 +177,16 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B postProcessHandler.handlePosition(position, p -> { positionLogger.log(ctx, p); ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + + Queue queue = getQueue(position.getDeviceId()); + Position nextPosition; + synchronized (queue) { + queue.poll(); // remove current position + nextPosition = queue.peek(); + } + if (nextPosition != null) { + processPositionHandlers(ctx, nextPosition); + } }); } -- cgit v1.2.3