aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/ProcessingHandler.java
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2024-04-07 07:17:09 -0700
committerAnton Tananaev <anton@traccar.org>2024-04-07 07:17:09 -0700
commit9a285e59e580994dc9c3f80935f766f3dafdcd46 (patch)
tree6f3af805426b54ba044b580134398adc60c2b242 /src/main/java/org/traccar/ProcessingHandler.java
parent204b0a020a4bd2c3d17754ba183c39ea3035e0ae (diff)
downloadtrackermap-server-9a285e59e580994dc9c3f80935f766f3dafdcd46.tar.gz
trackermap-server-9a285e59e580994dc9c3f80935f766f3dafdcd46.tar.bz2
trackermap-server-9a285e59e580994dc9c3f80935f766f3dafdcd46.zip
Fix filtering handlerv6.0
Diffstat (limited to 'src/main/java/org/traccar/ProcessingHandler.java')
-rw-r--r--src/main/java/org/traccar/ProcessingHandler.java44
1 files changed, 26 insertions, 18 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java
index 76a698a0a..fd048d127 100644
--- a/src/main/java/org/traccar/ProcessingHandler.java
+++ b/src/main/java/org/traccar/ProcessingHandler.java
@@ -155,15 +155,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B
var iterator = positionHandlers.iterator();
iterator.next().handlePosition(position, new BasePositionHandler.Callback() {
@Override
- public void processed(Position position) {
- if (position != null) {
+ public void processed(boolean filtered) {
+ if (!filtered) {
if (iterator.hasNext()) {
iterator.next().handlePosition(position, this);
} else {
processEventHandlers(ctx, position);
}
} else {
- finishedProcessing(ctx, null);
+ finishedProcessing(ctx, position, true);
}
}
});
@@ -172,24 +172,32 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B
private void processEventHandlers(ChannelHandlerContext ctx, Position position) {
eventHandlers.forEach(handler -> handler.analyzePosition(
position, (event) -> notificationManager.updateEvents(Map.of(event, position))));
- finishedProcessing(ctx, position);
+ finishedProcessing(ctx, position, false);
}
- private void finishedProcessing(ChannelHandlerContext ctx, Position position) {
- postProcessHandler.handlePosition(position, p -> {
- positionLogger.log(ctx, p);
- ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p));
+ private void finishedProcessing(ChannelHandlerContext ctx, Position position, boolean filtered) {
+ if (!filtered) {
+ postProcessHandler.handlePosition(position, ignore -> {
+ positionLogger.log(ctx, position);
+ ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position));
+ processNextPosition(ctx, position.getDeviceId());
+ });
+ } else {
+ ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position));
+ processNextPosition(ctx, position.getDeviceId());
+ }
+ }
- Queue<Position> queue = getQueue(position.getDeviceId());
- Position nextPosition;
- synchronized (queue) {
- queue.poll(); // remove current position
- nextPosition = queue.peek();
- }
- if (nextPosition != null) {
- processPositionHandlers(ctx, nextPosition);
- }
- });
+ private void processNextPosition(ChannelHandlerContext ctx, long deviceId) {
+ Queue<Position> queue = getQueue(deviceId);
+ Position nextPosition;
+ synchronized (queue) {
+ queue.poll(); // remove current position
+ nextPosition = queue.peek();
+ }
+ if (nextPosition != null) {
+ processPositionHandlers(ctx, nextPosition);
+ }
}
}