/* * Copyright 2024 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.traccar; import com.google.inject.Injector; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.traccar.database.NotificationManager; import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; import org.traccar.handler.PostProcessHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; import org.traccar.handler.GeocoderHandler; import org.traccar.handler.GeofenceHandler; import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; import org.traccar.handler.PositionForwardingHandler; import org.traccar.handler.SpeedLimitHandler; import org.traccar.handler.TimeHandler; import org.traccar.handler.events.AlertEventHandler; import org.traccar.handler.events.BaseEventHandler; import org.traccar.handler.events.BehaviorEventHandler; import org.traccar.handler.events.CommandResultEventHandler; import org.traccar.handler.events.DriverEventHandler; import org.traccar.handler.events.FuelEventHandler; import org.traccar.handler.events.GeofenceEventHandler; import org.traccar.handler.events.IgnitionEventHandler; import org.traccar.handler.events.MaintenanceEventHandler; import org.traccar.handler.events.MediaEventHandler; import org.traccar.handler.events.MotionEventHandler; import org.traccar.handler.events.OverspeedEventHandler; import org.traccar.handler.network.AcknowledgementHandler; import org.traccar.helper.PositionLogger; import org.traccar.model.Position; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @Singleton @ChannelHandler.Sharable public class ProcessingHandler extends ChannelInboundHandlerAdapter { private final NotificationManager notificationManager; private final PositionLogger positionLogger; private final List positionHandlers; private final List eventHandlers; private final PostProcessHandler postProcessHandler; @Inject public ProcessingHandler( Injector injector, NotificationManager notificationManager, PositionLogger positionLogger) { this.notificationManager = notificationManager; this.positionLogger = positionLogger; positionHandlers = Stream.of( TimeHandler.class, GeolocationHandler.class, HemisphereHandler.class, DistanceHandler.class, FilterHandler.class, GeofenceHandler.class, GeocoderHandler.class, SpeedLimitHandler.class, MotionHandler.class, EngineHoursHandler.class, ComputedAttributesHandler.class, CopyAttributesHandler.class, PositionForwardingHandler.class, DatabaseHandler.class) .map((clazz) -> (BasePositionHandler) injector.getInstance(clazz)) .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); eventHandlers = Stream.of( MediaEventHandler.class, CommandResultEventHandler.class, OverspeedEventHandler.class, BehaviorEventHandler.class, FuelEventHandler.class, MotionEventHandler.class, GeofenceEventHandler.class, AlertEventHandler.class, IgnitionEventHandler.class, MaintenanceEventHandler.class, DriverEventHandler.class) .map((clazz) -> (BaseEventHandler) injector.getInstance(clazz)) .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); postProcessHandler = injector.getInstance(PostProcessHandler.class); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position) { processPositionHandlers(ctx, (Position) msg); } else { super.channelRead(ctx, msg); } } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { @Override public void processed(Position position) { if (position != null) { if (iterator.hasNext()) { iterator.next().handlePosition(position, this); } else { processEventHandlers(ctx, position); } } else { finishedProcessing(ctx, null); } } }); } private void processEventHandlers(ChannelHandlerContext ctx, Position position) { eventHandlers.forEach(handler -> handler.analyzePosition( position, (event) -> notificationManager.updateEvents(Map.of(event, position)))); finishedProcessing(ctx, position); } private void finishedProcessing(ChannelHandlerContext ctx, Position position) { postProcessHandler.handlePosition(position, p -> { positionLogger.log(ctx, p); ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); }); } }