diff options
13 files changed, 243 insertions, 134 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 022eeeffa..7f617c470 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -15,23 +15,17 @@ */ package org.traccar; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.config.Keys; +import org.traccar.events.AlertEventHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; import org.traccar.events.FuelDropEventHandler; @@ -40,13 +34,13 @@ import org.traccar.events.IgnitionEventHandler; import org.traccar.events.MaintenanceEventHandler; import org.traccar.events.MotionEventHandler; import org.traccar.events.OverspeedEventHandler; -import org.traccar.events.AlertEventHandler; -import org.traccar.processing.ComputedAttributesHandler; -import org.traccar.processing.CopyAttributesHandler; -import org.traccar.processing.FilterHandler; +import org.traccar.handler.ComputedAttributesHandler; +import org.traccar.handler.CopyAttributesHandler; +import org.traccar.handler.FilterHandler; +import org.traccar.handler.NetworkMessageHandler; +import org.traccar.handler.OpenChannelHandler; +import org.traccar.handler.StandardLoggingHandler; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.Map; public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { @@ -76,119 +70,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { private MaintenanceEventHandler maintenanceEventHandler; private DriverEventHandler driverEventHandler; - private static final class OpenChannelHandler extends ChannelDuplexHandler { - - private final TrackerServer server; - - private OpenChannelHandler(TrackerServer server) { - this.server = server; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - server.getChannelGroup().add(ctx.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - server.getChannelGroup().remove(ctx.channel()); - } - - } - - private static class NetworkMessageHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (ctx.channel() instanceof DatagramChannel) { - DatagramPacket packet = (DatagramPacket) msg; - ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); - } else if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; - ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof NetworkMessage) { - NetworkMessage message = (NetworkMessage) msg; - if (ctx.channel() instanceof DatagramChannel) { - InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); - InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); - ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); - } else { - ctx.write(message.getMessage(), promise); - } - } else { - ctx.write(msg, promise); - } - } - - } - - private static class StandardLoggingHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log(ctx, false, msg); - super.channelRead(ctx, msg); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - log(ctx, true, msg); - super.write(ctx, msg, promise); - } - - public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { - if (o instanceof NetworkMessage) { - NetworkMessage networkMessage = (NetworkMessage) o; - if (networkMessage.getMessage() instanceof ByteBuf) { - log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); - } - } else if (o instanceof ByteBuf) { - log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); - } - } - - public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { - StringBuilder message = new StringBuilder(); - - message.append("[").append(ctx.channel().id().asShortText()).append(": "); - message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); - if (downstream) { - message.append(" > "); - } else { - message.append(" < "); - } - - if (remoteAddress instanceof InetSocketAddress) { - message.append(((InetSocketAddress) remoteAddress).getHostString()); - } else { - message.append("unknown"); - } - message.append("]"); - - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump(buf)); - - LOGGER.info(message.toString()); - } - - } - public BasePipelineFactory(TrackerServer server, String protocol) { this.server = server; - timeout = Context.getConfig().getInteger(protocol + ".timeout"); + timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol)); if (timeout == 0) { - timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary - if (timeout == 0) { - timeout = Context.getConfig().getInteger("server.timeout"); - } + timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT); } distanceHandler = new DistanceHandler( @@ -196,7 +83,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { Context.getConfig().getInteger("coordinates.minError"), Context.getConfig().getInteger("coordinates.maxError")); - if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) { + if (Context.getConfig().getBoolean("handler.remoteAddress.enable")) { remoteAddressHandler = new RemoteAddressHandler(); } @@ -223,7 +110,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { hemisphereHandler = new HemisphereHandler(); } - if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) { + if (Context.getConfig().getBoolean("handler.copyAttributes.enable")) { copyAttributesHandler = new CopyAttributesHandler(); } @@ -270,8 +157,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> { } @Override - protected void initChannel(Channel channel) throws Exception { + protected void initChannel(Channel channel) { final ChannelPipeline pipeline = channel.pipeline(); + if (timeout > 0 && !server.isDatagram()) { pipeline.addLast(new IdleStateHandler(timeout, 0, 0)); } diff --git a/src/org/traccar/MainModule.java b/src/org/traccar/MainModule.java index 967b9cd1f..30cf25d3d 100644 --- a/src/org/traccar/MainModule.java +++ b/src/org/traccar/MainModule.java @@ -22,7 +22,7 @@ import com.google.inject.Singleton; import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.database.IdentityManager; -import org.traccar.processing.FilterHandler; +import org.traccar.handler.FilterHandler; import javax.ws.rs.client.Client; diff --git a/src/org/traccar/api/resource/AttributeResource.java b/src/org/traccar/api/resource/AttributeResource.java index d10ca4a72..fefc84041 100644 --- a/src/org/traccar/api/resource/AttributeResource.java +++ b/src/org/traccar/api/resource/AttributeResource.java @@ -33,7 +33,7 @@ import org.traccar.Context; import org.traccar.api.ExtendedObjectResource; import org.traccar.model.Attribute; import org.traccar.model.Position; -import org.traccar.processing.ComputedAttributesHandler; +import org.traccar.handler.ComputedAttributesHandler; @Path("attributes/computed") @Produces(MediaType.APPLICATION_JSON) diff --git a/src/org/traccar/config/ConfigSuffix.java b/src/org/traccar/config/ConfigSuffix.java new file mode 100644 index 000000000..bfecfe197 --- /dev/null +++ b/src/org/traccar/config/ConfigSuffix.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.config; + +public class ConfigSuffix extends ConfigKey { + + ConfigSuffix(String key, Class clazz, String description) { + super(key, clazz, description); + } + + public ConfigKey withPrefix(String prefix) { + return new ConfigKey(prefix + getKey(), getValueClass(), getDescription()); + } + +} diff --git a/src/org/traccar/config/Keys.java b/src/org/traccar/config/Keys.java index 5b26854ed..aea8e1ebe 100644 --- a/src/org/traccar/config/Keys.java +++ b/src/org/traccar/config/Keys.java @@ -17,6 +17,19 @@ package org.traccar.config; public final class Keys { + public static final ConfigSuffix PROTOCOL_TIMEOUT = new ConfigSuffix( + ".timeout", + Integer.class, + "Connection timeout value in seconds. Because sometimes there is no way to detect lost TCP connection, " + + "old connections stay in open state. On most systems there is a limit on number of open " + + "connection, so this leads to problems with establishing new connections when number of " + + "devices is high or devices data connections are unstable."); + + public static final ConfigKey SERVER_TIMEOUT = new ConfigKey( + "server.timeout", + Integer.class, + "Server wide connection timeout value in seconds. See protocol timeout for more information."); + public static final ConfigKey EXTRA_HANDLERS = new ConfigKey( "extra.handlers", String.class, diff --git a/src/org/traccar/processing/ComputedAttributesHandler.java b/src/org/traccar/handler/ComputedAttributesHandler.java index c346661d3..4b5b8c20d 100644 --- a/src/org/traccar/processing/ComputedAttributesHandler.java +++ b/src/org/traccar/handler/ComputedAttributesHandler.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.traccar.processing; +package org.traccar.handler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -51,7 +51,7 @@ public class ComputedAttributesHandler extends BaseDataHandler { engine.setStrict(true); engine.setFunctions(Collections.singletonMap("math", (Object) Math.class)); if (Context.getConfig() != null) { - mapDeviceAttributes = Context.getConfig().getBoolean("processing.computedAttributes.deviceAttributes"); + mapDeviceAttributes = Context.getConfig().getBoolean("handler.computedAttributes.deviceAttributes"); } } diff --git a/src/org/traccar/processing/CopyAttributesHandler.java b/src/org/traccar/handler/CopyAttributesHandler.java index bdd73b141..ce37e09cc 100644 --- a/src/org/traccar/processing/CopyAttributesHandler.java +++ b/src/org/traccar/handler/CopyAttributesHandler.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.traccar.processing; +package org.traccar.handler; import io.netty.channel.ChannelHandler; import org.traccar.BaseDataHandler; @@ -34,7 +34,7 @@ public class CopyAttributesHandler extends BaseDataHandler { @Override protected Position handlePosition(Position position) { String attributesString = Context.getDeviceManager().lookupAttributeString( - position.getDeviceId(), "processing.copyAttributes", "", true); + position.getDeviceId(), "handler.copyAttributes", "", true); Position last = getLastPosition(position.getDeviceId()); if (attributesString.isEmpty()) { attributesString = Position.KEY_DRIVER_UNIQUE_ID; diff --git a/src/org/traccar/processing/FilterHandler.java b/src/org/traccar/handler/FilterHandler.java index eced5d253..dceaede01 100644 --- a/src/org/traccar/processing/FilterHandler.java +++ b/src/org/traccar/handler/FilterHandler.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.traccar.processing; +package org.traccar.handler; import io.netty.channel.ChannelHandler; import org.slf4j.Logger; diff --git a/src/org/traccar/handler/NetworkMessageHandler.java b/src/org/traccar/handler/NetworkMessageHandler.java new file mode 100644 index 000000000..b1d926bfa --- /dev/null +++ b/src/org/traccar/handler/NetworkMessageHandler.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import org.traccar.NetworkMessage; + +import java.net.InetSocketAddress; + +public class NetworkMessageHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else if (msg instanceof ByteBuf) { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof NetworkMessage) { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } else { + ctx.write(msg, promise); + } + } + +} diff --git a/src/org/traccar/handler/OpenChannelHandler.java b/src/org/traccar/handler/OpenChannelHandler.java new file mode 100644 index 000000000..d09d617ab --- /dev/null +++ b/src/org/traccar/handler/OpenChannelHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import org.traccar.TrackerServer; + +public class OpenChannelHandler extends ChannelDuplexHandler { + + private final TrackerServer server; + + public OpenChannelHandler(TrackerServer server) { + this.server = server; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + server.getChannelGroup().add(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + +} diff --git a/src/org/traccar/handler/StandardLoggingHandler.java b/src/org/traccar/handler/StandardLoggingHandler.java new file mode 100644 index 000000000..88010458f --- /dev/null +++ b/src/org/traccar/handler/StandardLoggingHandler.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.NetworkMessage; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class StandardLoggingHandler extends ChannelDuplexHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardLoggingHandler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log(ctx, false, msg); + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log(ctx, true, msg); + super.write(ctx, msg, promise); + } + + public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + if (o instanceof NetworkMessage) { + NetworkMessage networkMessage = (NetworkMessage) o; + if (networkMessage.getMessage() instanceof ByteBuf) { + log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); + } + } else if (o instanceof ByteBuf) { + log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); + } + } + + public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { + StringBuilder message = new StringBuilder(); + + message.append("[").append(ctx.channel().id().asShortText()).append(": "); + message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); + if (downstream) { + message.append(" > "); + } else { + message.append(" < "); + } + + if (remoteAddress instanceof InetSocketAddress) { + message.append(((InetSocketAddress) remoteAddress).getHostString()); + } else { + message.append("unknown"); + } + message.append("]"); + + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump(buf)); + + LOGGER.info(message.toString()); + } + +} diff --git a/test/org/traccar/processing/ComputedAttributesTest.java b/test/org/traccar/handler/ComputedAttributesTest.java index 160067915..8f4f69bcd 100644 --- a/test/org/traccar/processing/ComputedAttributesTest.java +++ b/test/org/traccar/handler/ComputedAttributesTest.java @@ -1,4 +1,4 @@ -package org.traccar.processing; +package org.traccar.handler; import java.util.Date; diff --git a/test/org/traccar/processing/FilterHandlerTest.java b/test/org/traccar/handler/FilterHandlerTest.java index a497f1043..ad8d244a6 100644 --- a/test/org/traccar/processing/FilterHandlerTest.java +++ b/test/org/traccar/handler/FilterHandlerTest.java @@ -1,4 +1,4 @@ -package org.traccar.processing; +package org.traccar.handler; import org.junit.Before; import org.junit.Test; |