aboutsummaryrefslogtreecommitdiff
path: root/src/org/traccar/BasePipelineFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r--src/org/traccar/BasePipelineFactory.java102
1 files changed, 54 insertions, 48 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index b0de67a15..0b076a834 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012 - 2017 Anton Tananaev (anton@traccar.org)
+ * Copyright 2012 - 2018 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,20 +15,16 @@
*/
package org.traccar;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DownstreamMessageEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.logging.LoggingHandler;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleStateHandler;
import org.traccar.events.CommandResultEventHandler;
import org.traccar.events.DriverEventHandler;
import org.traccar.events.FuelDropEventHandler;
@@ -44,7 +40,7 @@ import org.traccar.processing.CopyAttributesHandler;
import java.net.InetSocketAddress;
-public abstract class BasePipelineFactory implements ChannelPipelineFactory {
+public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
private final TrackerServer server;
private int timeout;
@@ -70,7 +66,7 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
private MaintenanceEventHandler maintenanceEventHandler;
private DriverEventHandler driverEventHandler;
- private static final class OpenChannelHandler extends SimpleChannelHandler {
+ private static final class OpenChannelHandler extends ChannelDuplexHandler {
private final TrackerServer server;
@@ -79,41 +75,51 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
- server.getChannelGroup().add(e.getChannel());
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ super.channelActive(ctx);
+ server.getChannelGroup().add(ctx.channel());
}
+
}
- private static class StandardLoggingHandler extends LoggingHandler {
+ private static class StandardLoggingHandler extends ChannelDuplexHandler {
@Override
- public void log(ChannelEvent e) {
- if (e instanceof MessageEvent) {
- MessageEvent event = (MessageEvent) e;
- StringBuilder msg = new StringBuilder();
-
- msg.append("[").append(String.format("%08X", e.getChannel().getId())).append(": ");
- msg.append(((InetSocketAddress) e.getChannel().getLocalAddress()).getPort());
- if (e instanceof DownstreamMessageEvent) {
- msg.append(" > ");
- } else {
- msg.append(" < ");
- }
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ log(ctx, false, msg);
+ super.channelRead(ctx, msg);
+ }
- if (event.getRemoteAddress() != null) {
- msg.append(((InetSocketAddress) event.getRemoteAddress()).getHostString());
- } else {
- msg.append("null");
- }
- msg.append("]");
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ log(ctx, true, msg);
+ super.write(ctx, msg, promise);
+ }
- if (event.getMessage() instanceof ChannelBuffer) {
- msg.append(" HEX: ");
- msg.append(ChannelBuffers.hexDump((ChannelBuffer) event.getMessage()));
- }
+ public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
+ StringBuilder message = new StringBuilder();
- Log.debug(msg.toString());
+ message.append("[").append(ctx.channel().id().asShortText()).append(": ");
+ message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort());
+ if (downstream) {
+ message.append(" > ");
+ } else {
+ message.append(" < ");
}
+
+ if (ctx.channel().remoteAddress() != null) {
+ message.append(((InetSocketAddress) ctx.channel().remoteAddress()).getHostString());
+ } else {
+ message.append("null");
+ }
+ message.append("]");
+
+ if (o instanceof ByteBuf) {
+ message.append(" HEX: ");
+ message.append(ByteBufUtil.hexDump((ByteBuf) o));
+ }
+
+ Log.debug(message.toString());
}
}
@@ -189,10 +195,10 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
protected abstract void addSpecificHandlers(ChannelPipeline pipeline);
@Override
- public ChannelPipeline getPipeline() {
- ChannelPipeline pipeline = Channels.pipeline();
- if (timeout > 0 && !server.isConnectionless()) {
- pipeline.addLast("idleHandler", new IdleStateHandler(GlobalTimer.getTimer(), timeout, 0, 0));
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ if (timeout > 0 && !server.isDatagram()) {
+ pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0));
}
pipeline.addLast("openHandler", new OpenChannelHandler(server));
if (Context.isLoggerEnabled()) {
@@ -288,7 +294,6 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
pipeline.addLast("mainHandler", new MainEventHandler());
- return pipeline;
}
private void addDynamicHandlers(ChannelPipeline pipeline) {
@@ -303,4 +308,5 @@ public abstract class BasePipelineFactory implements ChannelPipelineFactory {
}
}
}
+
}