diff options
author | Anton Tananaev <anton.tananaev@gmail.com> | 2018-06-03 11:46:55 +1200 |
---|---|---|
committer | Anton Tananaev <anton.tananaev@gmail.com> | 2018-06-03 11:46:55 +1200 |
commit | f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab (patch) | |
tree | c82ad82bdb77e0e33340d6a020dc0912448f60d8 /src/org/traccar/BasePipelineFactory.java | |
parent | 06e3bd8b16da12baafc9a97ba5949b3f7ffb5e07 (diff) | |
download | trackermap-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.tar.gz trackermap-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.tar.bz2 trackermap-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.zip |
Properly handle datagram channels
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r-- | src/org/traccar/BasePipelineFactory.java | 51 |
1 files changed, 43 insertions, 8 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 0b076a834..2cf4333f5 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; @@ -80,6 +82,39 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { server.getChannelGroup().add(ctx.channel()); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + + } + + private static class NetworkMessageHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } + } private static class StandardLoggingHandler extends ChannelDuplexHandler { @@ -97,6 +132,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { } public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + NetworkMessage networkMessage = (NetworkMessage) o; StringBuilder message = new StringBuilder(); message.append("[").append(ctx.channel().id().asShortText()).append(": "); @@ -107,17 +143,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { message.append(" < "); } - if (ctx.channel().remoteAddress() != null) { - message.append(((InetSocketAddress) ctx.channel().remoteAddress()).getHostString()); + if (networkMessage.getRemoteAddress() != null) { + message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString()); } else { message.append("null"); } message.append("]"); - if (o instanceof ByteBuf) { - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump((ByteBuf) o)); - } + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage())); Log.debug(message.toString()); } @@ -192,7 +226,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { } } - protected abstract void addSpecificHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(ChannelPipeline pipeline); @Override protected void initChannel(Channel channel) throws Exception { @@ -201,11 +235,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0)); } pipeline.addLast("openHandler", new OpenChannelHandler(server)); + pipeline.addLast("messageHandler", new NetworkMessageHandler()); if (Context.isLoggerEnabled()) { pipeline.addLast("logger", new StandardLoggingHandler()); } - addSpecificHandlers(pipeline); + addProtocolHandlers(pipeline); if (geolocationHandler != null) { pipeline.addLast("location", geolocationHandler); |