/* * Copyright 2024 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.traccar; import com.google.inject.Injector; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.traccar.config.Config; import org.traccar.database.BufferingManager; import org.traccar.database.NotificationManager; import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; import org.traccar.handler.GeocoderHandler; import org.traccar.handler.GeofenceHandler; import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; import org.traccar.handler.OutdatedHandler; import org.traccar.handler.PositionForwardingHandler; import org.traccar.handler.PostProcessHandler; import org.traccar.handler.SpeedLimitHandler; import org.traccar.handler.TimeHandler; import org.traccar.handler.events.AlertEventHandler; import org.traccar.handler.events.BaseEventHandler; import org.traccar.handler.events.BehaviorEventHandler; import org.traccar.handler.events.CommandResultEventHandler; import org.traccar.handler.events.DriverEventHandler; import org.traccar.handler.events.FuelEventHandler; import org.traccar.handler.events.GeofenceEventHandler; import org.traccar.handler.events.IgnitionEventHandler; import org.traccar.handler.events.MaintenanceEventHandler; import org.traccar.handler.events.MediaEventHandler; import org.traccar.handler.events.MotionEventHandler; import org.traccar.handler.events.OverspeedEventHandler; import org.traccar.handler.network.AcknowledgementHandler; import org.traccar.helper.PositionLogger; import org.traccar.model.Position; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.stream.Collectors; import java.util.stream.Stream; @Singleton @ChannelHandler.Sharable public class ProcessingHandler extends ChannelInboundHandlerAdapter implements BufferingManager.Callback { private final NotificationManager notificationManager; private final PositionLogger positionLogger; private final BufferingManager bufferingManager; private final List positionHandlers; private final List eventHandlers; private final PostProcessHandler postProcessHandler; private final Map> queues = new HashMap<>(); private synchronized Queue getQueue(long deviceId) { return queues.computeIfAbsent(deviceId, k -> new LinkedList<>()); } @Inject public ProcessingHandler( Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { this.notificationManager = notificationManager; this.positionLogger = positionLogger; bufferingManager = new BufferingManager(config, this); positionHandlers = Stream.of( OutdatedHandler.class, TimeHandler.class, GeolocationHandler.class, HemisphereHandler.class, DistanceHandler.class, FilterHandler.class, GeofenceHandler.class, GeocoderHandler.class, SpeedLimitHandler.class, MotionHandler.class, ComputedAttributesHandler.class, EngineHoursHandler.class, CopyAttributesHandler.class, PositionForwardingHandler.class, DatabaseHandler.class) .map((clazz) -> (BasePositionHandler) injector.getInstance(clazz)) .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); eventHandlers = Stream.of( MediaEventHandler.class, CommandResultEventHandler.class, OverspeedEventHandler.class, BehaviorEventHandler.class, FuelEventHandler.class, MotionEventHandler.class, GeofenceEventHandler.class, AlertEventHandler.class, IgnitionEventHandler.class, MaintenanceEventHandler.class, DriverEventHandler.class) .map((clazz) -> (BaseEventHandler) injector.getInstance(clazz)) .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); postProcessHandler = injector.getInstance(PostProcessHandler.class); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position position) { bufferingManager.accept(ctx, position); } else { super.channelRead(ctx, msg); } } @Override public void onReleased(ChannelHandlerContext context, Position position) { Queue queue = getQueue(position.getDeviceId()); boolean queued; synchronized (queue) { queued = !queue.isEmpty(); queue.offer(position); } if (!queued) { processPositionHandlers(context, position); } } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { @Override public void processed(boolean filtered) { if (!filtered) { if (iterator.hasNext()) { iterator.next().handlePosition(position, this); } else { processEventHandlers(ctx, position); } } else { finishedProcessing(ctx, position, true); } } }); } private void processEventHandlers(ChannelHandlerContext ctx, Position position) { eventHandlers.forEach(handler -> handler.analyzePosition( position, (event) -> notificationManager.updateEvents(Map.of(event, position)))); finishedProcessing(ctx, position, false); } private void finishedProcessing(ChannelHandlerContext ctx, Position position, boolean filtered) { if (!filtered) { postProcessHandler.handlePosition(position, ignore -> { positionLogger.log(ctx, position); ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); processNextPosition(ctx, position.getDeviceId()); }); } else { ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); processNextPosition(ctx, position.getDeviceId()); } } private void processNextPosition(ChannelHandlerContext ctx, long deviceId) { Queue queue = getQueue(deviceId); Position nextPosition; synchronized (queue) { queue.poll(); // remove current position nextPosition = queue.peek(); } if (nextPosition != null) { processPositionHandlers(ctx, nextPosition); } } }