diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/org/traccar/ProcessingHandler.java | 17 | ||||
-rw-r--r-- | src/main/java/org/traccar/config/Keys.java | 8 | ||||
-rw-r--r-- | src/main/java/org/traccar/database/BufferingManager.java | 103 |
3 files changed, 124 insertions, 4 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 688389d98..cbb375a72 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -21,12 +21,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.traccar.config.Config; +import org.traccar.database.BufferingManager; import org.traccar.database.NotificationManager; import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; -import org.traccar.handler.PostProcessHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; @@ -36,6 +37,7 @@ import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; import org.traccar.handler.PositionForwardingHandler; +import org.traccar.handler.PostProcessHandler; import org.traccar.handler.SpeedLimitHandler; import org.traccar.handler.TimeHandler; import org.traccar.handler.events.AlertEventHandler; @@ -62,19 +64,21 @@ import java.util.stream.Stream; @Singleton @ChannelHandler.Sharable -public class ProcessingHandler extends ChannelInboundHandlerAdapter { +public class ProcessingHandler extends ChannelInboundHandlerAdapter implements BufferingManager.Callback { private final NotificationManager notificationManager; private final PositionLogger positionLogger; + private final BufferingManager bufferingManager; private final List<BasePositionHandler> positionHandlers; private final List<BaseEventHandler> eventHandlers; private final PostProcessHandler postProcessHandler; @Inject public ProcessingHandler( - Injector injector, NotificationManager notificationManager, PositionLogger positionLogger) { + Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { this.notificationManager = notificationManager; this.positionLogger = positionLogger; + bufferingManager = new BufferingManager(config, this); positionHandlers = Stream.of( TimeHandler.class, @@ -117,12 +121,17 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position) { - processPositionHandlers(ctx, (Position) msg); + bufferingManager.accept(ctx, (Position) msg); } else { super.channelRead(ctx, msg); } } + @Override + public void onReleased(ChannelHandlerContext context, Position position) { + processPositionHandlers(context, position); + } + private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 4aacb2cd8..d346084bd 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -293,6 +293,14 @@ public final class Keys { false); /** + * If not zero, enable buffering of incoming data to handle ordering locations. The value is threshold for + * buffering in milliseconds. + */ + public static final ConfigKey<Long> SERVER_BUFFERING_THRESHOLD = new LongConfigKey( + "server.buffering.threshold", + List.of(KeyType.CONFIG)); + + /** * Server wide connection timeout value in seconds. See protocol timeout for more information. */ public static final ConfigKey<Integer> SERVER_TIMEOUT = new IntegerConfigKey( diff --git a/src/main/java/org/traccar/database/BufferingManager.java b/src/main/java/org/traccar/database/BufferingManager.java new file mode 100644 index 000000000..ab71b2860 --- /dev/null +++ b/src/main/java/org/traccar/database/BufferingManager.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.database; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.model.Position; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +public class BufferingManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(BufferingManager.class); + + public interface Callback { + void onReleased(ChannelHandlerContext context, Position position); + } + + private static class Holder implements Comparable<Holder> { + + private static final Comparator<Position> COMPARATOR = Comparator + .comparing(Position::getFixTime) + .thenComparing(Position::getDeviceTime) + .thenComparing(Position::getServerTime); + + private final ChannelHandlerContext context; + private final Position position; + private Timeout timeout; + + private Holder(ChannelHandlerContext context, Position position) { + this.context = context; + this.position = position; + } + + @Override + public int compareTo(Holder other) { + return COMPARATOR.compare(position, other.position); + } + } + + private final Timer timer = new HashedWheelTimer(); + private final Callback callback; + private final long threshold; + + private final Map<Long, TreeSet<Holder>> buffer = new HashMap<>(); + + public BufferingManager(Config config, Callback callback) { + this.callback = callback; + threshold = config.getLong(Keys.SERVER_BUFFERING_THRESHOLD); + } + + private Timeout scheduleTimeout(Holder holder) { + return timer.newTimeout( + timeout -> { + LOGGER.info("released {}", holder.position.getFixTime()); + buffer.get(holder.position.getDeviceId()).remove(holder); + callback.onReleased(holder.context, holder.position); + }, + threshold, TimeUnit.MILLISECONDS); + } + + public void accept(ChannelHandlerContext context, Position position) { + if (threshold > 0) { + synchronized (buffer) { + LOGGER.info("queued {}", position.getFixTime()); + var queue = buffer.computeIfAbsent(position.getDeviceId(), k -> new TreeSet<>()); + Holder holder = new Holder(context, position); + holder.timeout = scheduleTimeout(holder); + queue.add(holder); + queue.tailSet(holder).forEach(h -> { + h.timeout.cancel(); + h.timeout = scheduleTimeout(h); + }); + } + } else { + callback.onReleased(context, position); + } + } + +} |