aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2015-07-12 13:05:10 +1200
committerAnton Tananaev <anton.tananaev@gmail.com>2015-07-12 13:05:10 +1200
commitf304021d16e433b261226a8293f03e7de30a08a4 (patch)
treebb908a0e437a4d80b7e5043346545d02e8b235af
parent639b427eba5738534c10f591e285d707e8b060e5 (diff)
downloadtrackermap-server-f304021d16e433b261226a8293f03e7de30a08a4.tar.gz
trackermap-server-f304021d16e433b261226a8293f03e7de30a08a4.tar.bz2
trackermap-server-f304021d16e433b261226a8293f03e7de30a08a4.zip
Re-factor all channel handlers
-rw-r--r--src/org/traccar/BaseDataHandler.java37
-rw-r--r--src/org/traccar/BasePipelineFactory.java17
-rw-r--r--src/org/traccar/ExtendedObjectDecoder.java9
-rw-r--r--src/org/traccar/FilterHandler.java26
-rw-r--r--src/org/traccar/MainEventHandler.java86
-rw-r--r--src/org/traccar/RemoteAddressHandler.java6
-rw-r--r--src/org/traccar/ReverseGeocoderHandler.java33
-rw-r--r--src/org/traccar/TrackerEventHandler.java93
8 files changed, 153 insertions, 154 deletions
diff --git a/src/org/traccar/BaseDataHandler.java b/src/org/traccar/BaseDataHandler.java
new file mode 100644
index 000000000..5a7e2a455
--- /dev/null
+++ b/src/org/traccar/BaseDataHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2015 Anton Tananaev (anton.tananaev@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+import org.traccar.model.Position;
+
+public abstract class BaseDataHandler extends OneToOneDecoder {
+
+ @Override
+ protected final Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+
+ if (msg instanceof Position) {
+ return handlePosition((Position) msg);
+ }
+
+ return msg;
+ }
+
+ protected abstract Position handlePosition(Position position);
+
+}
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index c4ca8f47c..4ff4030f0 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -18,7 +18,16 @@ package org.traccar;
import java.net.InetSocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DownstreamMessageEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.logging.LoggingHandler;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.traccar.helper.Log;
@@ -57,13 +66,13 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
msg.append("[").append(String.format("%08X", e.getChannel().getId())).append(": ");
msg.append(((InetSocketAddress) e.getChannel().getLocalAddress()).getPort());
- msg.append((e instanceof DownstreamMessageEvent) ? " -> " : " <- ");
+ msg.append((e instanceof DownstreamMessageEvent) ? " > " : " < ");
msg.append(((InetSocketAddress) event.getRemoteAddress()).getAddress().getHostAddress()).append("]");
// Append hex message
if (event.getMessage() instanceof ChannelBuffer) {
- msg.append(" - HEX: ");
+ msg.append(" HEX: ");
msg.append(ChannelBuffers.hexDump((ChannelBuffer) event.getMessage()));
}
@@ -116,7 +125,7 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
pipeline.addLast("geocoder", new ReverseGeocoderHandler(Context.getReverseGeocoder(), processInvalidPositions));
}
pipeline.addLast("remoteAddress", new RemoteAddressHandler());
- pipeline.addLast("handler", new TrackerEventHandler());
+ pipeline.addLast("handler", new MainEventHandler());
return pipeline;
}
diff --git a/src/org/traccar/ExtendedObjectDecoder.java b/src/org/traccar/ExtendedObjectDecoder.java
index ad0210934..d0f82dace 100644
--- a/src/org/traccar/ExtendedObjectDecoder.java
+++ b/src/org/traccar/ExtendedObjectDecoder.java
@@ -16,6 +16,7 @@
package org.traccar;
import java.net.SocketAddress;
+import java.util.Collection;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -39,7 +40,13 @@ public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler {
if (originalMessage == decodedMessage) {
ctx.sendUpstream(evt);
} else if (decodedMessage != null) {
- fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress());
+ if (decodedMessage instanceof Collection) {
+ for (Object o : (Collection) e.getMessage()) {
+ fireMessageReceived(ctx, o, e.getRemoteAddress());
+ }
+ } else {
+ fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress());
+ }
}
}
diff --git a/src/org/traccar/FilterHandler.java b/src/org/traccar/FilterHandler.java
index e5a06579c..a5be11463 100644
--- a/src/org/traccar/FilterHandler.java
+++ b/src/org/traccar/FilterHandler.java
@@ -15,16 +15,12 @@
*/
package org.traccar;
-import java.net.SocketAddress;
-import java.util.Iterator;
-import java.util.List;
import java.util.Properties;
-import org.jboss.netty.channel.Channel;
import org.traccar.helper.DistanceCalculator;
import org.traccar.helper.Log;
import org.traccar.model.Position;
-public class FilterHandler extends ExtendedObjectDecoder {
+public class FilterHandler extends BaseDataHandler {
private boolean filterInvalid;
private boolean filterZero;
@@ -144,24 +140,8 @@ public class FilterHandler extends ExtendedObjectDecoder {
}
@Override
- protected Object decode(
- Channel channel, SocketAddress remoteAddress, Object msg)
- throws Exception {
-
- if (msg instanceof Position) {
- if (filter((Position) msg)) {
- return null;
- }
- } else if (msg instanceof List) {
- Iterator<Position> i = ((List<Position>) msg).iterator();
- while (i.hasNext()) {
- if (filter(i.next())) {
- i.remove();
- }
- }
- }
-
- return msg;
+ protected Position handlePosition(Position position) {
+ return filter(position) ? null : position;
}
}
diff --git a/src/org/traccar/MainEventHandler.java b/src/org/traccar/MainEventHandler.java
new file mode 100644
index 000000000..30ec75f58
--- /dev/null
+++ b/src/org/traccar/MainEventHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2012 - 2015 Anton Tananaev (anton.tananaev@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.traccar.helper.Log;
+import org.traccar.model.Position;
+
+public class MainEventHandler extends IdleStateAwareChannelHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+
+ if (e.getMessage() != null) {
+
+ if (e.getMessage() instanceof Position) {
+
+ Position position = (Position) e.getMessage();
+
+ // Log position
+ StringBuilder s = new StringBuilder();
+ s.append("device: ").append(position.getDeviceId()).append(", ");
+ s.append("time: ").append(position.getFixTime()).append(", ");
+ s.append("lat: ").append(position.getLatitude()).append(", ");
+ s.append("lon: ").append(position.getLongitude());
+ Log.info(s.toString());
+
+ try {
+ Context.getDataManager().addPosition(position);
+ Context.getDataManager().updateLatestPosition(position);
+ Context.getConnectionManager().update(position);
+ } catch (Exception error) {
+ Log.warning(error);
+ }
+
+ }
+ }
+ }
+
+ private static String formatChannel(Channel channel) {
+ return String.format("[%08X]", channel.getId());
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ Log.info(formatChannel(e.getChannel()) + " connected");
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ Log.info(formatChannel(e.getChannel()) + " disconnected");
+ e.getChannel().close();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ Log.info(formatChannel(e.getChannel()) + " error caught");
+ e.getChannel().close();
+ }
+
+ @Override
+ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
+ Log.info(formatChannel(e.getChannel()) + " timed out");
+ e.getChannel().close();
+ }
+
+}
diff --git a/src/org/traccar/RemoteAddressHandler.java b/src/org/traccar/RemoteAddressHandler.java
index d9522020d..a26a1aff2 100644
--- a/src/org/traccar/RemoteAddressHandler.java
+++ b/src/org/traccar/RemoteAddressHandler.java
@@ -17,7 +17,6 @@ package org.traccar;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.List;
import org.jboss.netty.channel.Channel;
import org.traccar.model.Event;
import org.traccar.model.Position;
@@ -34,11 +33,6 @@ public class RemoteAddressHandler extends ExtendedObjectDecoder {
if (msg instanceof Position) {
Position position = (Position) msg;
position.set(Event.KEY_IP, hostAddress);
- } else if (msg instanceof List) {
- List<Position> positions = (List<Position>) msg;
- for (Position position : positions) {
- position.set(Event.KEY_IP, hostAddress);
- }
}
return msg;
diff --git a/src/org/traccar/ReverseGeocoderHandler.java b/src/org/traccar/ReverseGeocoderHandler.java
index aaa491ae7..bb4f5cedd 100644
--- a/src/org/traccar/ReverseGeocoderHandler.java
+++ b/src/org/traccar/ReverseGeocoderHandler.java
@@ -15,14 +15,11 @@
*/
package org.traccar;
-import java.net.SocketAddress;
-import java.util.List;
-import org.jboss.netty.channel.Channel;
import org.traccar.geocode.AddressFormat;
import org.traccar.geocode.ReverseGeocoder;
import org.traccar.model.Position;
-public class ReverseGeocoderHandler extends ExtendedObjectDecoder {
+public class ReverseGeocoderHandler extends BaseDataHandler {
private final ReverseGeocoder geocoder;
private final boolean processInvalidPositions;
@@ -35,30 +32,12 @@ public class ReverseGeocoderHandler extends ExtendedObjectDecoder {
}
@Override
- protected Object decode(
- Channel channel, SocketAddress remoteAddress, Object msg)
- throws Exception {
-
- if (geocoder != null) {
- if (msg instanceof Position) {
- Position position = (Position) msg;
-
- if (processInvalidPositions || position.getValid()) {
- position.setAddress(geocoder.getAddress(
- addressFormat, position.getLatitude(), position.getLongitude()));
- }
- } else if (msg instanceof List) {
- List<Position> positions = (List<Position>) msg;
- for (Position position : positions) {
- if (processInvalidPositions || position.getValid()) {
- position.setAddress(geocoder.getAddress(
- addressFormat, position.getLatitude(), position.getLongitude()));
- }
- }
- }
+ protected Position handlePosition(Position position) {
+ if (geocoder != null && (processInvalidPositions || position.getValid())) {
+ position.setAddress(geocoder.getAddress(
+ addressFormat, position.getLatitude(), position.getLongitude()));
}
-
- return msg;
+ return position;
}
}
diff --git a/src/org/traccar/TrackerEventHandler.java b/src/org/traccar/TrackerEventHandler.java
deleted file mode 100644
index 31e6fb5cb..000000000
--- a/src/org/traccar/TrackerEventHandler.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2012 - 2015 Anton Tananaev (anton.tananaev@gmail.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.traccar;
-
-import java.util.List;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.traccar.helper.Log;
-import org.traccar.model.Position;
-
-@ChannelHandler.Sharable
-public class TrackerEventHandler extends IdleStateAwareChannelHandler {
-
- private void processSinglePosition(Position position) {
- if (position == null) {
- Log.info("processSinglePosition null message");
- } else {
- StringBuilder s = new StringBuilder();
- s.append("device: ").append(position.getDeviceId()).append(", ");
- s.append("time: ").append(position.getFixTime()).append(", ");
- s.append("lat: ").append(position.getLatitude()).append(", ");
- s.append("lon: ").append(position.getLongitude());
- Log.info(s.toString());
- }
-
- try {
- Context.getDataManager().addPosition(position);
- } catch (Exception error) {
- Log.warning(error);
- }
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Long id = null;
- Position lastPostition = null;
- if (e.getMessage() instanceof Position) {
- processSinglePosition((Position) e.getMessage());
- lastPostition = (Position) e.getMessage();
- } else if (e.getMessage() instanceof List) {
- List<Position> positions = (List<Position>) e.getMessage();
- for (Position position : positions) {
- processSinglePosition(position);
- lastPostition = position;
- }
- }
- if (lastPostition != null) {
- try {
- Context.getDataManager().updateLatestPosition(lastPostition);
- Context.getConnectionManager().update(lastPostition);
- } catch (Exception error) {
- Log.warning(error);
- }
- }
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- Log.info("Closing connection by disconnect");
- e.getChannel().close();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- Log.info("Closing connection by exception");
- e.getChannel().close();
- }
-
- @Override
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
- Log.info("Closing connection by timeout");
- e.getChannel().close();
- }
-
-}