From ae91b2f07e88d2dea97aa4bb316cb00116b4a702 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 31 Mar 2024 08:16:28 -0700 Subject: Implement position buffering --- src/main/java/org/traccar/ProcessingHandler.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 688389d98..cbb375a72 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -21,12 +21,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.traccar.config.Config; +import org.traccar.database.BufferingManager; import org.traccar.database.NotificationManager; import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; -import org.traccar.handler.PostProcessHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; @@ -36,6 +37,7 @@ import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; import org.traccar.handler.PositionForwardingHandler; +import org.traccar.handler.PostProcessHandler; import org.traccar.handler.SpeedLimitHandler; import org.traccar.handler.TimeHandler; import org.traccar.handler.events.AlertEventHandler; @@ -62,19 +64,21 @@ import java.util.stream.Stream; @Singleton @ChannelHandler.Sharable -public class ProcessingHandler extends ChannelInboundHandlerAdapter { +public class ProcessingHandler extends ChannelInboundHandlerAdapter implements BufferingManager.Callback { private final NotificationManager notificationManager; private final PositionLogger positionLogger; + private final BufferingManager bufferingManager; private final List positionHandlers; private final List eventHandlers; private final PostProcessHandler postProcessHandler; @Inject public ProcessingHandler( - Injector injector, NotificationManager notificationManager, PositionLogger positionLogger) { + Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { this.notificationManager = notificationManager; this.positionLogger = positionLogger; + bufferingManager = new BufferingManager(config, this); positionHandlers = Stream.of( TimeHandler.class, @@ -117,12 +121,17 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position) { - processPositionHandlers(ctx, (Position) msg); + bufferingManager.accept(ctx, (Position) msg); } else { super.channelRead(ctx, msg); } } + @Override + public void onReleased(ChannelHandlerContext context, Position position) { + processPositionHandlers(context, position); + } + private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { -- cgit v1.2.3