aboutsummaryrefslogtreecommitdiff
path: root/src/org/traccar/BasePipelineFactory.java
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2019-01-18 16:03:28 -0800
committerAnton Tananaev <anton.tananaev@gmail.com>2019-01-18 16:03:28 -0800
commit24eac32a4a69cc7bbbfbe5c82464400ac0811684 (patch)
tree3e6fc616f27e90d0da15b1a6083c437d7cd2eec6 /src/org/traccar/BasePipelineFactory.java
parentf7ee89be82c0b22186b487aafe384c9fa2c96323 (diff)
downloadtrackermap-server-24eac32a4a69cc7bbbfbe5c82464400ac0811684.tar.gz
trackermap-server-24eac32a4a69cc7bbbfbe5c82464400ac0811684.tar.bz2
trackermap-server-24eac32a4a69cc7bbbfbe5c82464400ac0811684.zip
Improve pipeline message handing
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r--src/org/traccar/BasePipelineFactory.java62
1 files changed, 37 insertions, 25 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index 401c42d8b..b45e3a280 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012 - 2018 Anton Tananaev (anton@traccar.org)
+ * Copyright 2012 - 2019 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,6 +44,7 @@ import org.traccar.processing.ComputedAttributesHandler;
import org.traccar.processing.CopyAttributesHandler;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Map;
public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
@@ -99,25 +100,29 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
private static class NetworkMessageHandler extends ChannelDuplexHandler {
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (ctx.channel() instanceof DatagramChannel) {
DatagramPacket packet = (DatagramPacket) msg;
ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
- } else {
+ } else if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
}
}
@Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- NetworkMessage message = (NetworkMessage) msg;
- if (ctx.channel() instanceof DatagramChannel) {
- InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
- InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
- ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+ if (msg instanceof NetworkMessage) {
+ NetworkMessage message = (NetworkMessage) msg;
+ if (ctx.channel() instanceof DatagramChannel) {
+ InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
+ InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
+ ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
+ } else {
+ ctx.write(message.getMessage(), promise);
+ }
} else {
- ctx.write(message.getMessage(), promise);
+ ctx.write(msg, promise);
}
}
@@ -138,7 +143,17 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
- NetworkMessage networkMessage = (NetworkMessage) o;
+ if (o instanceof NetworkMessage) {
+ NetworkMessage networkMessage = (NetworkMessage) o;
+ if (networkMessage.getMessage() instanceof ByteBuf) {
+ log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage());
+ }
+ } else if (o instanceof ByteBuf) {
+ log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o);
+ }
+ }
+
+ public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) {
StringBuilder message = new StringBuilder();
message.append("[").append(ctx.channel().id().asShortText()).append(": ");
@@ -149,15 +164,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
message.append(" < ");
}
- if (networkMessage.getRemoteAddress() != null) {
- message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString());
+ if (remoteAddress instanceof InetSocketAddress) {
+ message.append(((InetSocketAddress) remoteAddress).getHostString());
} else {
- message.append("null");
+ message.append("unknown");
}
message.append("]");
message.append(" HEX: ");
- message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage()));
+ message.append(ByteBufUtil.hexDump(buf));
LOGGER.info(message.toString());
}
@@ -267,18 +282,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
pipeline.addLast(new NetworkMessageHandler());
pipeline.addLast(new StandardLoggingHandler());
- addProtocolHandlers(new PipelineBuilder() {
- @Override
- public void addLast(ChannelHandler handler) {
- if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) {
- if (handler instanceof ChannelInboundHandler) {
- handler = new WrapperInboundHandler((ChannelInboundHandler) handler);
- } else {
- handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler);
- }
+ addProtocolHandlers(handler -> {
+ if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) {
+ if (handler instanceof ChannelInboundHandler) {
+ handler = new WrapperInboundHandler((ChannelInboundHandler) handler);
+ } else {
+ handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler);
}
- pipeline.addLast(handler);
}
+ pipeline.addLast(handler);
});
addHandlers(