diff options
-rw-r--r-- | src/org/traccar/AdvancedStringEncoder.java | 37 | ||||
-rw-r--r-- | src/org/traccar/BasePipelineFactory.java | 20 | ||||
-rw-r--r-- | src/org/traccar/BaseProtocol.java | 2 | ||||
-rw-r--r-- | src/org/traccar/BaseProtocolDecoder.java | 2 | ||||
-rw-r--r-- | src/org/traccar/PipelineBuilder.java (renamed from src/org/traccar/AdvancedStringDecoder.java) | 16 | ||||
-rw-r--r-- | src/org/traccar/Protocol.java | 15 | ||||
-rw-r--r-- | src/org/traccar/TrackerServer.java | 4 | ||||
-rw-r--r-- | src/org/traccar/WrapperContext.java | 253 | ||||
-rw-r--r-- | src/org/traccar/WrapperInboundHandler.java | 89 | ||||
-rw-r--r-- | src/org/traccar/WrapperOutboundHandler.java | 92 | ||||
-rw-r--r-- | src/org/traccar/protocol/Gps103Protocol.java | 22 |
11 files changed, 485 insertions, 67 deletions
diff --git a/src/org/traccar/AdvancedStringEncoder.java b/src/org/traccar/AdvancedStringEncoder.java deleted file mode 100644 index 7104b2154..000000000 --- a/src/org/traccar/AdvancedStringEncoder.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2018 Anton Tananaev (anton@traccar.org) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.traccar; - -import io.netty.buffer.ByteBufUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; - -import java.nio.CharBuffer; -import java.nio.charset.StandardCharsets; - -public class AdvancedStringEncoder extends ChannelOutboundHandlerAdapter { - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - NetworkMessage networkMessage = (NetworkMessage) msg; - String string = (String) networkMessage.getMessage(); - ctx.write(new NetworkMessage( - ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(string), StandardCharsets.US_ASCII), - networkMessage.getRemoteAddress()), promise); - } - -} diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 2cf4333f5..6269fb8cc 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -21,7 +21,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.socket.DatagramChannel; @@ -226,11 +228,11 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { } } - protected abstract void addProtocolHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(PipelineBuilder pipeline); @Override protected void initChannel(Channel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); + final ChannelPipeline pipeline = channel.pipeline(); if (timeout > 0 && !server.isDatagram()) { pipeline.addLast("idleHandler", new IdleStateHandler(timeout, 0, 0)); } @@ -240,7 +242,19 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { pipeline.addLast("logger", new StandardLoggingHandler()); } - addProtocolHandlers(pipeline); + addProtocolHandlers(new PipelineBuilder() { + @Override + public void addLast(String name, ChannelHandler handler) { + if (!(handler instanceof BaseProtocolDecoder || handler instanceof BaseProtocolEncoder)) { + if (handler instanceof ChannelInboundHandler) { + handler = new WrapperInboundHandler((ChannelInboundHandler) handler); + } else { + handler = new WrapperOutboundHandler((ChannelOutboundHandler) handler); + } + } + pipeline.addLast(name, handler); + } + }); if (geolocationHandler != null) { pipeline.addLast("location", geolocationHandler); diff --git a/src/org/traccar/BaseProtocol.java b/src/org/traccar/BaseProtocol.java index 8dbec362f..c33270fc6 100644 --- a/src/org/traccar/BaseProtocol.java +++ b/src/org/traccar/BaseProtocol.java @@ -76,7 +76,7 @@ public abstract class BaseProtocol implements Protocol { activeDevice.write(command); } else if (command.getType().equals(Command.TYPE_CUSTOM)) { String data = command.getString(Command.KEY_DATA); - if (activeDevice.getChannel().pipeline().get(StringEncoder.class) != null) { + if (activeDevice.getChannel().pipeline().get(StringEncoder.class) != null) { // TODO change activeDevice.write(data); } else { activeDevice.write(Unpooled.wrappedBuffer(DataConverter.parseHex(data))); diff --git a/src/org/traccar/BaseProtocolDecoder.java b/src/org/traccar/BaseProtocolDecoder.java index 0750e1096..9c0b51c79 100644 --- a/src/org/traccar/BaseProtocolDecoder.java +++ b/src/org/traccar/BaseProtocolDecoder.java @@ -145,7 +145,7 @@ public abstract class BaseProtocolDecoder extends ExtendedObjectDecoder { } public DeviceSession getDeviceSession(Channel channel, SocketAddress remoteAddress, String... uniqueIds) { - if (channel != null && channel.pipeline().get(HttpRequestDecoder.class) != null + if (channel != null && channel.pipeline().get(HttpRequestDecoder.class) != null // TODO change || Context.getConfig().getBoolean("decoder.ignoreSessionCache")) { long deviceId = findDeviceId(remoteAddress, uniqueIds); if (deviceId != 0) { diff --git a/src/org/traccar/AdvancedStringDecoder.java b/src/org/traccar/PipelineBuilder.java index d7cb6e7b2..20c40f159 100644 --- a/src/org/traccar/AdvancedStringDecoder.java +++ b/src/org/traccar/PipelineBuilder.java @@ -15,20 +15,10 @@ */ package org.traccar; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelHandler; -import java.nio.charset.StandardCharsets; +public interface PipelineBuilder { -public class AdvancedStringDecoder extends ChannelInboundHandlerAdapter { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - NetworkMessage networkMessage = (NetworkMessage) msg; - ByteBuf buf = (ByteBuf) networkMessage.getMessage(); - ctx.fireChannelRead(new NetworkMessage( - buf.toString(StandardCharsets.US_ASCII), networkMessage.getRemoteAddress())); - } + void addLast(String name, ChannelHandler handler); } diff --git a/src/org/traccar/Protocol.java b/src/org/traccar/Protocol.java index 87ac05298..c7aa67f9d 100644 --- a/src/org/traccar/Protocol.java +++ b/src/org/traccar/Protocol.java @@ -1,3 +1,18 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.traccar; import org.traccar.database.ActiveDevice; diff --git a/src/org/traccar/TrackerServer.java b/src/org/traccar/TrackerServer.java index 7081d455d..fb643c958 100644 --- a/src/org/traccar/TrackerServer.java +++ b/src/org/traccar/TrackerServer.java @@ -45,7 +45,7 @@ public abstract class TrackerServer { BasePipelineFactory pipelineFactory = new BasePipelineFactory(this, protocol) { @Override - protected void addProtocolHandlers(ChannelPipeline pipeline) { + protected void addProtocolHandlers(PipelineBuilder pipeline) { TrackerServer.this.addProtocolHandlers(pipeline); } }; @@ -67,7 +67,7 @@ public abstract class TrackerServer { } } - protected abstract void addProtocolHandlers(ChannelPipeline pipeline); + protected abstract void addProtocolHandlers(PipelineBuilder pipeline); private int port; diff --git a/src/org/traccar/WrapperContext.java b/src/org/traccar/WrapperContext.java new file mode 100644 index 000000000..e1c30cdc1 --- /dev/null +++ b/src/org/traccar/WrapperContext.java @@ -0,0 +1,253 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutor; + +import java.net.SocketAddress; + +public class WrapperContext implements ChannelHandlerContext { + + private ChannelHandlerContext context; + private SocketAddress remoteAddress; + + public WrapperContext(ChannelHandlerContext context, SocketAddress remoteAddress) { + this.context = context; + this.remoteAddress = remoteAddress; + } + + @Override + public Channel channel() { + return context.channel(); + } + + @Override + public EventExecutor executor() { + return context.executor(); + } + + @Override + public String name() { + return context.name(); + } + + @Override + public ChannelHandler handler() { + return context.handler(); + } + + @Override + public boolean isRemoved() { + return context.isRemoved(); + } + + @Override + public ChannelHandlerContext fireChannelRegistered() { + return context.fireChannelRegistered(); + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + return context.fireChannelUnregistered(); + } + + @Override + public ChannelHandlerContext fireChannelActive() { + return context.fireChannelActive(); + } + + @Override + public ChannelHandlerContext fireChannelInactive() { + return context.fireChannelInactive(); + } + + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + return context.fireExceptionCaught(cause); + } + + @Override + public ChannelHandlerContext fireUserEventTriggered(Object evt) { + return context.fireUserEventTriggered(evt); + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + if (!(msg instanceof NetworkMessage)) { + msg = new NetworkMessage(msg, remoteAddress); + } + return context.fireChannelRead(msg); + } + + @Override + public ChannelHandlerContext fireChannelReadComplete() { + return context.fireChannelReadComplete(); + } + + @Override + public ChannelHandlerContext fireChannelWritabilityChanged() { + return context.fireChannelWritabilityChanged(); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return context.bind(localAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return context.connect(remoteAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return context.connect(remoteAddress, localAddress); + } + + @Override + public ChannelFuture disconnect() { + return context.disconnect(); + } + + @Override + public ChannelFuture close() { + return context.close(); + } + + @Override + public ChannelFuture deregister() { + return context.deregister(); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return context.bind(localAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return context.connect(remoteAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return context.connect(remoteAddress, localAddress, promise); + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return context.disconnect(promise); + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return context.close(promise); + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return context.deregister(promise); + } + + @Override + public ChannelHandlerContext read() { + return context.read(); + } + + @Override + public ChannelFuture write(Object msg) { + return context.write(msg); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + if (!(msg instanceof NetworkMessage)) { + msg = new NetworkMessage(msg, remoteAddress); + } + return context.write(msg, promise); + } + + @Override + public ChannelHandlerContext flush() { + return context.flush(); + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return context.writeAndFlush(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return context.writeAndFlush(msg); + } + + @Override + public ChannelPromise newPromise() { + return context.newPromise(); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return context.newProgressivePromise(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return context.newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return context.newFailedFuture(cause); + } + + @Override + public ChannelPromise voidPromise() { + return context.voidPromise(); + } + + @Override + public ChannelPipeline pipeline() { + return context.pipeline(); + } + + @Override + public ByteBufAllocator alloc() { + return context.alloc(); + } + + @Override + public <T> Attribute<T> attr(AttributeKey<T> key) { + return context.attr(key); + } + + @Override + public <T> boolean hasAttr(AttributeKey<T> key) { + return context.hasAttr(key); + } + +} diff --git a/src/org/traccar/WrapperInboundHandler.java b/src/org/traccar/WrapperInboundHandler.java new file mode 100644 index 000000000..3cef72060 --- /dev/null +++ b/src/org/traccar/WrapperInboundHandler.java @@ -0,0 +1,89 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; + +public class WrapperInboundHandler implements ChannelInboundHandler { + + private ChannelInboundHandler handler; + + public WrapperInboundHandler(ChannelInboundHandler handler) { + this.handler = handler; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + handler.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + handler.channelUnregistered(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + handler.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + handler.channelInactive(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof NetworkMessage) { + NetworkMessage nm = (NetworkMessage) msg; + handler.channelRead(new WrapperContext(ctx, nm.getRemoteAddress()), nm.getMessage()); + } else { + handler.channelRead(ctx, msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + handler.channelReadComplete(ctx); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + handler.userEventTriggered(ctx, evt); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + handler.channelWritabilityChanged(ctx); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + handler.handlerAdded(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + handler.handlerRemoved(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + handler.exceptionCaught(ctx, cause); + } + +} diff --git a/src/org/traccar/WrapperOutboundHandler.java b/src/org/traccar/WrapperOutboundHandler.java new file mode 100644 index 000000000..ad342fa6c --- /dev/null +++ b/src/org/traccar/WrapperOutboundHandler.java @@ -0,0 +1,92 @@ +/* + * Copyright 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPromise; + +import java.net.SocketAddress; + +public class WrapperOutboundHandler implements ChannelOutboundHandler { + + private ChannelOutboundHandler handler; + + public WrapperOutboundHandler(ChannelOutboundHandler handler) { + this.handler = handler; + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { + handler.bind(ctx, localAddress, promise); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { + handler.connect(ctx, remoteAddress, localAddress, promise); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + handler.disconnect(ctx, promise); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + handler.close(ctx, promise); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + handler.deregister(ctx, promise); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + handler.read(ctx); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof NetworkMessage) { + NetworkMessage nm = (NetworkMessage) msg; + handler.write(new WrapperContext(ctx, nm.getRemoteAddress()), nm.getMessage(), promise); + } else { + handler.write(ctx, msg, promise); + } + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + handler.flush(ctx); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + handler.handlerAdded(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + handler.handlerRemoved(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + handler.exceptionCaught(ctx, cause); + } + +} diff --git a/src/org/traccar/protocol/Gps103Protocol.java b/src/org/traccar/protocol/Gps103Protocol.java index cdf6e10a9..b49415ddf 100644 --- a/src/org/traccar/protocol/Gps103Protocol.java +++ b/src/org/traccar/protocol/Gps103Protocol.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 Anton Tananaev (anton@traccar.org) + * Copyright 2015 - 2018 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,12 +15,14 @@ */ package org.traccar.protocol; -import io.netty.channel.ChannelPipeline; -import org.traccar.AdvancedStringDecoder; -import org.traccar.AdvancedStringEncoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; import org.traccar.BaseProtocol; import org.traccar.CharacterDelimiterFrameDecoder; +import org.traccar.PipelineBuilder; import org.traccar.TrackerServer; +import org.traccar.WrapperInboundHandler; +import org.traccar.WrapperOutboundHandler; import org.traccar.model.Command; import java.util.List; @@ -45,19 +47,19 @@ public class Gps103Protocol extends BaseProtocol { public void initTrackerServers(List<TrackerServer> serverList) { serverList.add(new TrackerServer(false, getName()) { @Override - protected void addProtocolHandlers(ChannelPipeline pipeline) { + protected void addProtocolHandlers(PipelineBuilder pipeline) { pipeline.addLast("frameDecoder", new CharacterDelimiterFrameDecoder(2048, "\r\n", "\n", ";")); - pipeline.addLast("stringEncoder", new AdvancedStringEncoder()); - pipeline.addLast("stringDecoder", new AdvancedStringDecoder()); + pipeline.addLast("stringEncoder", new WrapperOutboundHandler(new StringEncoder())); + pipeline.addLast("stringDecoder", new WrapperInboundHandler(new StringDecoder())); pipeline.addLast("objectEncoder", new Gps103ProtocolEncoder()); pipeline.addLast("objectDecoder", new Gps103ProtocolDecoder(Gps103Protocol.this)); } }); serverList.add(new TrackerServer(true, getName()) { @Override - protected void addProtocolHandlers(ChannelPipeline pipeline) { - pipeline.addLast("stringEncoder", new AdvancedStringEncoder()); - pipeline.addLast("stringDecoder", new AdvancedStringDecoder()); + protected void addProtocolHandlers(PipelineBuilder pipeline) { + pipeline.addLast("stringEncoder", new StringEncoder()); + pipeline.addLast("stringDecoder", new StringDecoder()); pipeline.addLast("objectEncoder", new Gps103ProtocolEncoder()); pipeline.addLast("objectDecoder", new Gps103ProtocolDecoder(Gps103Protocol.this)); } |