aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2018-06-03 11:46:55 +1200
committerAnton Tananaev <anton.tananaev@gmail.com>2018-06-03 11:46:55 +1200
commitf0d1a5df7aecf1237609200d3ecb7cdd3d0abcab (patch)
treec82ad82bdb77e0e33340d6a020dc0912448f60d8 /src
parent06e3bd8b16da12baafc9a97ba5949b3f7ffb5e07 (diff)
downloadtraccar-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.tar.gz
traccar-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.tar.bz2
traccar-server-f0d1a5df7aecf1237609200d3ecb7cdd3d0abcab.zip
Properly handle datagram channels
Diffstat (limited to 'src')
-rw-r--r--src/org/traccar/AdvancedStringDecoder.java34
-rw-r--r--src/org/traccar/AdvancedStringEncoder.java37
-rw-r--r--src/org/traccar/BaseEventHandler.java1
-rw-r--r--src/org/traccar/BasePipelineFactory.java51
-rw-r--r--src/org/traccar/BaseProtocolDecoder.java2
-rw-r--r--src/org/traccar/BaseProtocolEncoder.java16
-rw-r--r--src/org/traccar/ExtendedObjectDecoder.java10
-rw-r--r--src/org/traccar/MainEventHandler.java4
-rw-r--r--src/org/traccar/NetworkMessage.java38
-rw-r--r--src/org/traccar/TrackerServer.java6
-rw-r--r--src/org/traccar/database/ActiveDevice.java3
-rw-r--r--src/org/traccar/protocol/Gps103Protocol.java16
-rw-r--r--src/org/traccar/protocol/Gps103ProtocolDecoder.java7
13 files changed, 189 insertions, 36 deletions
diff --git a/src/org/traccar/AdvancedStringDecoder.java b/src/org/traccar/AdvancedStringDecoder.java
new file mode 100644
index 000000000..d7cb6e7b2
--- /dev/null
+++ b/src/org/traccar/AdvancedStringDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.nio.charset.StandardCharsets;
+
+public class AdvancedStringDecoder extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ NetworkMessage networkMessage = (NetworkMessage) msg;
+ ByteBuf buf = (ByteBuf) networkMessage.getMessage();
+ ctx.fireChannelRead(new NetworkMessage(
+ buf.toString(StandardCharsets.US_ASCII), networkMessage.getRemoteAddress()));
+ }
+
+}
diff --git a/src/org/traccar/AdvancedStringEncoder.java b/src/org/traccar/AdvancedStringEncoder.java
new file mode 100644
index 000000000..7104b2154
--- /dev/null
+++ b/src/org/traccar/AdvancedStringEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class AdvancedStringEncoder extends ChannelOutboundHandlerAdapter {
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ NetworkMessage networkMessage = (NetworkMessage) msg;
+ String string = (String) networkMessage.getMessage();
+ ctx.write(new NetworkMessage(
+ ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(string), StandardCharsets.US_ASCII),
+ networkMessage.getRemoteAddress()), promise);
+ }
+
+}
diff --git a/src/org/traccar/BaseEventHandler.java b/src/org/traccar/BaseEventHandler.java
index b6f7e2085..50bbbefa2 100644
--- a/src/org/traccar/BaseEventHandler.java
+++ b/src/org/traccar/BaseEventHandler.java
@@ -24,7 +24,6 @@ public abstract class BaseEventHandler extends BaseDataHandler {
@Override
protected Position handlePosition(Position position) {
-
Map<Event, Position> events = analyzePosition(position);
if (events != null && Context.getNotificationManager() != null) {
Context.getNotificationManager().updateEvents(events);
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index 0b076a834..2cf4333f5 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleStateHandler;
import org.traccar.events.CommandResultEventHandler;
import org.traccar.events.DriverEventHandler;
@@ -80,6 +82,39 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
server.getChannelGroup().add(ctx.channel());
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ server.getChannelGroup().remove(ctx.channel());
+ }
+
+ }
+
+ private static class NetworkMessageHandler extends ChannelDuplexHandler {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (ctx.channel() instanceof DatagramChannel) {
+ DatagramPacket packet = (DatagramPacket) msg;
+ ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
+ } else {
+ ByteBuf buffer = (ByteBuf) msg;
+ ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
+ }
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ NetworkMessage message = (NetworkMessage) msg;
+ if (ctx.channel() instanceof DatagramChannel) {
+ InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
+ InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
+ ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
+ } else {
+ ctx.write(message.getMessage(), promise);
+ }
+ }
+
}
private static class StandardLoggingHandler extends ChannelDuplexHandler {
@@ -97,6 +132,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
+ NetworkMessage networkMessage = (NetworkMessage) o;
StringBuilder message = new StringBuilder();
message.append("[").append(ctx.channel().id().asShortText()).append(": ");
@@ -107,17 +143,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
message.append(" < ");
}
- if (ctx.channel().remoteAddress() != null) {
- message.append(((InetSocketAddress) ctx.channel().remoteAddress()).getHostString());
+ if (networkMessage.getRemoteAddress() != null) {
+ message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString());
} else {
message.append("null");
}
message.append("]");
- if (o instanceof ByteBuf) {
- message.append(" HEX: ");
- message.append(ByteBufUtil.hexDump((ByteBuf) o));
- }
+ message.append(" HEX: ");
+ message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage()));
Log.debug(message.toString());
}
@@ -192,7 +226,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
}
- protected abstract void addSpecificHandlers(ChannelPipeline pipeline);
+ protected abstract void addProtocolHandlers(ChannelPipeline pipeline);
@Override
protected void initChannel(Channel channel) throws Exception {
@@ -201,11 +235,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0));
}
pipeline.addLast("openHandler", new OpenChannelHandler(server));
+ pipeline.addLast("messageHandler", new NetworkMessageHandler());
if (Context.isLoggerEnabled()) {
pipeline.addLast("logger", new StandardLoggingHandler());
}
- addSpecificHandlers(pipeline);
+ addProtocolHandlers(pipeline);
if (geolocationHandler != null) {
pipeline.addLast("location", geolocationHandler);
diff --git a/src/org/traccar/BaseProtocolDecoder.java b/src/org/traccar/BaseProtocolDecoder.java
index de6c00d7c..0750e1096 100644
--- a/src/org/traccar/BaseProtocolDecoder.java
+++ b/src/org/traccar/BaseProtocolDecoder.java
@@ -16,6 +16,7 @@
package org.traccar;
import io.netty.channel.Channel;
+import io.netty.channel.socket.DatagramChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import org.traccar.helper.Log;
import org.traccar.helper.UnitsConverter;
@@ -24,7 +25,6 @@ import org.traccar.model.Position;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
diff --git a/src/org/traccar/BaseProtocolEncoder.java b/src/org/traccar/BaseProtocolEncoder.java
index db4f2cbbf..ed71b5551 100644
--- a/src/org/traccar/BaseProtocolEncoder.java
+++ b/src/org/traccar/BaseProtocolEncoder.java
@@ -44,8 +44,11 @@ public abstract class BaseProtocolEncoder extends ChannelOutboundHandlerAdapter
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof Command) {
- Command command = (Command) msg;
+ NetworkMessage networkMessage = (NetworkMessage) msg;
+
+ if (networkMessage.getMessage() instanceof Command) {
+
+ Command command = (Command) networkMessage.getMessage();
Object encodedCommand = encodeCommand(ctx.channel(), command);
StringBuilder s = new StringBuilder();
@@ -59,10 +62,13 @@ public abstract class BaseProtocolEncoder extends ChannelOutboundHandlerAdapter
}
Log.info(s.toString());
- super.write(ctx, encodedCommand, promise);
- }
+ ctx.write(new NetworkMessage(encodedCommand, networkMessage.getRemoteAddress()), promise);
- super.write(ctx, msg, promise);
+ } else {
+
+ super.write(ctx, msg, promise);
+
+ }
}
protected Object encodeCommand(Channel channel, Command command) {
diff --git a/src/org/traccar/ExtendedObjectDecoder.java b/src/org/traccar/ExtendedObjectDecoder.java
index 6a6a0fc4d..22a9c7262 100644
--- a/src/org/traccar/ExtendedObjectDecoder.java
+++ b/src/org/traccar/ExtendedObjectDecoder.java
@@ -43,14 +43,16 @@ public abstract class ExtendedObjectDecoder extends ChannelInboundHandlerAdapter
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object originalMessage) throws Exception {
- Object decodedMessage = decode(ctx.channel(), ctx.channel().remoteAddress(), originalMessage);
- onMessageEvent(ctx.channel(), ctx.channel().remoteAddress(), originalMessage, decodedMessage);
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ NetworkMessage networkMessage = (NetworkMessage) msg;
+ Object originalMessage = networkMessage.getMessage();
+ Object decodedMessage = decode(ctx.channel(), networkMessage.getRemoteAddress(), originalMessage);
+ onMessageEvent(ctx.channel(), networkMessage.getRemoteAddress(), originalMessage, decodedMessage);
if (originalMessage == decodedMessage) {
ctx.fireChannelRead(originalMessage);
} else {
if (decodedMessage == null) {
- decodedMessage = handleEmptyMessage(ctx.channel(), ctx.channel().remoteAddress(), originalMessage);
+ decodedMessage = handleEmptyMessage(ctx.channel(), networkMessage.getRemoteAddress(), originalMessage);
}
if (decodedMessage != null) {
if (decodedMessage instanceof Collection) {
diff --git a/src/org/traccar/MainEventHandler.java b/src/org/traccar/MainEventHandler.java
index 21b917a2d..49694aaff 100644
--- a/src/org/traccar/MainEventHandler.java
+++ b/src/org/traccar/MainEventHandler.java
@@ -18,11 +18,11 @@ package org.traccar;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.DatagramChannel;
import io.netty.handler.timeout.IdleStateEvent;
import org.traccar.helper.Log;
import org.traccar.model.Position;
-import java.nio.channels.DatagramChannel;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
@@ -101,7 +101,7 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- Log.warning(formatChannel(ctx.channel()) + " error", cause.getCause());
+ Log.warning(formatChannel(ctx.channel()) + " error", cause);
closeChannel(ctx.channel());
}
diff --git a/src/org/traccar/NetworkMessage.java b/src/org/traccar/NetworkMessage.java
new file mode 100644
index 000000000..14a397e69
--- /dev/null
+++ b/src/org/traccar/NetworkMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar;
+
+import java.net.SocketAddress;
+
+public class NetworkMessage {
+
+ private final SocketAddress remoteAddress;
+ private final Object message;
+
+ public NetworkMessage(Object message, SocketAddress remoteAddress) {
+ this.message = message;
+ this.remoteAddress = remoteAddress;
+ }
+
+ public SocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public Object getMessage() {
+ return message;
+ }
+
+}
diff --git a/src/org/traccar/TrackerServer.java b/src/org/traccar/TrackerServer.java
index cce3261b9..7081d455d 100644
--- a/src/org/traccar/TrackerServer.java
+++ b/src/org/traccar/TrackerServer.java
@@ -45,8 +45,8 @@ public abstract class TrackerServer {
BasePipelineFactory pipelineFactory = new BasePipelineFactory(this, protocol) {
@Override
- protected void addSpecificHandlers(ChannelPipeline pipeline) {
- TrackerServer.this.addSpecificHandlers(pipeline);
+ protected void addProtocolHandlers(ChannelPipeline pipeline) {
+ TrackerServer.this.addProtocolHandlers(pipeline);
}
};
@@ -67,7 +67,7 @@ public abstract class TrackerServer {
}
}
- protected abstract void addSpecificHandlers(ChannelPipeline pipeline);
+ protected abstract void addProtocolHandlers(ChannelPipeline pipeline);
private int port;
diff --git a/src/org/traccar/database/ActiveDevice.java b/src/org/traccar/database/ActiveDevice.java
index e58c05e19..207fc454b 100644
--- a/src/org/traccar/database/ActiveDevice.java
+++ b/src/org/traccar/database/ActiveDevice.java
@@ -16,6 +16,7 @@
package org.traccar.database;
import io.netty.channel.Channel;
+import org.traccar.NetworkMessage;
import org.traccar.Protocol;
import org.traccar.model.Command;
@@ -48,7 +49,7 @@ public class ActiveDevice {
}
public void write(Object message) {
- channel.write(message); // TODO handle UDP connection
+ channel.writeAndFlush(new NetworkMessage(message, remoteAddress));
}
}
diff --git a/src/org/traccar/protocol/Gps103Protocol.java b/src/org/traccar/protocol/Gps103Protocol.java
index aa96215da..cdf6e10a9 100644
--- a/src/org/traccar/protocol/Gps103Protocol.java
+++ b/src/org/traccar/protocol/Gps103Protocol.java
@@ -16,8 +16,8 @@
package org.traccar.protocol;
import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
+import org.traccar.AdvancedStringDecoder;
+import org.traccar.AdvancedStringEncoder;
import org.traccar.BaseProtocol;
import org.traccar.CharacterDelimiterFrameDecoder;
import org.traccar.TrackerServer;
@@ -45,19 +45,19 @@ public class Gps103Protocol extends BaseProtocol {
public void initTrackerServers(List<TrackerServer> serverList) {
serverList.add(new TrackerServer(false, getName()) {
@Override
- protected void addSpecificHandlers(ChannelPipeline pipeline) {
+ protected void addProtocolHandlers(ChannelPipeline pipeline) {
pipeline.addLast("frameDecoder", new CharacterDelimiterFrameDecoder(2048, "\r\n", "\n", ";"));
- pipeline.addLast("stringEncoder", new StringEncoder());
- pipeline.addLast("stringDecoder", new StringDecoder());
+ pipeline.addLast("stringEncoder", new AdvancedStringEncoder());
+ pipeline.addLast("stringDecoder", new AdvancedStringDecoder());
pipeline.addLast("objectEncoder", new Gps103ProtocolEncoder());
pipeline.addLast("objectDecoder", new Gps103ProtocolDecoder(Gps103Protocol.this));
}
});
serverList.add(new TrackerServer(true, getName()) {
@Override
- protected void addSpecificHandlers(ChannelPipeline pipeline) {
- pipeline.addLast("stringEncoder", new StringEncoder());
- pipeline.addLast("stringDecoder", new StringDecoder());
+ protected void addProtocolHandlers(ChannelPipeline pipeline) {
+ pipeline.addLast("stringEncoder", new AdvancedStringEncoder());
+ pipeline.addLast("stringDecoder", new AdvancedStringDecoder());
pipeline.addLast("objectEncoder", new Gps103ProtocolEncoder());
pipeline.addLast("objectDecoder", new Gps103ProtocolDecoder(Gps103Protocol.this));
}
diff --git a/src/org/traccar/protocol/Gps103ProtocolDecoder.java b/src/org/traccar/protocol/Gps103ProtocolDecoder.java
index 4e39b32b3..7a66a48bd 100644
--- a/src/org/traccar/protocol/Gps103ProtocolDecoder.java
+++ b/src/org/traccar/protocol/Gps103ProtocolDecoder.java
@@ -18,6 +18,7 @@ package org.traccar.protocol;
import io.netty.channel.Channel;
import org.traccar.BaseProtocolDecoder;
import org.traccar.DeviceSession;
+import org.traccar.NetworkMessage;
import org.traccar.helper.DateBuilder;
import org.traccar.helper.Parser;
import org.traccar.helper.PatternBuilder;
@@ -160,7 +161,7 @@ public class Gps103ProtocolDecoder extends BaseProtocolDecoder {
position.set(Position.KEY_ALARM, decodeAlarm(alarm));
if (alarm.equals("help me")) {
if (channel != null) {
- channel.write("**,imei:" + imei + ",E;");
+ channel.writeAndFlush(new NetworkMessage("**,imei:" + imei + ",E;", remoteAddress));
}
} else if (alarm.equals("acc on")) {
position.set(Position.KEY_IGNITION, true);
@@ -276,7 +277,7 @@ public class Gps103ProtocolDecoder extends BaseProtocolDecoder {
if (sentence.contains("##")) {
if (channel != null) {
- channel.write("LOAD");
+ channel.writeAndFlush(new NetworkMessage("LOAD", remoteAddress));
Matcher matcher = Pattern.compile("##,imei:(\\d+),A").matcher(sentence);
if (matcher.matches()) {
getDeviceSession(channel, remoteAddress, matcher.group(1));
@@ -287,7 +288,7 @@ public class Gps103ProtocolDecoder extends BaseProtocolDecoder {
if (!sentence.isEmpty() && Character.isDigit(sentence.charAt(0))) {
if (channel != null) {
- channel.write("ON");
+ channel.writeAndFlush(new NetworkMessage("ON", remoteAddress));
}
int start = sentence.indexOf("imei:");
if (start >= 0) {