aboutsummaryrefslogtreecommitdiff
path: root/src/org/traccar/BasePipelineFactory.java
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2018-06-03 11:46:55 +1200
committerAnton Tananaev <anton.tananaev@gmail.com>2018-06-03 11:46:55 +1200
commitf0d1a5df7aecf1237609200d3ecb7cdd3d0abcab (patch)
treec82ad82bdb77e0e33340d6a020dc0912448f60d8 /src/org/traccar/BasePipelineFactory.java
parent06e3bd8b16da12baafc9a97ba5949b3f7ffb5e07 (diff)
downloadtrackermap-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.tar.gz
trackermap-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.tar.bz2
trackermap-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.zip
Properly handle datagram channels
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r--src/org/traccar/BasePipelineFactory.java51
1 files changed, 43 insertions, 8 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index 0b076a834..2cf4333f5 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleStateHandler;
import org.traccar.events.CommandResultEventHandler;
import org.traccar.events.DriverEventHandler;
@@ -80,6 +82,39 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
server.getChannelGroup().add(ctx.channel());
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ server.getChannelGroup().remove(ctx.channel());
+ }
+
+ }
+
+ private static class NetworkMessageHandler extends ChannelDuplexHandler {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (ctx.channel() instanceof DatagramChannel) {
+ DatagramPacket packet = (DatagramPacket) msg;
+ ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
+ } else {
+ ByteBuf buffer = (ByteBuf) msg;
+ ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
+ }
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ NetworkMessage message = (NetworkMessage) msg;
+ if (ctx.channel() instanceof DatagramChannel) {
+ InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
+ InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
+ ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
+ } else {
+ ctx.write(message.getMessage(), promise);
+ }
+ }
+
}
private static class StandardLoggingHandler extends ChannelDuplexHandler {
@@ -97,6 +132,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
+ NetworkMessage networkMessage = (NetworkMessage) o;
StringBuilder message = new StringBuilder();
message.append("[").append(ctx.channel().id().asShortText()).append(": ");
@@ -107,17 +143,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
message.append(" < ");
}
- if (ctx.channel().remoteAddress() != null) {
- message.append(((InetSocketAddress) ctx.channel().remoteAddress()).getHostString());
+ if (networkMessage.getRemoteAddress() != null) {
+ message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString());
} else {
message.append("null");
}
message.append("]");
- if (o instanceof ByteBuf) {
- message.append(" HEX: ");
- message.append(ByteBufUtil.hexDump((ByteBuf) o));
- }
+ message.append(" HEX: ");
+ message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage()));
Log.debug(message.toString());
}
@@ -192,7 +226,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
}
- protected abstract void addSpecificHandlers(ChannelPipeline pipeline);
+ protected abstract void addProtocolHandlers(ChannelPipeline pipeline);
@Override
protected void initChannel(Channel channel) throws Exception {
@@ -201,11 +235,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0));
}
pipeline.addLast("openHandler", new OpenChannelHandler(server));
+ pipeline.addLast("messageHandler", new NetworkMessageHandler());
if (Context.isLoggerEnabled()) {
pipeline.addLast("logger", new StandardLoggingHandler());
}
- addSpecificHandlers(pipeline);
+ addProtocolHandlers(pipeline);
if (geolocationHandler != null) {
pipeline.addLast("location", geolocationHandler);