From f304021d16e433b261226a8293f03e7de30a08a4 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 12 Jul 2015 13:05:10 +1200 Subject: Re-factor all channel handlers --- src/org/traccar/BaseDataHandler.java | 37 ++++++++++++ src/org/traccar/BasePipelineFactory.java | 17 ++++-- src/org/traccar/ExtendedObjectDecoder.java | 9 ++- src/org/traccar/FilterHandler.java | 26 +------- src/org/traccar/MainEventHandler.java | 86 ++++++++++++++++++++++++++ src/org/traccar/RemoteAddressHandler.java | 6 -- src/org/traccar/ReverseGeocoderHandler.java | 33 ++-------- src/org/traccar/TrackerEventHandler.java | 93 ----------------------------- 8 files changed, 153 insertions(+), 154 deletions(-) create mode 100644 src/org/traccar/BaseDataHandler.java create mode 100644 src/org/traccar/MainEventHandler.java delete mode 100644 src/org/traccar/TrackerEventHandler.java diff --git a/src/org/traccar/BaseDataHandler.java b/src/org/traccar/BaseDataHandler.java new file mode 100644 index 000000000..5a7e2a455 --- /dev/null +++ b/src/org/traccar/BaseDataHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 Anton Tananaev (anton.tananaev@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; +import org.traccar.model.Position; + +public abstract class BaseDataHandler extends OneToOneDecoder { + + @Override + protected final Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { + + if (msg instanceof Position) { + return handlePosition((Position) msg); + } + + return msg; + } + + protected abstract Position handlePosition(Position position); + +} diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index c4ca8f47c..4ff4030f0 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -18,7 +18,16 @@ package org.traccar; import java.net.InetSocketAddress; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.DownstreamMessageEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.handler.logging.LoggingHandler; import org.jboss.netty.handler.timeout.IdleStateHandler; import org.traccar.helper.Log; @@ -57,13 +66,13 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { msg.append("[").append(String.format("%08X", e.getChannel().getId())).append(": "); msg.append(((InetSocketAddress) e.getChannel().getLocalAddress()).getPort()); - msg.append((e instanceof DownstreamMessageEvent) ? " -> " : " <- "); + msg.append((e instanceof DownstreamMessageEvent) ? " > " : " < "); msg.append(((InetSocketAddress) event.getRemoteAddress()).getAddress().getHostAddress()).append("]"); // Append hex message if (event.getMessage() instanceof ChannelBuffer) { - msg.append(" - HEX: "); + msg.append(" HEX: "); msg.append(ChannelBuffers.hexDump((ChannelBuffer) event.getMessage())); } @@ -116,7 +125,7 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory { pipeline.addLast("geocoder", new ReverseGeocoderHandler(Context.getReverseGeocoder(), processInvalidPositions)); } pipeline.addLast("remoteAddress", new RemoteAddressHandler()); - pipeline.addLast("handler", new TrackerEventHandler()); + pipeline.addLast("handler", new MainEventHandler()); return pipeline; } diff --git a/src/org/traccar/ExtendedObjectDecoder.java b/src/org/traccar/ExtendedObjectDecoder.java index ad0210934..d0f82dace 100644 --- a/src/org/traccar/ExtendedObjectDecoder.java +++ b/src/org/traccar/ExtendedObjectDecoder.java @@ -16,6 +16,7 @@ package org.traccar; import java.net.SocketAddress; +import java.util.Collection; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; @@ -39,7 +40,13 @@ public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler { if (originalMessage == decodedMessage) { ctx.sendUpstream(evt); } else if (decodedMessage != null) { - fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); + if (decodedMessage instanceof Collection) { + for (Object o : (Collection) e.getMessage()) { + fireMessageReceived(ctx, o, e.getRemoteAddress()); + } + } else { + fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); + } } } diff --git a/src/org/traccar/FilterHandler.java b/src/org/traccar/FilterHandler.java index e5a06579c..a5be11463 100644 --- a/src/org/traccar/FilterHandler.java +++ b/src/org/traccar/FilterHandler.java @@ -15,16 +15,12 @@ */ package org.traccar; -import java.net.SocketAddress; -import java.util.Iterator; -import java.util.List; import java.util.Properties; -import org.jboss.netty.channel.Channel; import org.traccar.helper.DistanceCalculator; import org.traccar.helper.Log; import org.traccar.model.Position; -public class FilterHandler extends ExtendedObjectDecoder { +public class FilterHandler extends BaseDataHandler { private boolean filterInvalid; private boolean filterZero; @@ -144,24 +140,8 @@ public class FilterHandler extends ExtendedObjectDecoder { } @Override - protected Object decode( - Channel channel, SocketAddress remoteAddress, Object msg) - throws Exception { - - if (msg instanceof Position) { - if (filter((Position) msg)) { - return null; - } - } else if (msg instanceof List) { - Iterator i = ((List) msg).iterator(); - while (i.hasNext()) { - if (filter(i.next())) { - i.remove(); - } - } - } - - return msg; + protected Position handlePosition(Position position) { + return filter(position) ? null : position; } } diff --git a/src/org/traccar/MainEventHandler.java b/src/org/traccar/MainEventHandler.java new file mode 100644 index 000000000..30ec75f58 --- /dev/null +++ b/src/org/traccar/MainEventHandler.java @@ -0,0 +1,86 @@ +/* + * Copyright 2012 - 2015 Anton Tananaev (anton.tananaev@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; +import org.jboss.netty.handler.timeout.IdleStateEvent; +import org.traccar.helper.Log; +import org.traccar.model.Position; + +public class MainEventHandler extends IdleStateAwareChannelHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + + if (e.getMessage() != null) { + + if (e.getMessage() instanceof Position) { + + Position position = (Position) e.getMessage(); + + // Log position + StringBuilder s = new StringBuilder(); + s.append("device: ").append(position.getDeviceId()).append(", "); + s.append("time: ").append(position.getFixTime()).append(", "); + s.append("lat: ").append(position.getLatitude()).append(", "); + s.append("lon: ").append(position.getLongitude()); + Log.info(s.toString()); + + try { + Context.getDataManager().addPosition(position); + Context.getDataManager().updateLatestPosition(position); + Context.getConnectionManager().update(position); + } catch (Exception error) { + Log.warning(error); + } + + } + } + } + + private static String formatChannel(Channel channel) { + return String.format("[%08X]", channel.getId()); + } + + @Override + public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { + Log.info(formatChannel(e.getChannel()) + " connected"); + } + + @Override + public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) { + Log.info(formatChannel(e.getChannel()) + " disconnected"); + e.getChannel().close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + Log.info(formatChannel(e.getChannel()) + " error caught"); + e.getChannel().close(); + } + + @Override + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { + Log.info(formatChannel(e.getChannel()) + " timed out"); + e.getChannel().close(); + } + +} diff --git a/src/org/traccar/RemoteAddressHandler.java b/src/org/traccar/RemoteAddressHandler.java index d9522020d..a26a1aff2 100644 --- a/src/org/traccar/RemoteAddressHandler.java +++ b/src/org/traccar/RemoteAddressHandler.java @@ -17,7 +17,6 @@ package org.traccar; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.List; import org.jboss.netty.channel.Channel; import org.traccar.model.Event; import org.traccar.model.Position; @@ -34,11 +33,6 @@ public class RemoteAddressHandler extends ExtendedObjectDecoder { if (msg instanceof Position) { Position position = (Position) msg; position.set(Event.KEY_IP, hostAddress); - } else if (msg instanceof List) { - List positions = (List) msg; - for (Position position : positions) { - position.set(Event.KEY_IP, hostAddress); - } } return msg; diff --git a/src/org/traccar/ReverseGeocoderHandler.java b/src/org/traccar/ReverseGeocoderHandler.java index aaa491ae7..bb4f5cedd 100644 --- a/src/org/traccar/ReverseGeocoderHandler.java +++ b/src/org/traccar/ReverseGeocoderHandler.java @@ -15,14 +15,11 @@ */ package org.traccar; -import java.net.SocketAddress; -import java.util.List; -import org.jboss.netty.channel.Channel; import org.traccar.geocode.AddressFormat; import org.traccar.geocode.ReverseGeocoder; import org.traccar.model.Position; -public class ReverseGeocoderHandler extends ExtendedObjectDecoder { +public class ReverseGeocoderHandler extends BaseDataHandler { private final ReverseGeocoder geocoder; private final boolean processInvalidPositions; @@ -35,30 +32,12 @@ public class ReverseGeocoderHandler extends ExtendedObjectDecoder { } @Override - protected Object decode( - Channel channel, SocketAddress remoteAddress, Object msg) - throws Exception { - - if (geocoder != null) { - if (msg instanceof Position) { - Position position = (Position) msg; - - if (processInvalidPositions || position.getValid()) { - position.setAddress(geocoder.getAddress( - addressFormat, position.getLatitude(), position.getLongitude())); - } - } else if (msg instanceof List) { - List positions = (List) msg; - for (Position position : positions) { - if (processInvalidPositions || position.getValid()) { - position.setAddress(geocoder.getAddress( - addressFormat, position.getLatitude(), position.getLongitude())); - } - } - } + protected Position handlePosition(Position position) { + if (geocoder != null && (processInvalidPositions || position.getValid())) { + position.setAddress(geocoder.getAddress( + addressFormat, position.getLatitude(), position.getLongitude())); } - - return msg; + return position; } } diff --git a/src/org/traccar/TrackerEventHandler.java b/src/org/traccar/TrackerEventHandler.java deleted file mode 100644 index 31e6fb5cb..000000000 --- a/src/org/traccar/TrackerEventHandler.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2012 - 2015 Anton Tananaev (anton.tananaev@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.traccar; - -import java.util.List; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; -import org.jboss.netty.handler.timeout.IdleStateEvent; -import org.traccar.helper.Log; -import org.traccar.model.Position; - -@ChannelHandler.Sharable -public class TrackerEventHandler extends IdleStateAwareChannelHandler { - - private void processSinglePosition(Position position) { - if (position == null) { - Log.info("processSinglePosition null message"); - } else { - StringBuilder s = new StringBuilder(); - s.append("device: ").append(position.getDeviceId()).append(", "); - s.append("time: ").append(position.getFixTime()).append(", "); - s.append("lat: ").append(position.getLatitude()).append(", "); - s.append("lon: ").append(position.getLongitude()); - Log.info(s.toString()); - } - - try { - Context.getDataManager().addPosition(position); - } catch (Exception error) { - Log.warning(error); - } - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - Long id = null; - Position lastPostition = null; - if (e.getMessage() instanceof Position) { - processSinglePosition((Position) e.getMessage()); - lastPostition = (Position) e.getMessage(); - } else if (e.getMessage() instanceof List) { - List positions = (List) e.getMessage(); - for (Position position : positions) { - processSinglePosition(position); - lastPostition = position; - } - } - if (lastPostition != null) { - try { - Context.getDataManager().updateLatestPosition(lastPostition); - Context.getConnectionManager().update(lastPostition); - } catch (Exception error) { - Log.warning(error); - } - } - } - - @Override - public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) { - Log.info("Closing connection by disconnect"); - e.getChannel().close(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - Log.info("Closing connection by exception"); - e.getChannel().close(); - } - - @Override - public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { - Log.info("Closing connection by timeout"); - e.getChannel().close(); - } - -} -- cgit v1.2.3