diff options
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r-- | src/org/traccar/BasePipelineFactory.java | 312 |
1 files changed, 65 insertions, 247 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index b45e3a280..b3d37f689 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -15,36 +15,40 @@ */ package org.traccar; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.traccar.events.CommandResultEventHandler; -import org.traccar.events.DriverEventHandler; -import org.traccar.events.FuelDropEventHandler; -import org.traccar.events.GeofenceEventHandler; -import org.traccar.events.IgnitionEventHandler; -import org.traccar.events.MaintenanceEventHandler; -import org.traccar.events.MotionEventHandler; -import org.traccar.events.OverspeedEventHandler; -import org.traccar.events.AlertEventHandler; -import org.traccar.processing.ComputedAttributesHandler; -import org.traccar.processing.CopyAttributesHandler; +import org.traccar.config.Keys; +import org.traccar.handler.DefaultDataHandler; +import org.traccar.handler.events.AlertEventHandler; +import org.traccar.handler.events.CommandResultEventHandler; +import org.traccar.handler.events.DriverEventHandler; +import org.traccar.handler.events.FuelDropEventHandler; +import org.traccar.handler.events.GeofenceEventHandler; +import org.traccar.handler.events.IgnitionEventHandler; +import org.traccar.handler.events.MaintenanceEventHandler; +import org.traccar.handler.events.MotionEventHandler; +import org.traccar.handler.events.OverspeedEventHandler; +import org.traccar.handler.ComputedAttributesHandler; +import org.traccar.handler.CopyAttributesHandler; +import org.traccar.handler.DistanceHandler; +import org.traccar.handler.EngineHoursHandler; +import org.traccar.handler.FilterHandler; +import org.traccar.handler.GeocoderHandler; +import org.traccar.handler.GeolocationHandler; +import org.traccar.handler.HemisphereHandler; +import org.traccar.handler.MotionHandler; +import org.traccar.handler.NetworkMessageHandler; +import org.traccar.handler.OpenChannelHandler; +import org.traccar.handler.RemoteAddressHandler; +import org.traccar.handler.StandardLoggingHandler; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.Map; public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { @@ -52,207 +56,25 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { private static final Logger LOGGER = LoggerFactory.getLogger(BasePipelineFactory.class); private final TrackerServer server; + private boolean eventsEnabled; private int timeout; - private FilterHandler filterHandler; - private DistanceHandler distanceHandler; - private EngineHoursHandler engineHoursHandler; - private RemoteAddressHandler remoteAddressHandler; - private MotionHandler motionHandler; - private GeocoderHandler geocoderHandler; - private GeolocationHandler geolocationHandler; - private HemisphereHandler hemisphereHandler; - private CopyAttributesHandler copyAttributesHandler; - private ComputedAttributesHandler computedAttributesHandler; - - private CommandResultEventHandler commandResultEventHandler; - private OverspeedEventHandler overspeedEventHandler; - private FuelDropEventHandler fuelDropEventHandler; - private MotionEventHandler motionEventHandler; - private GeofenceEventHandler geofenceEventHandler; - private AlertEventHandler alertEventHandler; - private IgnitionEventHandler ignitionEventHandler; - private MaintenanceEventHandler maintenanceEventHandler; - private DriverEventHandler driverEventHandler; - - private static final class OpenChannelHandler extends ChannelDuplexHandler { - - private final TrackerServer server; - - private OpenChannelHandler(TrackerServer server) { - this.server = server; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - server.getChannelGroup().add(ctx.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - server.getChannelGroup().remove(ctx.channel()); - } - - } - - private static class NetworkMessageHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (ctx.channel() instanceof DatagramChannel) { - DatagramPacket packet = (DatagramPacket) msg; - ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); - } else if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; - ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof NetworkMessage) { - NetworkMessage message = (NetworkMessage) msg; - if (ctx.channel() instanceof DatagramChannel) { - InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); - InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); - ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); - } else { - ctx.write(message.getMessage(), promise); - } - } else { - ctx.write(msg, promise); - } - } - - } - - private static class StandardLoggingHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log(ctx, false, msg); - super.channelRead(ctx, msg); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - log(ctx, true, msg); - super.write(ctx, msg, promise); - } - - public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { - if (o instanceof NetworkMessage) { - NetworkMessage networkMessage = (NetworkMessage) o; - if (networkMessage.getMessage() instanceof ByteBuf) { - log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); - } - } else if (o instanceof ByteBuf) { - log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); - } - } - - public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { - StringBuilder message = new StringBuilder(); - - message.append("[").append(ctx.channel().id().asShortText()).append(": "); - message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); - if (downstream) { - message.append(" > "); - } else { - message.append(" < "); - } - - if (remoteAddress instanceof InetSocketAddress) { - message.append(((InetSocketAddress) remoteAddress).getHostString()); - } else { - message.append("unknown"); - } - message.append("]"); - - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump(buf)); - - LOGGER.info(message.toString()); - } - - } - public BasePipelineFactory(TrackerServer server, String protocol) { this.server = server; - - timeout = Context.getConfig().getInteger(protocol + ".timeout"); + eventsEnabled = Context.getConfig().getBoolean(Keys.EVENT_ENABLE); + timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol)); if (timeout == 0) { - timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary - if (timeout == 0) { - timeout = Context.getConfig().getInteger("server.timeout"); - } - } - - distanceHandler = new DistanceHandler( - Context.getConfig().getBoolean("coordinates.filter"), - Context.getConfig().getInteger("coordinates.minError"), - Context.getConfig().getInteger("coordinates.maxError")); - - if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) { - remoteAddressHandler = new RemoteAddressHandler(); - } - - if (Context.getConfig().getBoolean("filter.enable")) { - filterHandler = new FilterHandler(); - } - - if (Context.getGeocoder() != null && !Context.getConfig().getBoolean("geocoder.ignorePositions")) { - geocoderHandler = new GeocoderHandler( - Context.getGeocoder(), - Context.getConfig().getBoolean("geocoder.processInvalidPositions")); - } - - if (Context.getGeolocationProvider() != null) { - geolocationHandler = new GeolocationHandler( - Context.getGeolocationProvider(), - Context.getConfig().getBoolean("geolocation.processInvalidPositions")); - } - - motionHandler = new MotionHandler(Context.getTripsConfig().getSpeedThreshold()); - - if (Context.getConfig().getBoolean("processing.engineHours.enable")) { - engineHoursHandler = new EngineHoursHandler(); - } - - if (Context.getConfig().hasKey("location.latitudeHemisphere") - || Context.getConfig().hasKey("location.longitudeHemisphere")) { - hemisphereHandler = new HemisphereHandler(); - } - - if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) { - copyAttributesHandler = new CopyAttributesHandler(); - } - - if (Context.getConfig().getBoolean("processing.computedAttributes.enable")) { - computedAttributesHandler = new ComputedAttributesHandler(); - } - - if (Context.getConfig().getBoolean("event.enable")) { - commandResultEventHandler = new CommandResultEventHandler(); - overspeedEventHandler = Context.getOverspeedEventHandler(); - fuelDropEventHandler = new FuelDropEventHandler(); - motionEventHandler = Context.getMotionEventHandler(); - geofenceEventHandler = new GeofenceEventHandler(); - alertEventHandler = new AlertEventHandler(); - ignitionEventHandler = new IgnitionEventHandler(); - maintenanceEventHandler = new MaintenanceEventHandler(); - driverEventHandler = new DriverEventHandler(); + timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT); } } protected abstract void addProtocolHandlers(PipelineBuilder pipeline); - private void addHandlers(ChannelPipeline pipeline, ChannelHandler... handlers) { - for (ChannelHandler handler : handlers) { - if (handler != null) { - pipeline.addLast(handler); + @SafeVarargs + private final void addHandlers(ChannelPipeline pipeline, Class<? extends ChannelHandler>... handlerClasses) { + for (Class<? extends ChannelHandler> handlerClass : handlerClasses) { + if (handlerClass != null) { + pipeline.addLast(Main.getInjector().getInstance(handlerClass)); } } } @@ -273,8 +95,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { } @Override - protected void initChannel(Channel channel) throws Exception { + protected void initChannel(Channel channel) { final ChannelPipeline pipeline = channel.pipeline(); + if (timeout > 0 && !server.isDatagram()) { pipeline.addLast(new IdleStateHandler(timeout, 0, 0)); } @@ -295,53 +118,48 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { addHandlers( pipeline, - geolocationHandler, - hemisphereHandler, - distanceHandler, - remoteAddressHandler); + GeolocationHandler.class, + HemisphereHandler.class, + DistanceHandler.class, + RemoteAddressHandler.class); addDynamicHandlers(pipeline); addHandlers( pipeline, - filterHandler, - geocoderHandler, - motionHandler, - engineHoursHandler, - copyAttributesHandler, - computedAttributesHandler); - - if (Context.getDataManager() != null) { - pipeline.addLast(new DefaultDataHandler()); - } - - if (Context.getConfig().getBoolean("forward.enable")) { - pipeline.addLast(Main.getInjector().getInstance(WebDataHandler.Factory.class).create( - Context.getConfig().getString("forward.url"), Context.getConfig().getBoolean("forward.json"))); + FilterHandler.class, + GeocoderHandler.class, + MotionHandler.class, + EngineHoursHandler.class, + CopyAttributesHandler.class, + ComputedAttributesHandler.class, + WebDataHandler.class, + DefaultDataHandler.class); + + if (eventsEnabled) { + addHandlers( + pipeline, + CommandResultEventHandler.class, + OverspeedEventHandler.class, + FuelDropEventHandler.class, + MotionEventHandler.class, + GeofenceEventHandler.class, + AlertEventHandler.class, + IgnitionEventHandler.class, + MaintenanceEventHandler.class, + DriverEventHandler.class); } - addHandlers( - pipeline, - commandResultEventHandler, - overspeedEventHandler, - fuelDropEventHandler, - motionEventHandler, - geofenceEventHandler, - alertEventHandler, - ignitionEventHandler, - maintenanceEventHandler, - driverEventHandler); - pipeline.addLast(new MainEventHandler()); } private void addDynamicHandlers(ChannelPipeline pipeline) { - if (Context.getConfig().hasKey("extra.handlers")) { - String[] handlers = Context.getConfig().getString("extra.handlers").split(","); - for (String handler : handlers) { + String handlers = Context.getConfig().getString(Keys.EXTRA_HANDLERS); + if (handlers != null) { + for (String handler : handlers.split(",")) { try { - pipeline.addLast((ChannelHandler) Class.forName(handler).newInstance()); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException error) { + pipeline.addLast((ChannelHandler) Class.forName(handler).getDeclaredConstructor().newInstance()); + } catch (ReflectiveOperationException error) { LOGGER.warn("Dynamic handler error", error); } } |