From f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 3 Jun 2018 11:46:55 +1200 Subject: Properly handle datagram channels --- src/org/traccar/BasePipelineFactory.java | 51 +++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 8 deletions(-) (limited to 'src/org/traccar/BasePipelineFactory.java') diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 0b076a834..2cf4333f5 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; @@ -80,6 +82,39 @@ public abstract class BasePipelineFactory extends ChannelInitializer { server.getChannelGroup().add(ctx.channel()); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + + } + + private static class NetworkMessageHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } + } private static class StandardLoggingHandler extends ChannelDuplexHandler { @@ -97,6 +132,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer { } public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + NetworkMessage networkMessage = (NetworkMessage) o; StringBuilder message = new StringBuilder(); message.append("[").append(ctx.channel().id().asShortText()).append(": "); @@ -107,17 +143,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer { message.append(" < "); } - if (ctx.channel().remoteAddress() != null) { - message.append(((InetSocketAddress) ctx.channel().remoteAddress()).getHostString()); + if (networkMessage.getRemoteAddress() != null) { + message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString()); } else { message.append("null"); } message.append("]"); - if (o instanceof ByteBuf) { - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump((ByteBuf) o)); - } + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage())); Log.debug(message.toString()); } @@ -192,7 +226,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer { } } - protected abstract void addSpecificHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(ChannelPipeline pipeline); @Override protected void initChannel(Channel channel) throws Exception { @@ -201,11 +235,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer { pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0)); } pipeline.addLast("openHandler", new OpenChannelHandler(server)); + pipeline.addLast("messageHandler", new NetworkMessageHandler()); if (Context.isLoggerEnabled()) { pipeline.addLast("logger", new StandardLoggingHandler()); } - addSpecificHandlers(pipeline); + addProtocolHandlers(pipeline); if (geolocationHandler != null) { pipeline.addLast("location", geolocationHandler); -- cgit v1.2.3