diff options
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r-- | src/org/traccar/BasePipelineFactory.java | 164 |
1 files changed, 114 insertions, 50 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 5a077da7c..6269fb8cc 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 - 2017 Anton Tananaev (anton@traccar.org) + * Copyright 2012 - 2018 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,20 +15,20 @@ */ package org.traccar; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.DownstreamMessageEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.handler.logging.LoggingHandler; -import org.jboss.netty.handler.timeout.IdleStateHandler; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.timeout.IdleStateHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; import org.traccar.events.FuelDropEventHandler; @@ -44,13 +44,14 @@ import org.traccar.processing.CopyAttributesHandler; import java.net.InetSocketAddress; -public abstract class BasePipelineFactory implements ChannelPipelineFactory { +public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { private final TrackerServer server; private int timeout; private FilterHandler filterHandler; private DistanceHandler distanceHandler; + private EngineHoursHandler engineHoursHandler; private RemoteAddressHandler remoteAddressHandler; private MotionHandler motionHandler; private GeocoderHandler geocoderHandler; @@ -69,7 +70,7 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { private MaintenanceEventHandler maintenanceEventHandler; private DriverEventHandler driverEventHandler; - private static final class OpenChannelHandler extends SimpleChannelHandler { + private static final class OpenChannelHandler extends ChannelDuplexHandler { private final TrackerServer server; @@ -78,41 +79,83 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) { - server.getChannelGroup().add(e.getChannel()); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + server.getChannelGroup().add(ctx.channel()); } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + } - private static class StandardLoggingHandler extends LoggingHandler { + private static class NetworkMessageHandler extends ChannelDuplexHandler { @Override - public void log(ChannelEvent e) { - if (e instanceof MessageEvent) { - MessageEvent event = (MessageEvent) e; - StringBuilder msg = new StringBuilder(); - - msg.append("[").append(String.format("%08X", e.getChannel().getId())).append(": "); - msg.append(((InetSocketAddress) e.getChannel().getLocalAddress()).getPort()); - if (e instanceof DownstreamMessageEvent) { - msg.append(" > "); - } else { - msg.append(" < "); - } + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } - if (event.getRemoteAddress() != null) { - msg.append(((InetSocketAddress) event.getRemoteAddress()).getHostString()); - } else { - msg.append("null"); - } - msg.append("]"); + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } - if (event.getMessage() instanceof ChannelBuffer) { - msg.append(" HEX: "); - msg.append(ChannelBuffers.hexDump((ChannelBuffer) event.getMessage())); - } + } + + private static class StandardLoggingHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log(ctx, false, msg); + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log(ctx, true, msg); + super.write(ctx, msg, promise); + } - Log.debug(msg.toString()); + public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + NetworkMessage networkMessage = (NetworkMessage) o; + StringBuilder message = new StringBuilder(); + + message.append("[").append(ctx.channel().id().asShortText()).append(": "); + message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); + if (downstream) { + message.append(" > "); + } else { + message.append(" < "); + } + + if (networkMessage.getRemoteAddress() != null) { + message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString()); + } else { + message.append("null"); } + message.append("]"); + + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage())); + + Log.debug(message.toString()); } } @@ -155,6 +198,10 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { motionHandler = new MotionHandler(Context.getTripsConfig().getSpeedThreshold()); + if (Context.getConfig().getBoolean("processing.engineHours.enable")) { + engineHoursHandler = new EngineHoursHandler(); + } + if (Context.getConfig().hasKey("location.latitudeHemisphere") || Context.getConfig().hasKey("location.longitudeHemisphere")) { hemisphereHandler = new HemisphereHandler(); @@ -181,20 +228,33 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { } } - protected abstract void addSpecificHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(PipelineBuilder pipeline); @Override - public ChannelPipeline getPipeline() { - ChannelPipeline pipeline = Channels.pipeline(); - if (timeout > 0 && !server.isConnectionless()) { - pipeline.addLast("idleHandler", new IdleStateHandler(GlobalTimer.getTimer(), timeout, 0, 0)); + protected void initChannel(Channel channel) throws Exception { + final ChannelPipeline pipeline = channel.pipeline(); + if (timeout > 0 && !server.isDatagram()) { + pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0)); } pipeline.addLast("openHandler", new OpenChannelHandler(server)); + pipeline.addLast("messageHandler", new NetworkMessageHandler()); if (Context.isLoggerEnabled()) { pipeline.addLast("logger", new StandardLoggingHandler()); } - addSpecificHandlers(pipeline); + addProtocolHandlers(new PipelineBuilder() { + @Override + public void addLast(String name, ChannelHandler handler) { + if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) { + if (handler instanceof ChannelInboundHandler) { + handler = new WrapperInboundHandler((ChannelInboundHandler) handler); + } else { + handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler); + } + } + pipeline.addLast(name, handler); + } + }); if (geolocationHandler != null) { pipeline.addLast("location", geolocationHandler); @@ -225,6 +285,10 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { pipeline.addLast("motion", motionHandler); } + if (engineHoursHandler != null) { + pipeline.addLast("engineHours", engineHoursHandler); + } + if (copyAttributesHandler != null) { pipeline.addLast("copyAttributes", copyAttributesHandler); } @@ -279,7 +343,6 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { } pipeline.addLast("mainHandler", new MainEventHandler()); - return pipeline; } private void addDynamicHandlers(ChannelPipeline pipeline) { @@ -294,4 +357,5 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { } } } + } |