diff options
Diffstat (limited to 'src/main/java/org/traccar/ProcessingHandler.java')
-rw-r--r-- | src/main/java/org/traccar/ProcessingHandler.java | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 76a698a0a..fd048d127 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -155,15 +155,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { @Override - public void processed(Position position) { - if (position != null) { + public void processed(boolean filtered) { + if (!filtered) { if (iterator.hasNext()) { iterator.next().handlePosition(position, this); } else { processEventHandlers(ctx, position); } } else { - finishedProcessing(ctx, null); + finishedProcessing(ctx, position, true); } } }); @@ -172,24 +172,32 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B private void processEventHandlers(ChannelHandlerContext ctx, Position position) { eventHandlers.forEach(handler -> handler.analyzePosition( position, (event) -> notificationManager.updateEvents(Map.of(event, position)))); - finishedProcessing(ctx, position); + finishedProcessing(ctx, position, false); } - private void finishedProcessing(ChannelHandlerContext ctx, Position position) { - postProcessHandler.handlePosition(position, p -> { - positionLogger.log(ctx, p); - ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + private void finishedProcessing(ChannelHandlerContext ctx, Position position, boolean filtered) { + if (!filtered) { + postProcessHandler.handlePosition(position, ignore -> { + positionLogger.log(ctx, position); + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + processNextPosition(ctx, position.getDeviceId()); + }); + } else { + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + processNextPosition(ctx, position.getDeviceId()); + } + } - Queue<Position> queue = getQueue(position.getDeviceId()); - Position nextPosition; - synchronized (queue) { - queue.poll(); // remove current position - nextPosition = queue.peek(); - } - if (nextPosition != null) { - processPositionHandlers(ctx, nextPosition); - } - }); + private void processNextPosition(ChannelHandlerContext ctx, long deviceId) { + Queue<Position> queue = getQueue(deviceId); + Position nextPosition; + synchronized (queue) { + queue.poll(); // remove current position + nextPosition = queue.peek(); + } + if (nextPosition != null) { + processPositionHandlers(ctx, nextPosition); + } } } |