aboutsummaryrefslogtreecommitdiff
path: root/src/org/traccar/BasePipelineFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r--src/org/traccar/BasePipelineFactory.java138
1 files changed, 13 insertions, 125 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index 022eeeffa..7f617c470 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -15,23 +15,17 @@
*/
package org.traccar;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.config.Keys;
+import org.traccar.events.AlertEventHandler;
import org.traccar.events.CommandResultEventHandler;
import org.traccar.events.DriverEventHandler;
import org.traccar.events.FuelDropEventHandler;
@@ -40,13 +34,13 @@ import org.traccar.events.IgnitionEventHandler;
import org.traccar.events.MaintenanceEventHandler;
import org.traccar.events.MotionEventHandler;
import org.traccar.events.OverspeedEventHandler;
-import org.traccar.events.AlertEventHandler;
-import org.traccar.processing.ComputedAttributesHandler;
-import org.traccar.processing.CopyAttributesHandler;
-import org.traccar.processing.FilterHandler;
+import org.traccar.handler.ComputedAttributesHandler;
+import org.traccar.handler.CopyAttributesHandler;
+import org.traccar.handler.FilterHandler;
+import org.traccar.handler.NetworkMessageHandler;
+import org.traccar.handler.OpenChannelHandler;
+import org.traccar.handler.StandardLoggingHandler;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.Map;
public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
@@ -76,119 +70,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
private MaintenanceEventHandler maintenanceEventHandler;
private DriverEventHandler driverEventHandler;
- private static final class OpenChannelHandler extends ChannelDuplexHandler {
-
- private final TrackerServer server;
-
- private OpenChannelHandler(TrackerServer server) {
- this.server = server;
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- server.getChannelGroup().add(ctx.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- super.channelInactive(ctx);
- server.getChannelGroup().remove(ctx.channel());
- }
-
- }
-
- private static class NetworkMessageHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- if (ctx.channel() instanceof DatagramChannel) {
- DatagramPacket packet = (DatagramPacket) msg;
- ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
- } else if (msg instanceof ByteBuf) {
- ByteBuf buffer = (ByteBuf) msg;
- ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
- }
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- if (msg instanceof NetworkMessage) {
- NetworkMessage message = (NetworkMessage) msg;
- if (ctx.channel() instanceof DatagramChannel) {
- InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
- InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
- ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
- } else {
- ctx.write(message.getMessage(), promise);
- }
- } else {
- ctx.write(msg, promise);
- }
- }
-
- }
-
- private static class StandardLoggingHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log(ctx, false, msg);
- super.channelRead(ctx, msg);
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- log(ctx, true, msg);
- super.write(ctx, msg, promise);
- }
-
- public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
- if (o instanceof NetworkMessage) {
- NetworkMessage networkMessage = (NetworkMessage) o;
- if (networkMessage.getMessage() instanceof ByteBuf) {
- log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage());
- }
- } else if (o instanceof ByteBuf) {
- log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o);
- }
- }
-
- public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) {
- StringBuilder message = new StringBuilder();
-
- message.append("[").append(ctx.channel().id().asShortText()).append(": ");
- message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort());
- if (downstream) {
- message.append(" > ");
- } else {
- message.append(" < ");
- }
-
- if (remoteAddress instanceof InetSocketAddress) {
- message.append(((InetSocketAddress) remoteAddress).getHostString());
- } else {
- message.append("unknown");
- }
- message.append("]");
-
- message.append(" HEX: ");
- message.append(ByteBufUtil.hexDump(buf));
-
- LOGGER.info(message.toString());
- }
-
- }
-
public BasePipelineFactory(TrackerServer server, String protocol) {
this.server = server;
- timeout = Context.getConfig().getInteger(protocol + ".timeout");
+ timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol));
if (timeout == 0) {
- timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary
- if (timeout == 0) {
- timeout = Context.getConfig().getInteger("server.timeout");
- }
+ timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT);
}
distanceHandler = new DistanceHandler(
@@ -196,7 +83,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
Context.getConfig().getInteger("coordinates.minError"),
Context.getConfig().getInteger("coordinates.maxError"));
- if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) {
+ if (Context.getConfig().getBoolean("handler.remoteAddress.enable")) {
remoteAddressHandler = new RemoteAddressHandler();
}
@@ -223,7 +110,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
hemisphereHandler = new HemisphereHandler();
}
- if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) {
+ if (Context.getConfig().getBoolean("handler.copyAttributes.enable")) {
copyAttributesHandler = new CopyAttributesHandler();
}
@@ -270,8 +157,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
@Override
- protected void initChannel(Channel channel) throws Exception {
+ protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
+
if (timeout > 0 && !server.isDatagram()) {
pipeline.addLast(new IdleStateHandler(timeout, 0, 0));
}