From 24eac32a4a69cc7bbbfbe5c82464400ac0811684 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Fri, 18 Jan 2019 16:03:28 -0800 Subject: Improve pipeline message handing --- src/org/traccar/BasePipelineFactory.java | 62 +++++++++++++++++++------------- 1 file changed, 37 insertions(+), 25 deletions(-) (limited to 'src/org/traccar/BasePipelineFactory.java') diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 401c42d8b..b45e3a280 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 - 2018 Anton Tananaev (anton@traccar.org) + * Copyright 2012 - 2019 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ import org.traccar.processing.ComputedAttributesHandler; import org.traccar.processing.CopyAttributesHandler; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Map; public abstract class BasePipelineFactory extends ChannelInitializer { @@ -99,25 +100,29 @@ public abstract class BasePipelineFactory extends ChannelInitializer { private static class NetworkMessageHandler extends ChannelDuplexHandler { @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { if (ctx.channel() instanceof DatagramChannel) { DatagramPacket packet = (DatagramPacket) msg; ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); - } else { + } else if (msg instanceof ByteBuf) { ByteBuf buffer = (ByteBuf) msg; ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); } } @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - NetworkMessage message = (NetworkMessage) msg; - if (ctx.channel() instanceof DatagramChannel) { - InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); - InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); - ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof NetworkMessage) { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } } else { - ctx.write(message.getMessage(), promise); + ctx.write(msg, promise); } } @@ -138,7 +143,17 @@ public abstract class BasePipelineFactory extends ChannelInitializer { } public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { - NetworkMessage networkMessage = (NetworkMessage) o; + if (o instanceof NetworkMessage) { + NetworkMessage networkMessage = (NetworkMessage) o; + if (networkMessage.getMessage() instanceof ByteBuf) { + log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); + } + } else if (o instanceof ByteBuf) { + log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); + } + } + + public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { StringBuilder message = new StringBuilder(); message.append("[").append(ctx.channel().id().asShortText()).append(": "); @@ -149,15 +164,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer { message.append(" < "); } - if (networkMessage.getRemoteAddress() != null) { - message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString()); + if (remoteAddress instanceof InetSocketAddress) { + message.append(((InetSocketAddress) remoteAddress).getHostString()); } else { - message.append("null"); + message.append("unknown"); } message.append("]"); message.append(" HEX: "); - message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage())); + message.append(ByteBufUtil.hexDump(buf)); LOGGER.info(message.toString()); } @@ -267,18 +282,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer { pipeline.addLast(new NetworkMessageHandler()); pipeline.addLast(new StandardLoggingHandler()); - addProtocolHandlers(new PipelineBuilder() { - @Override - public void addLast(ChannelHandler handler) { - if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) { - if (handler instanceof ChannelInboundHandler) { - handler = new WrapperInboundHandler((ChannelInboundHandler) handler); - } else { - handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler); - } + addProtocolHandlers(handler -> { + if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) { + if (handler instanceof ChannelInboundHandler) { + handler = new WrapperInboundHandler((ChannelInboundHandler) handler); + } else { + handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler); } - pipeline.addLast(handler); } + pipeline.addLast(handler); }); addHandlers( -- cgit v1.2.3