From f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 3 Jun 2018 11:46:55 +1200 Subject: Properly handle datagram channels --- src/org/traccar/AdvancedStringDecoder.java | 34 +++++++++++++++ src/org/traccar/AdvancedStringEncoder.java | 37 ++++++++++++++++ src/org/traccar/BaseEventHandler.java | 1 - src/org/traccar/BasePipelineFactory.java | 51 ++++++++++++++++++---- src/org/traccar/BaseProtocolDecoder.java | 2 +- src/org/traccar/BaseProtocolEncoder.java | 16 ++++--- src/org/traccar/ExtendedObjectDecoder.java | 10 +++-- src/org/traccar/MainEventHandler.java | 4 +- src/org/traccar/NetworkMessage.java | 38 ++++++++++++++++ src/org/traccar/TrackerServer.java | 6 +-- src/org/traccar/database/ActiveDevice.java | 3 +- src/org/traccar/protocol/Gps103Protocol.java | 16 +++---- .../traccar/protocol/Gps103ProtocolDecoder.java | 7 +-- 13 files changed, 189 insertions(+), 36 deletions(-) create mode 100644 src/org/traccar/AdvancedStringDecoder.java create mode 100644 src/org/traccar/AdvancedStringEncoder.java create mode 100644 src/org/traccar/NetworkMessage.java (limited to 'src/org') diff --git a/src/org/traccar/AdvancedStringDecoder.java b/src/org/traccar/AdvancedStringDecoder.java new file mode 100644 index 000000000..d7cb6e7b2 --- /dev/null +++ b/src/org/traccar/AdvancedStringDecoder.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.nio.charset.StandardCharsets; + +public class AdvancedStringDecoder extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + NetworkMessage networkMessage = (NetworkMessage) msg; + ByteBuf buf = (ByteBuf) networkMessage.getMessage(); + ctx.fireChannelRead(new NetworkMessage( + buf.toString(StandardCharsets.US_ASCII), networkMessage.getRemoteAddress())); + } + +} diff --git a/src/org/traccar/AdvancedStringEncoder.java b/src/org/traccar/AdvancedStringEncoder.java new file mode 100644 index 000000000..7104b2154 --- /dev/null +++ b/src/org/traccar/AdvancedStringEncoder.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; + +public class AdvancedStringEncoder extends ChannelOutboundHandlerAdapter { + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + NetworkMessage networkMessage = (NetworkMessage) msg; + String string = (String) networkMessage.getMessage(); + ctx.write(new NetworkMessage( + ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(string), StandardCharsets.US_ASCII), + networkMessage.getRemoteAddress()), promise); + } + +} diff --git a/src/org/traccar/BaseEventHandler.java b/src/org/traccar/BaseEventHandler.java index b6f7e2085..50bbbefa2 100644 --- a/src/org/traccar/BaseEventHandler.java +++ b/src/org/traccar/BaseEventHandler.java @@ -24,7 +24,6 @@ public abstract class BaseEventHandler extends BaseDataHandler { @Override protected Position handlePosition(Position position) { - Map events = analyzePosition(position); if (events != null && Context.getNotificationManager() != null) { Context.getNotificationManager().updateEvents(events); diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 0b076a834..2cf4333f5 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; @@ -80,6 +82,39 @@ public abstract class BasePipelineFactory extends ChannelInitializer { server.getChannelGroup().add(ctx.channel()); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + + } + + private static class NetworkMessageHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } + } private static class StandardLoggingHandler extends ChannelDuplexHandler { @@ -97,6 +132,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer { } public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + NetworkMessage networkMessage = (NetworkMessage) o; StringBuilder message = new StringBuilder(); message.append("[").append(ctx.channel().id().asShortText()).append(": "); @@ -107,17 +143,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer { message.append(" < "); } - if (ctx.channel().remoteAddress() != null) { - message.append(((InetSocketAddress) ctx.channel().remoteAddress()).getHostString()); + if (networkMessage.getRemoteAddress() != null) { + message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString()); } else { message.append("null"); } message.append("]"); - if (o instanceof ByteBuf) { - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump((ByteBuf) o)); - } + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage())); Log.debug(message.toString()); } @@ -192,7 +226,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer { } } - protected abstract void addSpecificHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(ChannelPipeline pipeline); @Override protected void initChannel(Channel channel) throws Exception { @@ -201,11 +235,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer { pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0)); } pipeline.addLast("openHandler", new OpenChannelHandler(server)); + pipeline.addLast("messageHandler", new NetworkMessageHandler()); if (Context.isLoggerEnabled()) { pipeline.addLast("logger", new StandardLoggingHandler()); } - addSpecificHandlers(pipeline); + addProtocolHandlers(pipeline); if (geolocationHandler != null) { pipeline.addLast("location", geolocationHandler); diff --git a/src/org/traccar/BaseProtocolDecoder.java b/src/org/traccar/BaseProtocolDecoder.java index de6c00d7c..0750e1096 100644 --- a/src/org/traccar/BaseProtocolDecoder.java +++ b/src/org/traccar/BaseProtocolDecoder.java @@ -16,6 +16,7 @@ package org.traccar; import io.netty.channel.Channel; +import io.netty.channel.socket.DatagramChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import org.traccar.helper.Log; import org.traccar.helper.UnitsConverter; @@ -24,7 +25,6 @@ import org.traccar.model.Position; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.channels.DatagramChannel; import java.util.Collection; import java.util.Date; import java.util.HashMap; diff --git a/src/org/traccar/BaseProtocolEncoder.java b/src/org/traccar/BaseProtocolEncoder.java index db4f2cbbf..ed71b5551 100644 --- a/src/org/traccar/BaseProtocolEncoder.java +++ b/src/org/traccar/BaseProtocolEncoder.java @@ -44,8 +44,11 @@ public abstract class BaseProtocolEncoder extends ChannelOutboundHandlerAdapter @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof Command) { - Command command = (Command) msg; + NetworkMessage networkMessage = (NetworkMessage) msg; + + if (networkMessage.getMessage() instanceof Command) { + + Command command = (Command) networkMessage.getMessage(); Object encodedCommand = encodeCommand(ctx.channel(), command); StringBuilder s = new StringBuilder(); @@ -59,10 +62,13 @@ public abstract class BaseProtocolEncoder extends ChannelOutboundHandlerAdapter } Log.info(s.toString()); - super.write(ctx, encodedCommand, promise); - } + ctx.write(new NetworkMessage(encodedCommand, networkMessage.getRemoteAddress()), promise); - super.write(ctx, msg, promise); + } else { + + super.write(ctx, msg, promise); + + } } protected Object encodeCommand(Channel channel, Command command) { diff --git a/src/org/traccar/ExtendedObjectDecoder.java b/src/org/traccar/ExtendedObjectDecoder.java index 6a6a0fc4d..22a9c7262 100644 --- a/src/org/traccar/ExtendedObjectDecoder.java +++ b/src/org/traccar/ExtendedObjectDecoder.java @@ -43,14 +43,16 @@ public abstract class ExtendedObjectDecoder extends ChannelInboundHandlerAdapter } @Override - public void channelRead(ChannelHandlerContext ctx, Object originalMessage) throws Exception { - Object decodedMessage = decode(ctx.channel(), ctx.channel().remoteAddress(), originalMessage); - onMessageEvent(ctx.channel(), ctx.channel().remoteAddress(), originalMessage, decodedMessage); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + NetworkMessage networkMessage = (NetworkMessage) msg; + Object originalMessage = networkMessage.getMessage(); + Object decodedMessage = decode(ctx.channel(), networkMessage.getRemoteAddress(), originalMessage); + onMessageEvent(ctx.channel(), networkMessage.getRemoteAddress(), originalMessage, decodedMessage); if (originalMessage == decodedMessage) { ctx.fireChannelRead(originalMessage); } else { if (decodedMessage == null) { - decodedMessage = handleEmptyMessage(ctx.channel(), ctx.channel().remoteAddress(), originalMessage); + decodedMessage = handleEmptyMessage(ctx.channel(), networkMessage.getRemoteAddress(), originalMessage); } if (decodedMessage != null) { if (decodedMessage instanceof Collection) { diff --git a/src/org/traccar/MainEventHandler.java b/src/org/traccar/MainEventHandler.java index 21b917a2d..49694aaff 100644 --- a/src/org/traccar/MainEventHandler.java +++ b/src/org/traccar/MainEventHandler.java @@ -18,11 +18,11 @@ package org.traccar; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.DatagramChannel; import io.netty.handler.timeout.IdleStateEvent; import org.traccar.helper.Log; import org.traccar.model.Position; -import java.nio.channels.DatagramChannel; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Arrays; @@ -101,7 +101,7 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Log.warning(formatChannel(ctx.channel()) + " error", cause.getCause()); + Log.warning(formatChannel(ctx.channel()) + " error", cause); closeChannel(ctx.channel()); } diff --git a/src/org/traccar/NetworkMessage.java b/src/org/traccar/NetworkMessage.java new file mode 100644 index 000000000..14a397e69 --- /dev/null +++ b/src/org/traccar/NetworkMessage.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import java.net.SocketAddress; + +public class NetworkMessage { + + private final SocketAddress remoteAddress; + private final Object message; + + public NetworkMessage(Object message, SocketAddress remoteAddress) { + this.message = message; + this.remoteAddress = remoteAddress; + } + + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + public Object getMessage() { + return message; + } + +} diff --git a/src/org/traccar/TrackerServer.java b/src/org/traccar/TrackerServer.java index cce3261b9..7081d455d 100644 --- a/src/org/traccar/TrackerServer.java +++ b/src/org/traccar/TrackerServer.java @@ -45,8 +45,8 @@ public abstract class TrackerServer { BasePipelineFactory pipelineFactory = new BasePipelineFactory(this, protocol) { @Override - protected void addSpecificHandlers(ChannelPipeline pipeline) { - TrackerServer.this.addSpecificHandlers(pipeline); + protected void addProtocolHandlers(ChannelPipeline pipeline) { + TrackerServer.this.addProtocolHandlers(pipeline); } }; @@ -67,7 +67,7 @@ public abstract class TrackerServer { } } - protected abstract void addSpecificHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(ChannelPipeline pipeline); private int port; diff --git a/src/org/traccar/database/ActiveDevice.java b/src/org/traccar/database/ActiveDevice.java index e58c05e19..207fc454b 100644 --- a/src/org/traccar/database/ActiveDevice.java +++ b/src/org/traccar/database/ActiveDevice.java @@ -16,6 +16,7 @@ package org.traccar.database; import io.netty.channel.Channel; +import org.traccar.NetworkMessage; import org.traccar.Protocol; import org.traccar.model.Command; @@ -48,7 +49,7 @@ public class ActiveDevice { } public void write(Object message) { - channel.write(message); // TODO handle UDP connection + channel.writeAndFlush(new NetworkMessage(message, remoteAddress)); } } diff --git a/src/org/traccar/protocol/Gps103Protocol.java b/src/org/traccar/protocol/Gps103Protocol.java index aa96215da..cdf6e10a9 100644 --- a/src/org/traccar/protocol/Gps103Protocol.java +++ b/src/org/traccar/protocol/Gps103Protocol.java @@ -16,8 +16,8 @@ package org.traccar.protocol; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; +import org.traccar.AdvancedStringDecoder; +import org.traccar.AdvancedStringEncoder; import org.traccar.BaseProtocol; import org.traccar.CharacterDelimiterFrameDecoder; import org.traccar.TrackerServer; @@ -45,19 +45,19 @@ public class Gps103Protocol extends BaseProtocol { public void initTrackerServers(List serverList) { serverList.add(new TrackerServer(false, getName()) { @Override - protected void addSpecificHandlers(ChannelPipeline pipeline) { + protected void addProtocolHandlers(ChannelPipeline pipeline) { pipeline.addLast("frameDecoder", new CharacterDelimiterFrameDecoder(2048, "\r\n", "\n", ";")); - pipeline.addLast("stringEncoder", new StringEncoder()); - pipeline.addLast("stringDecoder", new StringDecoder()); + pipeline.addLast("stringEncoder", new AdvancedStringEncoder()); + pipeline.addLast("stringDecoder", new AdvancedStringDecoder()); pipeline.addLast("objectEncoder", new Gps103ProtocolEncoder()); pipeline.addLast("objectDecoder", new Gps103ProtocolDecoder(Gps103Protocol.this)); } }); serverList.add(new TrackerServer(true, getName()) { @Override - protected void addSpecificHandlers(ChannelPipeline pipeline) { - pipeline.addLast("stringEncoder", new StringEncoder()); - pipeline.addLast("stringDecoder", new StringDecoder()); + protected void addProtocolHandlers(ChannelPipeline pipeline) { + pipeline.addLast("stringEncoder", new AdvancedStringEncoder()); + pipeline.addLast("stringDecoder", new AdvancedStringDecoder()); pipeline.addLast("objectEncoder", new Gps103ProtocolEncoder()); pipeline.addLast("objectDecoder", new Gps103ProtocolDecoder(Gps103Protocol.this)); } diff --git a/src/org/traccar/protocol/Gps103ProtocolDecoder.java b/src/org/traccar/protocol/Gps103ProtocolDecoder.java index 4e39b32b3..7a66a48bd 100644 --- a/src/org/traccar/protocol/Gps103ProtocolDecoder.java +++ b/src/org/traccar/protocol/Gps103ProtocolDecoder.java @@ -18,6 +18,7 @@ package org.traccar.protocol; import io.netty.channel.Channel; import org.traccar.BaseProtocolDecoder; import org.traccar.DeviceSession; +import org.traccar.NetworkMessage; import org.traccar.helper.DateBuilder; import org.traccar.helper.Parser; import org.traccar.helper.PatternBuilder; @@ -160,7 +161,7 @@ public class Gps103ProtocolDecoder extends BaseProtocolDecoder { position.set(Position.KEY_ALARM, decodeAlarm(alarm)); if (alarm.equals("help me")) { if (channel != null) { - channel.write("**,imei:" + imei + ",E;"); + channel.writeAndFlush(new NetworkMessage("**,imei:" + imei + ",E;", remoteAddress)); } } else if (alarm.equals("acc on")) { position.set(Position.KEY_IGNITION, true); @@ -276,7 +277,7 @@ public class Gps103ProtocolDecoder extends BaseProtocolDecoder { if (sentence.contains("##")) { if (channel != null) { - channel.write("LOAD"); + channel.writeAndFlush(new NetworkMessage("LOAD", remoteAddress)); Matcher matcher = Pattern.compile("##,imei:(\\d+),A").matcher(sentence); if (matcher.matches()) { getDeviceSession(channel, remoteAddress, matcher.group(1)); @@ -287,7 +288,7 @@ public class Gps103ProtocolDecoder extends BaseProtocolDecoder { if (!sentence.isEmpty() && Character.isDigit(sentence.charAt(0))) { if (channel != null) { - channel.write("ON"); + channel.writeAndFlush(new NetworkMessage("ON", remoteAddress)); } int start = sentence.indexOf("imei:"); if (start >= 0) { -- cgit v1.2.3