aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/ProcessingHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/traccar/ProcessingHandler.java')
-rw-r--r--src/main/java/org/traccar/ProcessingHandler.java17
1 files changed, 13 insertions, 4 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java
index 688389d98..cbb375a72 100644
--- a/src/main/java/org/traccar/ProcessingHandler.java
+++ b/src/main/java/org/traccar/ProcessingHandler.java
@@ -21,12 +21,13 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
+import org.traccar.config.Config;
+import org.traccar.database.BufferingManager;
import org.traccar.database.NotificationManager;
import org.traccar.handler.BasePositionHandler;
import org.traccar.handler.ComputedAttributesHandler;
import org.traccar.handler.CopyAttributesHandler;
import org.traccar.handler.DatabaseHandler;
-import org.traccar.handler.PostProcessHandler;
import org.traccar.handler.DistanceHandler;
import org.traccar.handler.EngineHoursHandler;
import org.traccar.handler.FilterHandler;
@@ -36,6 +37,7 @@ import org.traccar.handler.GeolocationHandler;
import org.traccar.handler.HemisphereHandler;
import org.traccar.handler.MotionHandler;
import org.traccar.handler.PositionForwardingHandler;
+import org.traccar.handler.PostProcessHandler;
import org.traccar.handler.SpeedLimitHandler;
import org.traccar.handler.TimeHandler;
import org.traccar.handler.events.AlertEventHandler;
@@ -62,19 +64,21 @@ import java.util.stream.Stream;
@Singleton
@ChannelHandler.Sharable
-public class ProcessingHandler extends ChannelInboundHandlerAdapter {
+public class ProcessingHandler extends ChannelInboundHandlerAdapter implements BufferingManager.Callback {
private final NotificationManager notificationManager;
private final PositionLogger positionLogger;
+ private final BufferingManager bufferingManager;
private final List<BasePositionHandler> positionHandlers;
private final List<BaseEventHandler> eventHandlers;
private final PostProcessHandler postProcessHandler;
@Inject
public ProcessingHandler(
- Injector injector, NotificationManager notificationManager, PositionLogger positionLogger) {
+ Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) {
this.notificationManager = notificationManager;
this.positionLogger = positionLogger;
+ bufferingManager = new BufferingManager(config, this);
positionHandlers = Stream.of(
TimeHandler.class,
@@ -117,12 +121,17 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Position) {
- processPositionHandlers(ctx, (Position) msg);
+ bufferingManager.accept(ctx, (Position) msg);
} else {
super.channelRead(ctx, msg);
}
}
+ @Override
+ public void onReleased(ChannelHandlerContext context, Position position) {
+ processPositionHandlers(context, position);
+ }
+
private void processPositionHandlers(ChannelHandlerContext ctx, Position position) {
var iterator = positionHandlers.iterator();
iterator.next().handlePosition(position, new BasePositionHandler.Callback() {