diff options
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r-- | src/org/traccar/BasePipelineFactory.java | 138 |
1 files changed, 13 insertions, 125 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 022eeeffa..7f617c470 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -15,23 +15,17 @@ */ package org.traccar; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.config.Keys; +import org.traccar.events.AlertEventHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; import org.traccar.events.FuelDropEventHandler; @@ -40,13 +34,13 @@ import org.traccar.events.IgnitionEventHandler; import org.traccar.events.MaintenanceEventHandler; import org.traccar.events.MotionEventHandler; import org.traccar.events.OverspeedEventHandler; -import org.traccar.events.AlertEventHandler; -import org.traccar.processing.ComputedAttributesHandler; -import org.traccar.processing.CopyAttributesHandler; -import org.traccar.processing.FilterHandler; +import org.traccar.handler.ComputedAttributesHandler; +import org.traccar.handler.CopyAttributesHandler; +import org.traccar.handler.FilterHandler; +import org.traccar.handler.NetworkMessageHandler; +import org.traccar.handler.OpenChannelHandler; +import org.traccar.handler.StandardLoggingHandler; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.Map; public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { @@ -76,119 +70,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { private MaintenanceEventHandler maintenanceEventHandler; private DriverEventHandler driverEventHandler; - private static final class OpenChannelHandler extends ChannelDuplexHandler { - - private final TrackerServer server; - - private OpenChannelHandler(TrackerServer server) { - this.server = server; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - server.getChannelGroup().add(ctx.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - server.getChannelGroup().remove(ctx.channel()); - } - - } - - private static class NetworkMessageHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (ctx.channel() instanceof DatagramChannel) { - DatagramPacket packet = (DatagramPacket) msg; - ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); - } else if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; - ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof NetworkMessage) { - NetworkMessage message = (NetworkMessage) msg; - if (ctx.channel() instanceof DatagramChannel) { - InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); - InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); - ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); - } else { - ctx.write(message.getMessage(), promise); - } - } else { - ctx.write(msg, promise); - } - } - - } - - private static class StandardLoggingHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log(ctx, false, msg); - super.channelRead(ctx, msg); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - log(ctx, true, msg); - super.write(ctx, msg, promise); - } - - public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { - if (o instanceof NetworkMessage) { - NetworkMessage networkMessage = (NetworkMessage) o; - if (networkMessage.getMessage() instanceof ByteBuf) { - log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); - } - } else if (o instanceof ByteBuf) { - log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); - } - } - - public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { - StringBuilder message = new StringBuilder(); - - message.append("[").append(ctx.channel().id().asShortText()).append(": "); - message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); - if (downstream) { - message.append(" > "); - } else { - message.append(" < "); - } - - if (remoteAddress instanceof InetSocketAddress) { - message.append(((InetSocketAddress) remoteAddress).getHostString()); - } else { - message.append("unknown"); - } - message.append("]"); - - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump(buf)); - - LOGGER.info(message.toString()); - } - - } - public BasePipelineFactory(TrackerServer server, String protocol) { this.server = server; - timeout = Context.getConfig().getInteger(protocol + ".timeout"); + timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol)); if (timeout == 0) { - timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary - if (timeout == 0) { - timeout = Context.getConfig().getInteger("server.timeout"); - } + timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT); } distanceHandler = new DistanceHandler( @@ -196,7 +83,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { Context.getConfig().getInteger("coordinates.minError"), Context.getConfig().getInteger("coordinates.maxError")); - if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) { + if (Context.getConfig().getBoolean("handler.remoteAddress.enable")) { remoteAddressHandler = new RemoteAddressHandler(); } @@ -223,7 +110,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { hemisphereHandler = new HemisphereHandler(); } - if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) { + if (Context.getConfig().getBoolean("handler.copyAttributes.enable")) { copyAttributesHandler = new CopyAttributesHandler(); } @@ -270,8 +157,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { } @Override - protected void initChannel(Channel channel) throws Exception { + protected void initChannel(Channel channel) { final ChannelPipeline pipeline = channel.pipeline(); + if (timeout > 0 && !server.isDatagram()) { pipeline.addLast(new IdleStateHandler(timeout, 0, 0)); } |