aboutsummaryrefslogtreecommitdiff
path: root/src/org
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2019-02-23 15:23:35 -0800
committerAnton Tananaev <anton.tananaev@gmail.com>2019-02-23 15:23:35 -0800
commit563243a8da888244e910a4a7a10fb86ad525fdd4 (patch)
tree0a50db4a2ca280d3cd3b2b78065d31ef96e5fef5 /src/org
parent7aa3760f02361a04e2c3b73ba5b9cc41bcb8ec3d (diff)
downloadtraccar-server-563243a8da888244e910a4a7a10fb86ad525fdd4.tar.gz
traccar-server-563243a8da888244e910a4a7a10fb86ad525fdd4.tar.bz2
traccar-server-563243a8da888244e910a4a7a10fb86ad525fdd4.zip
Handler refactoring
Diffstat (limited to 'src/org')
-rw-r--r--src/org/traccar/BasePipelineFactory.java138
-rw-r--r--src/org/traccar/MainModule.java2
-rw-r--r--src/org/traccar/api/resource/AttributeResource.java2
-rw-r--r--src/org/traccar/config/ConfigSuffix.java28
-rw-r--r--src/org/traccar/config/Keys.java13
-rw-r--r--src/org/traccar/handler/ComputedAttributesHandler.java (renamed from src/org/traccar/processing/ComputedAttributesHandler.java)4
-rw-r--r--src/org/traccar/handler/CopyAttributesHandler.java (renamed from src/org/traccar/processing/CopyAttributesHandler.java)4
-rw-r--r--src/org/traccar/handler/FilterHandler.java (renamed from src/org/traccar/processing/FilterHandler.java)2
-rw-r--r--src/org/traccar/handler/NetworkMessageHandler.java57
-rw-r--r--src/org/traccar/handler/OpenChannelHandler.java42
-rw-r--r--src/org/traccar/handler/StandardLoggingHandler.java81
11 files changed, 241 insertions, 132 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index 022eeeffa..7f617c470 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -15,23 +15,17 @@
*/
package org.traccar;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.config.Keys;
+import org.traccar.events.AlertEventHandler;
import org.traccar.events.CommandResultEventHandler;
import org.traccar.events.DriverEventHandler;
import org.traccar.events.FuelDropEventHandler;
@@ -40,13 +34,13 @@ import org.traccar.events.IgnitionEventHandler;
import org.traccar.events.MaintenanceEventHandler;
import org.traccar.events.MotionEventHandler;
import org.traccar.events.OverspeedEventHandler;
-import org.traccar.events.AlertEventHandler;
-import org.traccar.processing.ComputedAttributesHandler;
-import org.traccar.processing.CopyAttributesHandler;
-import org.traccar.processing.FilterHandler;
+import org.traccar.handler.ComputedAttributesHandler;
+import org.traccar.handler.CopyAttributesHandler;
+import org.traccar.handler.FilterHandler;
+import org.traccar.handler.NetworkMessageHandler;
+import org.traccar.handler.OpenChannelHandler;
+import org.traccar.handler.StandardLoggingHandler;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.Map;
public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
@@ -76,119 +70,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
private MaintenanceEventHandler maintenanceEventHandler;
private DriverEventHandler driverEventHandler;
- private static final class OpenChannelHandler extends ChannelDuplexHandler {
-
- private final TrackerServer server;
-
- private OpenChannelHandler(TrackerServer server) {
- this.server = server;
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- server.getChannelGroup().add(ctx.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- super.channelInactive(ctx);
- server.getChannelGroup().remove(ctx.channel());
- }
-
- }
-
- private static class NetworkMessageHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- if (ctx.channel() instanceof DatagramChannel) {
- DatagramPacket packet = (DatagramPacket) msg;
- ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
- } else if (msg instanceof ByteBuf) {
- ByteBuf buffer = (ByteBuf) msg;
- ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
- }
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- if (msg instanceof NetworkMessage) {
- NetworkMessage message = (NetworkMessage) msg;
- if (ctx.channel() instanceof DatagramChannel) {
- InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
- InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
- ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
- } else {
- ctx.write(message.getMessage(), promise);
- }
- } else {
- ctx.write(msg, promise);
- }
- }
-
- }
-
- private static class StandardLoggingHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log(ctx, false, msg);
- super.channelRead(ctx, msg);
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- log(ctx, true, msg);
- super.write(ctx, msg, promise);
- }
-
- public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
- if (o instanceof NetworkMessage) {
- NetworkMessage networkMessage = (NetworkMessage) o;
- if (networkMessage.getMessage() instanceof ByteBuf) {
- log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage());
- }
- } else if (o instanceof ByteBuf) {
- log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o);
- }
- }
-
- public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) {
- StringBuilder message = new StringBuilder();
-
- message.append("[").append(ctx.channel().id().asShortText()).append(": ");
- message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort());
- if (downstream) {
- message.append(" > ");
- } else {
- message.append(" < ");
- }
-
- if (remoteAddress instanceof InetSocketAddress) {
- message.append(((InetSocketAddress) remoteAddress).getHostString());
- } else {
- message.append("unknown");
- }
- message.append("]");
-
- message.append(" HEX: ");
- message.append(ByteBufUtil.hexDump(buf));
-
- LOGGER.info(message.toString());
- }
-
- }
-
public BasePipelineFactory(TrackerServer server, String protocol) {
this.server = server;
- timeout = Context.getConfig().getInteger(protocol + ".timeout");
+ timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol));
if (timeout == 0) {
- timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary
- if (timeout == 0) {
- timeout = Context.getConfig().getInteger("server.timeout");
- }
+ timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT);
}
distanceHandler = new DistanceHandler(
@@ -196,7 +83,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
Context.getConfig().getInteger("coordinates.minError"),
Context.getConfig().getInteger("coordinates.maxError"));
- if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) {
+ if (Context.getConfig().getBoolean("handler.remoteAddress.enable")) {
remoteAddressHandler = new RemoteAddressHandler();
}
@@ -223,7 +110,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
hemisphereHandler = new HemisphereHandler();
}
- if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) {
+ if (Context.getConfig().getBoolean("handler.copyAttributes.enable")) {
copyAttributesHandler = new CopyAttributesHandler();
}
@@ -270,8 +157,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
@Override
- protected void initChannel(Channel channel) throws Exception {
+ protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
+
if (timeout > 0 && !server.isDatagram()) {
pipeline.addLast(new IdleStateHandler(timeout, 0, 0));
}
diff --git a/src/org/traccar/MainModule.java b/src/org/traccar/MainModule.java
index 967b9cd1f..30cf25d3d 100644
--- a/src/org/traccar/MainModule.java
+++ b/src/org/traccar/MainModule.java
@@ -22,7 +22,7 @@ import com.google.inject.Singleton;
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.IdentityManager;
-import org.traccar.processing.FilterHandler;
+import org.traccar.handler.FilterHandler;
import javax.ws.rs.client.Client;
diff --git a/src/org/traccar/api/resource/AttributeResource.java b/src/org/traccar/api/resource/AttributeResource.java
index d10ca4a72..fefc84041 100644
--- a/src/org/traccar/api/resource/AttributeResource.java
+++ b/src/org/traccar/api/resource/AttributeResource.java
@@ -33,7 +33,7 @@ import org.traccar.Context;
import org.traccar.api.ExtendedObjectResource;
import org.traccar.model.Attribute;
import org.traccar.model.Position;
-import org.traccar.processing.ComputedAttributesHandler;
+import org.traccar.handler.ComputedAttributesHandler;
@Path("attributes/computed")
@Produces(MediaType.APPLICATION_JSON)
diff --git a/src/org/traccar/config/ConfigSuffix.java b/src/org/traccar/config/ConfigSuffix.java
new file mode 100644
index 000000000..bfecfe197
--- /dev/null
+++ b/src/org/traccar/config/ConfigSuffix.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.config;
+
+public class ConfigSuffix extends ConfigKey {
+
+ ConfigSuffix(String key, Class clazz, String description) {
+ super(key, clazz, description);
+ }
+
+ public ConfigKey withPrefix(String prefix) {
+ return new ConfigKey(prefix + getKey(), getValueClass(), getDescription());
+ }
+
+}
diff --git a/src/org/traccar/config/Keys.java b/src/org/traccar/config/Keys.java
index 5b26854ed..aea8e1ebe 100644
--- a/src/org/traccar/config/Keys.java
+++ b/src/org/traccar/config/Keys.java
@@ -17,6 +17,19 @@ package org.traccar.config;
public final class Keys {
+ public static final ConfigSuffix PROTOCOL_TIMEOUT = new ConfigSuffix(
+ ".timeout",
+ Integer.class,
+ "Connection timeout value in seconds. Because sometimes there is no way to detect lost TCP connection, "
+ + "old connections stay in open state. On most systems there is a limit on number of open "
+ + "connection, so this leads to problems with establishing new connections when number of "
+ + "devices is high or devices data connections are unstable.");
+
+ public static final ConfigKey SERVER_TIMEOUT = new ConfigKey(
+ "server.timeout",
+ Integer.class,
+ "Server wide connection timeout value in seconds. See protocol timeout for more information.");
+
public static final ConfigKey EXTRA_HANDLERS = new ConfigKey(
"extra.handlers",
String.class,
diff --git a/src/org/traccar/processing/ComputedAttributesHandler.java b/src/org/traccar/handler/ComputedAttributesHandler.java
index c346661d3..4b5b8c20d 100644
--- a/src/org/traccar/processing/ComputedAttributesHandler.java
+++ b/src/org/traccar/handler/ComputedAttributesHandler.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.traccar.processing;
+package org.traccar.handler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -51,7 +51,7 @@ public class ComputedAttributesHandler extends BaseDataHandler {
engine.setStrict(true);
engine.setFunctions(Collections.singletonMap("math", (Object) Math.class));
if (Context.getConfig() != null) {
- mapDeviceAttributes = Context.getConfig().getBoolean("processing.computedAttributes.deviceAttributes");
+ mapDeviceAttributes = Context.getConfig().getBoolean("handler.computedAttributes.deviceAttributes");
}
}
diff --git a/src/org/traccar/processing/CopyAttributesHandler.java b/src/org/traccar/handler/CopyAttributesHandler.java
index bdd73b141..ce37e09cc 100644
--- a/src/org/traccar/processing/CopyAttributesHandler.java
+++ b/src/org/traccar/handler/CopyAttributesHandler.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.traccar.processing;
+package org.traccar.handler;
import io.netty.channel.ChannelHandler;
import org.traccar.BaseDataHandler;
@@ -34,7 +34,7 @@ public class CopyAttributesHandler extends BaseDataHandler {
@Override
protected Position handlePosition(Position position) {
String attributesString = Context.getDeviceManager().lookupAttributeString(
- position.getDeviceId(), "processing.copyAttributes", "", true);
+ position.getDeviceId(), "handler.copyAttributes", "", true);
Position last = getLastPosition(position.getDeviceId());
if (attributesString.isEmpty()) {
attributesString = Position.KEY_DRIVER_UNIQUE_ID;
diff --git a/src/org/traccar/processing/FilterHandler.java b/src/org/traccar/handler/FilterHandler.java
index eced5d253..dceaede01 100644
--- a/src/org/traccar/processing/FilterHandler.java
+++ b/src/org/traccar/handler/FilterHandler.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.traccar.processing;
+package org.traccar.handler;
import io.netty.channel.ChannelHandler;
import org.slf4j.Logger;
diff --git a/src/org/traccar/handler/NetworkMessageHandler.java b/src/org/traccar/handler/NetworkMessageHandler.java
new file mode 100644
index 000000000..b1d926bfa
--- /dev/null
+++ b/src/org/traccar/handler/NetworkMessageHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
+import org.traccar.NetworkMessage;
+
+import java.net.InetSocketAddress;
+
+public class NetworkMessageHandler extends ChannelDuplexHandler {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ if (ctx.channel() instanceof DatagramChannel) {
+ DatagramPacket packet = (DatagramPacket) msg;
+ ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
+ } else if (msg instanceof ByteBuf) {
+ ByteBuf buffer = (ByteBuf) msg;
+ ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
+ }
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+ if (msg instanceof NetworkMessage) {
+ NetworkMessage message = (NetworkMessage) msg;
+ if (ctx.channel() instanceof DatagramChannel) {
+ InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
+ InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
+ ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
+ } else {
+ ctx.write(message.getMessage(), promise);
+ }
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+
+}
diff --git a/src/org/traccar/handler/OpenChannelHandler.java b/src/org/traccar/handler/OpenChannelHandler.java
new file mode 100644
index 000000000..d09d617ab
--- /dev/null
+++ b/src/org/traccar/handler/OpenChannelHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.traccar.TrackerServer;
+
+public class OpenChannelHandler extends ChannelDuplexHandler {
+
+ private final TrackerServer server;
+
+ public OpenChannelHandler(TrackerServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ super.channelActive(ctx);
+ server.getChannelGroup().add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ server.getChannelGroup().remove(ctx.channel());
+ }
+
+}
diff --git a/src/org/traccar/handler/StandardLoggingHandler.java b/src/org/traccar/handler/StandardLoggingHandler.java
new file mode 100644
index 000000000..88010458f
--- /dev/null
+++ b/src/org/traccar/handler/StandardLoggingHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.traccar.NetworkMessage;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class StandardLoggingHandler extends ChannelDuplexHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StandardLoggingHandler.class);
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ log(ctx, false, msg);
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ log(ctx, true, msg);
+ super.write(ctx, msg, promise);
+ }
+
+ public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
+ if (o instanceof NetworkMessage) {
+ NetworkMessage networkMessage = (NetworkMessage) o;
+ if (networkMessage.getMessage() instanceof ByteBuf) {
+ log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage());
+ }
+ } else if (o instanceof ByteBuf) {
+ log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o);
+ }
+ }
+
+ public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) {
+ StringBuilder message = new StringBuilder();
+
+ message.append("[").append(ctx.channel().id().asShortText()).append(": ");
+ message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort());
+ if (downstream) {
+ message.append(" > ");
+ } else {
+ message.append(" < ");
+ }
+
+ if (remoteAddress instanceof InetSocketAddress) {
+ message.append(((InetSocketAddress) remoteAddress).getHostString());
+ } else {
+ message.append("unknown");
+ }
+ message.append("]");
+
+ message.append(" HEX: ");
+ message.append(ByteBufUtil.hexDump(buf));
+
+ LOGGER.info(message.toString());
+ }
+
+}