aboutsummaryrefslogtreecommitdiff
path: root/src/org/traccar/BasePipelineFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r--src/org/traccar/BasePipelineFactory.java164
1 files changed, 114 insertions, 50 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index 5a077da7c..6269fb8cc 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012 - 2017 Anton Tananaev (anton@traccar.org)
+ * Copyright 2012 - 2018 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,20 +15,20 @@
*/
package org.traccar;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DownstreamMessageEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.logging.LoggingHandler;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOutboundHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.handler.timeout.IdleStateHandler;
import org.traccar.events.CommandResultEventHandler;
import org.traccar.events.DriverEventHandler;
import org.traccar.events.FuelDropEventHandler;
@@ -44,13 +44,14 @@ import org.traccar.processing.CopyAttributesHandler;
import java.net.InetSocketAddress;
-public abstract class BasePipelineFactory implements ChannelPipelineFactory {
+public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
private final TrackerServer server;
private int timeout;
private FilterHandler filterHandler;
private DistanceHandler distanceHandler;
+ private EngineHoursHandler engineHoursHandler;
private RemoteAddressHandler remoteAddressHandler;
private MotionHandler motionHandler;
private GeocoderHandler geocoderHandler;
@@ -69,7 +70,7 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
private MaintenanceEventHandler maintenanceEventHandler;
private DriverEventHandler driverEventHandler;
- private static final class OpenChannelHandler extends SimpleChannelHandler {
+ private static final class OpenChannelHandler extends ChannelDuplexHandler {
private final TrackerServer server;
@@ -78,41 +79,83 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
- server.getChannelGroup().add(e.getChannel());
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ super.channelActive(ctx);
+ server.getChannelGroup().add(ctx.channel());
}
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ server.getChannelGroup().remove(ctx.channel());
+ }
+
}
- private static class StandardLoggingHandler extends LoggingHandler {
+ private static class NetworkMessageHandler extends ChannelDuplexHandler {
@Override
- public void log(ChannelEvent e) {
- if (e instanceof MessageEvent) {
- MessageEvent event = (MessageEvent) e;
- StringBuilder msg = new StringBuilder();
-
- msg.append("[").append(String.format("%08X", e.getChannel().getId())).append(": ");
- msg.append(((InetSocketAddress) e.getChannel().getLocalAddress()).getPort());
- if (e instanceof DownstreamMessageEvent) {
- msg.append(" > ");
- } else {
- msg.append(" < ");
- }
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (ctx.channel() instanceof DatagramChannel) {
+ DatagramPacket packet = (DatagramPacket) msg;
+ ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
+ } else {
+ ByteBuf buffer = (ByteBuf) msg;
+ ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
+ }
+ }
- if (event.getRemoteAddress() != null) {
- msg.append(((InetSocketAddress) event.getRemoteAddress()).getHostString());
- } else {
- msg.append("null");
- }
- msg.append("]");
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ NetworkMessage message = (NetworkMessage) msg;
+ if (ctx.channel() instanceof DatagramChannel) {
+ InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
+ InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
+ ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
+ } else {
+ ctx.write(message.getMessage(), promise);
+ }
+ }
- if (event.getMessage() instanceof ChannelBuffer) {
- msg.append(" HEX: ");
- msg.append(ChannelBuffers.hexDump((ChannelBuffer) event.getMessage()));
- }
+ }
+
+ private static class StandardLoggingHandler extends ChannelDuplexHandler {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ log(ctx, false, msg);
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ log(ctx, true, msg);
+ super.write(ctx, msg, promise);
+ }
- Log.debug(msg.toString());
+ public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
+ NetworkMessage networkMessage = (NetworkMessage) o;
+ StringBuilder message = new StringBuilder();
+
+ message.append("[").append(ctx.channel().id().asShortText()).append(": ");
+ message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort());
+ if (downstream) {
+ message.append(" > ");
+ } else {
+ message.append(" < ");
+ }
+
+ if (networkMessage.getRemoteAddress() != null) {
+ message.append(((InetSocketAddress) networkMessage.getRemoteAddress()).getHostString());
+ } else {
+ message.append("null");
}
+ message.append("]");
+
+ message.append(" HEX: ");
+ message.append(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage()));
+
+ Log.debug(message.toString());
}
}
@@ -155,6 +198,10 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
motionHandler = new MotionHandler(Context.getTripsConfig().getSpeedThreshold());
+ if (Context.getConfig().getBoolean("processing.engineHours.enable")) {
+ engineHoursHandler = new EngineHoursHandler();
+ }
+
if (Context.getConfig().hasKey("location.latitudeHemisphere")
|| Context.getConfig().hasKey("location.longitudeHemisphere")) {
hemisphereHandler = new HemisphereHandler();
@@ -181,20 +228,33 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
}
- protected abstract void addSpecificHandlers(ChannelPipeline pipeline);
+ protected abstract void addProtocolHandlers(PipelineBuilder pipeline);
@Override
- public ChannelPipeline getPipeline() {
- ChannelPipeline pipeline = Channels.pipeline();
- if (timeout > 0 && !server.isConnectionless()) {
- pipeline.addLast("idleHandler", new IdleStateHandler(GlobalTimer.getTimer(), timeout, 0, 0));
+ protected void initChannel(Channel channel) throws Exception {
+ final ChannelPipeline pipeline = channel.pipeline();
+ if (timeout > 0 && !server.isDatagram()) {
+ pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0));
}
pipeline.addLast("openHandler", new OpenChannelHandler(server));
+ pipeline.addLast("messageHandler", new NetworkMessageHandler());
if (Context.isLoggerEnabled()) {
pipeline.addLast("logger", new StandardLoggingHandler());
}
- addSpecificHandlers(pipeline);
+ addProtocolHandlers(new PipelineBuilder() {
+ @Override
+ public void addLast(String name, ChannelHandler handler) {
+ if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) {
+ if (handler instanceof ChannelInboundHandler) {
+ handler = new WrapperInboundHandler((ChannelInboundHandler) handler);
+ } else {
+ handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler);
+ }
+ }
+ pipeline.addLast(name, handler);
+ }
+ });
if (geolocationHandler != null) {
pipeline.addLast("location", geolocationHandler);
@@ -225,6 +285,10 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
pipeline.addLast("motion", motionHandler);
}
+ if (engineHoursHandler != null) {
+ pipeline.addLast("engineHours", engineHoursHandler);
+ }
+
if (copyAttributesHandler != null) {
pipeline.addLast("copyAttributes", copyAttributesHandler);
}
@@ -279,7 +343,6 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
pipeline.addLast("mainHandler", new MainEventHandler());
- return pipeline;
}
private void addDynamicHandlers(ChannelPipeline pipeline) {
@@ -294,4 +357,5 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
}
}
+
}