diff options
Diffstat (limited to 'src/main/java/org/traccar/ProcessingHandler.java')
-rw-r--r-- | src/main/java/org/traccar/ProcessingHandler.java | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java new file mode 100644 index 000000000..fd048d127 --- /dev/null +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -0,0 +1,203 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import com.google.inject.Injector; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.traccar.config.Config; +import org.traccar.database.BufferingManager; +import org.traccar.database.NotificationManager; +import org.traccar.handler.BasePositionHandler; +import org.traccar.handler.ComputedAttributesHandler; +import org.traccar.handler.CopyAttributesHandler; +import org.traccar.handler.DatabaseHandler; +import org.traccar.handler.DistanceHandler; +import org.traccar.handler.EngineHoursHandler; +import org.traccar.handler.FilterHandler; +import org.traccar.handler.GeocoderHandler; +import org.traccar.handler.GeofenceHandler; +import org.traccar.handler.GeolocationHandler; +import org.traccar.handler.HemisphereHandler; +import org.traccar.handler.MotionHandler; +import org.traccar.handler.OutdatedHandler; +import org.traccar.handler.PositionForwardingHandler; +import org.traccar.handler.PostProcessHandler; +import org.traccar.handler.SpeedLimitHandler; +import org.traccar.handler.TimeHandler; +import org.traccar.handler.events.AlertEventHandler; +import org.traccar.handler.events.BaseEventHandler; +import org.traccar.handler.events.BehaviorEventHandler; +import org.traccar.handler.events.CommandResultEventHandler; +import org.traccar.handler.events.DriverEventHandler; +import org.traccar.handler.events.FuelEventHandler; +import org.traccar.handler.events.GeofenceEventHandler; +import org.traccar.handler.events.IgnitionEventHandler; +import org.traccar.handler.events.MaintenanceEventHandler; +import org.traccar.handler.events.MediaEventHandler; +import org.traccar.handler.events.MotionEventHandler; +import org.traccar.handler.events.OverspeedEventHandler; +import org.traccar.handler.network.AcknowledgementHandler; +import org.traccar.helper.PositionLogger; +import org.traccar.model.Position; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Singleton +@ChannelHandler.Sharable +public class ProcessingHandler extends ChannelInboundHandlerAdapter implements BufferingManager.Callback { + + private final NotificationManager notificationManager; + private final PositionLogger positionLogger; + private final BufferingManager bufferingManager; + private final List<BasePositionHandler> positionHandlers; + private final List<BaseEventHandler> eventHandlers; + private final PostProcessHandler postProcessHandler; + + private final Map<Long, Queue<Position>> queues = new HashMap<>(); + + private synchronized Queue<Position> getQueue(long deviceId) { + return queues.computeIfAbsent(deviceId, k -> new LinkedList<>()); + } + + @Inject + public ProcessingHandler( + Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { + this.notificationManager = notificationManager; + this.positionLogger = positionLogger; + bufferingManager = new BufferingManager(config, this); + + positionHandlers = Stream.of( + OutdatedHandler.class, + TimeHandler.class, + GeolocationHandler.class, + HemisphereHandler.class, + DistanceHandler.class, + FilterHandler.class, + GeofenceHandler.class, + GeocoderHandler.class, + SpeedLimitHandler.class, + MotionHandler.class, + EngineHoursHandler.class, + ComputedAttributesHandler.class, + CopyAttributesHandler.class, + PositionForwardingHandler.class, + DatabaseHandler.class) + .map((clazz) -> (BasePositionHandler) injector.getInstance(clazz)) + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableList()); + + eventHandlers = Stream.of( + MediaEventHandler.class, + CommandResultEventHandler.class, + OverspeedEventHandler.class, + BehaviorEventHandler.class, + FuelEventHandler.class, + MotionEventHandler.class, + GeofenceEventHandler.class, + AlertEventHandler.class, + IgnitionEventHandler.class, + MaintenanceEventHandler.class, + DriverEventHandler.class) + .map((clazz) -> (BaseEventHandler) injector.getInstance(clazz)) + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableList()); + + postProcessHandler = injector.getInstance(PostProcessHandler.class); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Position) { + bufferingManager.accept(ctx, (Position) msg); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void onReleased(ChannelHandlerContext context, Position position) { + Queue<Position> queue = getQueue(position.getDeviceId()); + boolean queued; + synchronized (queue) { + queued = !queue.isEmpty(); + queue.offer(position); + } + if (!queued) { + processPositionHandlers(context, position); + } + } + + private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { + var iterator = positionHandlers.iterator(); + iterator.next().handlePosition(position, new BasePositionHandler.Callback() { + @Override + public void processed(boolean filtered) { + if (!filtered) { + if (iterator.hasNext()) { + iterator.next().handlePosition(position, this); + } else { + processEventHandlers(ctx, position); + } + } else { + finishedProcessing(ctx, position, true); + } + } + }); + } + + private void processEventHandlers(ChannelHandlerContext ctx, Position position) { + eventHandlers.forEach(handler -> handler.analyzePosition( + position, (event) -> notificationManager.updateEvents(Map.of(event, position)))); + finishedProcessing(ctx, position, false); + } + + private void finishedProcessing(ChannelHandlerContext ctx, Position position, boolean filtered) { + if (!filtered) { + postProcessHandler.handlePosition(position, ignore -> { + positionLogger.log(ctx, position); + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + processNextPosition(ctx, position.getDeviceId()); + }); + } else { + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + processNextPosition(ctx, position.getDeviceId()); + } + } + + private void processNextPosition(ChannelHandlerContext ctx, long deviceId) { + Queue<Position> queue = getQueue(deviceId); + Position nextPosition; + synchronized (queue) { + queue.poll(); // remove current position + nextPosition = queue.peek(); + } + if (nextPosition != null) { + processPositionHandlers(ctx, nextPosition); + } + } + +} |