From 563243a8da888244e910a4a7a10fb86ad525fdd4 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sat, 23 Feb 2019 15:23:35 -0800 Subject: Handler refactoring --- src/org/traccar/BasePipelineFactory.java | 138 ++------------ src/org/traccar/MainModule.java | 2 +- .../traccar/api/resource/AttributeResource.java | 2 +- src/org/traccar/config/ConfigSuffix.java | 28 +++ src/org/traccar/config/Keys.java | 13 ++ .../traccar/handler/ComputedAttributesHandler.java | 129 +++++++++++++ src/org/traccar/handler/CopyAttributesHandler.java | 54 ++++++ src/org/traccar/handler/FilterHandler.java | 206 +++++++++++++++++++++ src/org/traccar/handler/NetworkMessageHandler.java | 57 ++++++ src/org/traccar/handler/OpenChannelHandler.java | 42 +++++ .../traccar/handler/StandardLoggingHandler.java | 81 ++++++++ .../processing/ComputedAttributesHandler.java | 129 ------------- .../traccar/processing/CopyAttributesHandler.java | 54 ------ src/org/traccar/processing/FilterHandler.java | 206 --------------------- 14 files changed, 625 insertions(+), 516 deletions(-) create mode 100644 src/org/traccar/config/ConfigSuffix.java create mode 100644 src/org/traccar/handler/ComputedAttributesHandler.java create mode 100644 src/org/traccar/handler/CopyAttributesHandler.java create mode 100644 src/org/traccar/handler/FilterHandler.java create mode 100644 src/org/traccar/handler/NetworkMessageHandler.java create mode 100644 src/org/traccar/handler/OpenChannelHandler.java create mode 100644 src/org/traccar/handler/StandardLoggingHandler.java delete mode 100644 src/org/traccar/processing/ComputedAttributesHandler.java delete mode 100644 src/org/traccar/processing/CopyAttributesHandler.java delete mode 100644 src/org/traccar/processing/FilterHandler.java (limited to 'src/org') diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java index 022eeeffa..7f617c470 100644 --- a/src/org/traccar/BasePipelineFactory.java +++ b/src/org/traccar/BasePipelineFactory.java @@ -15,23 +15,17 @@ */ package org.traccar; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.config.Keys; +import org.traccar.events.AlertEventHandler; import org.traccar.events.CommandResultEventHandler; import org.traccar.events.DriverEventHandler; import org.traccar.events.FuelDropEventHandler; @@ -40,13 +34,13 @@ import org.traccar.events.IgnitionEventHandler; import org.traccar.events.MaintenanceEventHandler; import org.traccar.events.MotionEventHandler; import org.traccar.events.OverspeedEventHandler; -import org.traccar.events.AlertEventHandler; -import org.traccar.processing.ComputedAttributesHandler; -import org.traccar.processing.CopyAttributesHandler; -import org.traccar.processing.FilterHandler; +import org.traccar.handler.ComputedAttributesHandler; +import org.traccar.handler.CopyAttributesHandler; +import org.traccar.handler.FilterHandler; +import org.traccar.handler.NetworkMessageHandler; +import org.traccar.handler.OpenChannelHandler; +import org.traccar.handler.StandardLoggingHandler; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.Map; public abstract class BasePipelineFactory extends ChannelInitializer { @@ -76,119 +70,12 @@ public abstract class BasePipelineFactory extends ChannelInitializer { private MaintenanceEventHandler maintenanceEventHandler; private DriverEventHandler driverEventHandler; - private static final class OpenChannelHandler extends ChannelDuplexHandler { - - private final TrackerServer server; - - private OpenChannelHandler(TrackerServer server) { - this.server = server; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - server.getChannelGroup().add(ctx.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - server.getChannelGroup().remove(ctx.channel()); - } - - } - - private static class NetworkMessageHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (ctx.channel() instanceof DatagramChannel) { - DatagramPacket packet = (DatagramPacket) msg; - ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); - } else if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; - ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof NetworkMessage) { - NetworkMessage message = (NetworkMessage) msg; - if (ctx.channel() instanceof DatagramChannel) { - InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); - InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); - ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); - } else { - ctx.write(message.getMessage(), promise); - } - } else { - ctx.write(msg, promise); - } - } - - } - - private static class StandardLoggingHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log(ctx, false, msg); - super.channelRead(ctx, msg); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - log(ctx, true, msg); - super.write(ctx, msg, promise); - } - - public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { - if (o instanceof NetworkMessage) { - NetworkMessage networkMessage = (NetworkMessage) o; - if (networkMessage.getMessage() instanceof ByteBuf) { - log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); - } - } else if (o instanceof ByteBuf) { - log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); - } - } - - public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { - StringBuilder message = new StringBuilder(); - - message.append("[").append(ctx.channel().id().asShortText()).append(": "); - message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); - if (downstream) { - message.append(" > "); - } else { - message.append(" < "); - } - - if (remoteAddress instanceof InetSocketAddress) { - message.append(((InetSocketAddress) remoteAddress).getHostString()); - } else { - message.append("unknown"); - } - message.append("]"); - - message.append(" HEX: "); - message.append(ByteBufUtil.hexDump(buf)); - - LOGGER.info(message.toString()); - } - - } - public BasePipelineFactory(TrackerServer server, String protocol) { this.server = server; - timeout = Context.getConfig().getInteger(protocol + ".timeout"); + timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol)); if (timeout == 0) { - timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary - if (timeout == 0) { - timeout = Context.getConfig().getInteger("server.timeout"); - } + timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT); } distanceHandler = new DistanceHandler( @@ -196,7 +83,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer { Context.getConfig().getInteger("coordinates.minError"), Context.getConfig().getInteger("coordinates.maxError")); - if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) { + if (Context.getConfig().getBoolean("handler.remoteAddress.enable")) { remoteAddressHandler = new RemoteAddressHandler(); } @@ -223,7 +110,7 @@ public abstract class BasePipelineFactory extends ChannelInitializer { hemisphereHandler = new HemisphereHandler(); } - if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) { + if (Context.getConfig().getBoolean("handler.copyAttributes.enable")) { copyAttributesHandler = new CopyAttributesHandler(); } @@ -270,8 +157,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer { } @Override - protected void initChannel(Channel channel) throws Exception { + protected void initChannel(Channel channel) { final ChannelPipeline pipeline = channel.pipeline(); + if (timeout > 0 && !server.isDatagram()) { pipeline.addLast(new IdleStateHandler(timeout, 0, 0)); } diff --git a/src/org/traccar/MainModule.java b/src/org/traccar/MainModule.java index 967b9cd1f..30cf25d3d 100644 --- a/src/org/traccar/MainModule.java +++ b/src/org/traccar/MainModule.java @@ -22,7 +22,7 @@ import com.google.inject.Singleton; import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.database.IdentityManager; -import org.traccar.processing.FilterHandler; +import org.traccar.handler.FilterHandler; import javax.ws.rs.client.Client; diff --git a/src/org/traccar/api/resource/AttributeResource.java b/src/org/traccar/api/resource/AttributeResource.java index d10ca4a72..fefc84041 100644 --- a/src/org/traccar/api/resource/AttributeResource.java +++ b/src/org/traccar/api/resource/AttributeResource.java @@ -33,7 +33,7 @@ import org.traccar.Context; import org.traccar.api.ExtendedObjectResource; import org.traccar.model.Attribute; import org.traccar.model.Position; -import org.traccar.processing.ComputedAttributesHandler; +import org.traccar.handler.ComputedAttributesHandler; @Path("attributes/computed") @Produces(MediaType.APPLICATION_JSON) diff --git a/src/org/traccar/config/ConfigSuffix.java b/src/org/traccar/config/ConfigSuffix.java new file mode 100644 index 000000000..bfecfe197 --- /dev/null +++ b/src/org/traccar/config/ConfigSuffix.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.config; + +public class ConfigSuffix extends ConfigKey { + + ConfigSuffix(String key, Class clazz, String description) { + super(key, clazz, description); + } + + public ConfigKey withPrefix(String prefix) { + return new ConfigKey(prefix + getKey(), getValueClass(), getDescription()); + } + +} diff --git a/src/org/traccar/config/Keys.java b/src/org/traccar/config/Keys.java index 5b26854ed..aea8e1ebe 100644 --- a/src/org/traccar/config/Keys.java +++ b/src/org/traccar/config/Keys.java @@ -17,6 +17,19 @@ package org.traccar.config; public final class Keys { + public static final ConfigSuffix PROTOCOL_TIMEOUT = new ConfigSuffix( + ".timeout", + Integer.class, + "Connection timeout value in seconds. Because sometimes there is no way to detect lost TCP connection, " + + "old connections stay in open state. On most systems there is a limit on number of open " + + "connection, so this leads to problems with establishing new connections when number of " + + "devices is high or devices data connections are unstable."); + + public static final ConfigKey SERVER_TIMEOUT = new ConfigKey( + "server.timeout", + Integer.class, + "Server wide connection timeout value in seconds. See protocol timeout for more information."); + public static final ConfigKey EXTRA_HANDLERS = new ConfigKey( "extra.handlers", String.class, diff --git a/src/org/traccar/handler/ComputedAttributesHandler.java b/src/org/traccar/handler/ComputedAttributesHandler.java new file mode 100644 index 000000000..4b5b8c20d --- /dev/null +++ b/src/org/traccar/handler/ComputedAttributesHandler.java @@ -0,0 +1,129 @@ +/* + * Copyright 2017 Anton Tananaev (anton@traccar.org) + * Copyright 2017 Andrey Kunitsyn (andrey@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import io.netty.channel.ChannelHandler; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.JexlException; +import org.apache.commons.jexl2.MapContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.BaseDataHandler; +import org.traccar.Context; +import org.traccar.model.Attribute; +import org.traccar.model.Device; +import org.traccar.model.Position; + +@ChannelHandler.Sharable +public class ComputedAttributesHandler extends BaseDataHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(ComputedAttributesHandler.class); + + private JexlEngine engine; + + private boolean mapDeviceAttributes; + + public ComputedAttributesHandler() { + engine = new JexlEngine(); + engine.setStrict(true); + engine.setFunctions(Collections.singletonMap("math", (Object) Math.class)); + if (Context.getConfig() != null) { + mapDeviceAttributes = Context.getConfig().getBoolean("handler.computedAttributes.deviceAttributes"); + } + } + + private MapContext prepareContext(Position position) { + MapContext result = new MapContext(); + if (mapDeviceAttributes) { + Device device = Context.getIdentityManager().getById(position.getDeviceId()); + if (device != null) { + for (Object key : device.getAttributes().keySet()) { + result.set((String) key, device.getAttributes().get(key)); + } + } + } + Set methods = new HashSet<>(Arrays.asList(position.getClass().getMethods())); + methods.removeAll(Arrays.asList(Object.class.getMethods())); + for (Method method : methods) { + if (method.getName().startsWith("get") && method.getParameterTypes().length == 0) { + String name = Character.toLowerCase(method.getName().charAt(3)) + method.getName().substring(4); + + try { + if (!method.getReturnType().equals(Map.class)) { + result.set(name, method.invoke(position)); + } else { + for (Object key : ((Map) method.invoke(position)).keySet()) { + result.set((String) key, ((Map) method.invoke(position)).get(key)); + } + } + } catch (IllegalAccessException | InvocationTargetException error) { + LOGGER.warn("Attribute reflection error", error); + } + } + } + return result; + } + + public Object computeAttribute(Attribute attribute, Position position) throws JexlException { + return engine.createExpression(attribute.getExpression()).evaluate(prepareContext(position)); + } + + @Override + protected Position handlePosition(Position position) { + Collection attributes = Context.getAttributesManager().getItems( + Context.getAttributesManager().getAllDeviceItems(position.getDeviceId())); + for (Attribute attribute : attributes) { + if (attribute.getAttribute() != null) { + Object result = null; + try { + result = computeAttribute(attribute, position); + } catch (JexlException error) { + LOGGER.warn("Attribute computation error", error); + } + if (result != null) { + try { + switch (attribute.getType()) { + case "number": + Number numberValue = (Number) result; + position.getAttributes().put(attribute.getAttribute(), numberValue); + break; + case "boolean": + Boolean booleanValue = (Boolean) result; + position.getAttributes().put(attribute.getAttribute(), booleanValue); + break; + default: + position.getAttributes().put(attribute.getAttribute(), result.toString()); + } + } catch (ClassCastException error) { + LOGGER.warn("Attribute cast error", error); + } + } + } + } + return position; + } + +} diff --git a/src/org/traccar/handler/CopyAttributesHandler.java b/src/org/traccar/handler/CopyAttributesHandler.java new file mode 100644 index 000000000..ce37e09cc --- /dev/null +++ b/src/org/traccar/handler/CopyAttributesHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016 - 2017 Anton Tananaev (anton@traccar.org) + * Copyright 2016 - 2017 Andrey Kunitsyn (andrey@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelHandler; +import org.traccar.BaseDataHandler; +import org.traccar.Context; +import org.traccar.model.Position; + +@ChannelHandler.Sharable +public class CopyAttributesHandler extends BaseDataHandler { + + private Position getLastPosition(long deviceId) { + if (Context.getIdentityManager() != null) { + return Context.getIdentityManager().getLastPosition(deviceId); + } + return null; + } + + @Override + protected Position handlePosition(Position position) { + String attributesString = Context.getDeviceManager().lookupAttributeString( + position.getDeviceId(), "handler.copyAttributes", "", true); + Position last = getLastPosition(position.getDeviceId()); + if (attributesString.isEmpty()) { + attributesString = Position.KEY_DRIVER_UNIQUE_ID; + } else { + attributesString += "," + Position.KEY_DRIVER_UNIQUE_ID; + } + if (last != null) { + for (String attribute : attributesString.split("[ ,]")) { + if (last.getAttributes().containsKey(attribute) && !position.getAttributes().containsKey(attribute)) { + position.getAttributes().put(attribute, last.getAttributes().get(attribute)); + } + } + } + return position; + } + +} diff --git a/src/org/traccar/handler/FilterHandler.java b/src/org/traccar/handler/FilterHandler.java new file mode 100644 index 000000000..dceaede01 --- /dev/null +++ b/src/org/traccar/handler/FilterHandler.java @@ -0,0 +1,206 @@ +/* + * Copyright 2014 - 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.BaseDataHandler; +import org.traccar.Context; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.helper.UnitsConverter; +import org.traccar.model.Position; + +@ChannelHandler.Sharable +public class FilterHandler extends BaseDataHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(FilterHandler.class); + + private boolean filterInvalid; + private boolean filterZero; + private boolean filterDuplicate; + private long filterFuture; + private boolean filterApproximate; + private int filterAccuracy; + private boolean filterStatic; + private int filterDistance; + private int filterMaxSpeed; + private long filterMinPeriod; + private long skipLimit; + private boolean skipAttributes; + + public FilterHandler(Config config) { + filterInvalid = config.getBoolean(Keys.FILTER_INVALID); + filterZero = config.getBoolean(Keys.FILTER_ZERO); + filterDuplicate = config.getBoolean(Keys.FILTER_DUPLICATE); + filterFuture = config.getLong(Keys.FILTER_FUTURE) * 1000; + filterAccuracy = config.getInteger(Keys.FILTER_ACCURACY); + filterApproximate = config.getBoolean(Keys.FILTER_APPROXIMATE); + filterStatic = config.getBoolean(Keys.FILTER_STATIC); + filterDistance = config.getInteger(Keys.FILTER_DISTANCE); + filterMaxSpeed = config.getInteger(Keys.FILTER_MAX_SPEED); + filterMinPeriod = config.getInteger(Keys.FILTER_MIN_PERIOD) * 1000; + skipLimit = config.getLong(Keys.FILTER_SKIP_LIMIT) * 1000; + skipAttributes = config.getBoolean(Keys.FILTER_SKIP_ATTRIBUTES_ENABLE); + } + + private boolean filterInvalid(Position position) { + return filterInvalid && (!position.getValid() + || position.getLatitude() > 90 || position.getLongitude() > 180 + || position.getLatitude() < -90 || position.getLongitude() < -180); + } + + private boolean filterZero(Position position) { + return filterZero && position.getLatitude() == 0.0 && position.getLongitude() == 0.0; + } + + private boolean filterDuplicate(Position position, Position last) { + if (filterDuplicate && last != null && position.getFixTime().equals(last.getFixTime())) { + for (String key : position.getAttributes().keySet()) { + if (!last.getAttributes().containsKey(key)) { + return false; + } + } + return true; + } + return false; + } + + private boolean filterFuture(Position position) { + return filterFuture != 0 && position.getFixTime().getTime() > System.currentTimeMillis() + filterFuture; + } + + private boolean filterAccuracy(Position position) { + return filterAccuracy != 0 && position.getAccuracy() > filterAccuracy; + } + + private boolean filterApproximate(Position position) { + return filterApproximate && position.getBoolean(Position.KEY_APPROXIMATE); + } + + private boolean filterStatic(Position position) { + return filterStatic && position.getSpeed() == 0.0; + } + + private boolean filterDistance(Position position, Position last) { + if (filterDistance != 0 && last != null) { + return position.getDouble(Position.KEY_DISTANCE) < filterDistance; + } + return false; + } + + private boolean filterMaxSpeed(Position position, Position last) { + if (filterMaxSpeed != 0 && last != null) { + double distance = position.getDouble(Position.KEY_DISTANCE); + double time = position.getFixTime().getTime() - last.getFixTime().getTime(); + return UnitsConverter.knotsFromMps(distance / (time / 1000)) > filterMaxSpeed; + } + return false; + } + + private boolean filterMinPeriod(Position position, Position last) { + if (filterMinPeriod != 0 && last != null) { + long time = position.getFixTime().getTime() - last.getFixTime().getTime(); + return time > 0 && time < filterMinPeriod; + } + return false; + } + + private boolean skipLimit(Position position, Position last) { + if (skipLimit != 0 && last != null) { + return (position.getServerTime().getTime() - last.getServerTime().getTime()) > skipLimit; + } + return false; + } + + private boolean skipAttributes(Position position) { + if (skipAttributes) { + String attributesString = Context.getIdentityManager().lookupAttributeString( + position.getDeviceId(), "filter.skipAttributes", "", true); + for (String attribute : attributesString.split("[ ,]")) { + if (position.getAttributes().containsKey(attribute)) { + return true; + } + } + } + return false; + } + + private boolean filter(Position position) { + + StringBuilder filterType = new StringBuilder(); + + Position last = null; + if (Context.getIdentityManager() != null) { + last = Context.getIdentityManager().getLastPosition(position.getDeviceId()); + } + + if (filterInvalid(position)) { + filterType.append("Invalid "); + } + if (filterZero(position)) { + filterType.append("Zero "); + } + if (filterDuplicate(position, last) && !skipLimit(position, last) && !skipAttributes(position)) { + filterType.append("Duplicate "); + } + if (filterFuture(position)) { + filterType.append("Future "); + } + if (filterAccuracy(position)) { + filterType.append("Accuracy "); + } + if (filterApproximate(position)) { + filterType.append("Approximate "); + } + if (filterStatic(position) && !skipLimit(position, last) && !skipAttributes(position)) { + filterType.append("Static "); + } + if (filterDistance(position, last) && !skipLimit(position, last) && !skipAttributes(position)) { + filterType.append("Distance "); + } + if (filterMaxSpeed(position, last)) { + filterType.append("MaxSpeed "); + } + if (filterMinPeriod(position, last)) { + filterType.append("MinPeriod "); + } + + if (filterType.length() > 0) { + + StringBuilder message = new StringBuilder(); + message.append("Position filtered by "); + message.append(filterType.toString()); + message.append("filters from device: "); + message.append(Context.getIdentityManager().getById(position.getDeviceId()).getUniqueId()); + + LOGGER.info(message.toString()); + return true; + } + + return false; + } + + @Override + protected Position handlePosition(Position position) { + if (filter(position)) { + return null; + } + return position; + } + +} diff --git a/src/org/traccar/handler/NetworkMessageHandler.java b/src/org/traccar/handler/NetworkMessageHandler.java new file mode 100644 index 000000000..b1d926bfa --- /dev/null +++ b/src/org/traccar/handler/NetworkMessageHandler.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import org.traccar.NetworkMessage; + +import java.net.InetSocketAddress; + +public class NetworkMessageHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else if (msg instanceof ByteBuf) { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof NetworkMessage) { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } else { + ctx.write(msg, promise); + } + } + +} diff --git a/src/org/traccar/handler/OpenChannelHandler.java b/src/org/traccar/handler/OpenChannelHandler.java new file mode 100644 index 000000000..d09d617ab --- /dev/null +++ b/src/org/traccar/handler/OpenChannelHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import org.traccar.TrackerServer; + +public class OpenChannelHandler extends ChannelDuplexHandler { + + private final TrackerServer server; + + public OpenChannelHandler(TrackerServer server) { + this.server = server; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + server.getChannelGroup().add(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + +} diff --git a/src/org/traccar/handler/StandardLoggingHandler.java b/src/org/traccar/handler/StandardLoggingHandler.java new file mode 100644 index 000000000..88010458f --- /dev/null +++ b/src/org/traccar/handler/StandardLoggingHandler.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.NetworkMessage; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class StandardLoggingHandler extends ChannelDuplexHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardLoggingHandler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log(ctx, false, msg); + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log(ctx, true, msg); + super.write(ctx, msg, promise); + } + + public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + if (o instanceof NetworkMessage) { + NetworkMessage networkMessage = (NetworkMessage) o; + if (networkMessage.getMessage() instanceof ByteBuf) { + log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); + } + } else if (o instanceof ByteBuf) { + log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); + } + } + + public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { + StringBuilder message = new StringBuilder(); + + message.append("[").append(ctx.channel().id().asShortText()).append(": "); + message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); + if (downstream) { + message.append(" > "); + } else { + message.append(" < "); + } + + if (remoteAddress instanceof InetSocketAddress) { + message.append(((InetSocketAddress) remoteAddress).getHostString()); + } else { + message.append("unknown"); + } + message.append("]"); + + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump(buf)); + + LOGGER.info(message.toString()); + } + +} diff --git a/src/org/traccar/processing/ComputedAttributesHandler.java b/src/org/traccar/processing/ComputedAttributesHandler.java deleted file mode 100644 index c346661d3..000000000 --- a/src/org/traccar/processing/ComputedAttributesHandler.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2017 Anton Tananaev (anton@traccar.org) - * Copyright 2017 Andrey Kunitsyn (andrey@traccar.org) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.traccar.processing; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import io.netty.channel.ChannelHandler; -import org.apache.commons.jexl2.JexlEngine; -import org.apache.commons.jexl2.JexlException; -import org.apache.commons.jexl2.MapContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.traccar.BaseDataHandler; -import org.traccar.Context; -import org.traccar.model.Attribute; -import org.traccar.model.Device; -import org.traccar.model.Position; - -@ChannelHandler.Sharable -public class ComputedAttributesHandler extends BaseDataHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(ComputedAttributesHandler.class); - - private JexlEngine engine; - - private boolean mapDeviceAttributes; - - public ComputedAttributesHandler() { - engine = new JexlEngine(); - engine.setStrict(true); - engine.setFunctions(Collections.singletonMap("math", (Object) Math.class)); - if (Context.getConfig() != null) { - mapDeviceAttributes = Context.getConfig().getBoolean("processing.computedAttributes.deviceAttributes"); - } - } - - private MapContext prepareContext(Position position) { - MapContext result = new MapContext(); - if (mapDeviceAttributes) { - Device device = Context.getIdentityManager().getById(position.getDeviceId()); - if (device != null) { - for (Object key : device.getAttributes().keySet()) { - result.set((String) key, device.getAttributes().get(key)); - } - } - } - Set methods = new HashSet<>(Arrays.asList(position.getClass().getMethods())); - methods.removeAll(Arrays.asList(Object.class.getMethods())); - for (Method method : methods) { - if (method.getName().startsWith("get") && method.getParameterTypes().length == 0) { - String name = Character.toLowerCase(method.getName().charAt(3)) + method.getName().substring(4); - - try { - if (!method.getReturnType().equals(Map.class)) { - result.set(name, method.invoke(position)); - } else { - for (Object key : ((Map) method.invoke(position)).keySet()) { - result.set((String) key, ((Map) method.invoke(position)).get(key)); - } - } - } catch (IllegalAccessException | InvocationTargetException error) { - LOGGER.warn("Attribute reflection error", error); - } - } - } - return result; - } - - public Object computeAttribute(Attribute attribute, Position position) throws JexlException { - return engine.createExpression(attribute.getExpression()).evaluate(prepareContext(position)); - } - - @Override - protected Position handlePosition(Position position) { - Collection attributes = Context.getAttributesManager().getItems( - Context.getAttributesManager().getAllDeviceItems(position.getDeviceId())); - for (Attribute attribute : attributes) { - if (attribute.getAttribute() != null) { - Object result = null; - try { - result = computeAttribute(attribute, position); - } catch (JexlException error) { - LOGGER.warn("Attribute computation error", error); - } - if (result != null) { - try { - switch (attribute.getType()) { - case "number": - Number numberValue = (Number) result; - position.getAttributes().put(attribute.getAttribute(), numberValue); - break; - case "boolean": - Boolean booleanValue = (Boolean) result; - position.getAttributes().put(attribute.getAttribute(), booleanValue); - break; - default: - position.getAttributes().put(attribute.getAttribute(), result.toString()); - } - } catch (ClassCastException error) { - LOGGER.warn("Attribute cast error", error); - } - } - } - } - return position; - } - -} diff --git a/src/org/traccar/processing/CopyAttributesHandler.java b/src/org/traccar/processing/CopyAttributesHandler.java deleted file mode 100644 index bdd73b141..000000000 --- a/src/org/traccar/processing/CopyAttributesHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2016 - 2017 Anton Tananaev (anton@traccar.org) - * Copyright 2016 - 2017 Andrey Kunitsyn (andrey@traccar.org) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.traccar.processing; - -import io.netty.channel.ChannelHandler; -import org.traccar.BaseDataHandler; -import org.traccar.Context; -import org.traccar.model.Position; - -@ChannelHandler.Sharable -public class CopyAttributesHandler extends BaseDataHandler { - - private Position getLastPosition(long deviceId) { - if (Context.getIdentityManager() != null) { - return Context.getIdentityManager().getLastPosition(deviceId); - } - return null; - } - - @Override - protected Position handlePosition(Position position) { - String attributesString = Context.getDeviceManager().lookupAttributeString( - position.getDeviceId(), "processing.copyAttributes", "", true); - Position last = getLastPosition(position.getDeviceId()); - if (attributesString.isEmpty()) { - attributesString = Position.KEY_DRIVER_UNIQUE_ID; - } else { - attributesString += "," + Position.KEY_DRIVER_UNIQUE_ID; - } - if (last != null) { - for (String attribute : attributesString.split("[ ,]")) { - if (last.getAttributes().containsKey(attribute) && !position.getAttributes().containsKey(attribute)) { - position.getAttributes().put(attribute, last.getAttributes().get(attribute)); - } - } - } - return position; - } - -} diff --git a/src/org/traccar/processing/FilterHandler.java b/src/org/traccar/processing/FilterHandler.java deleted file mode 100644 index eced5d253..000000000 --- a/src/org/traccar/processing/FilterHandler.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright 2014 - 2018 Anton Tananaev (anton@traccar.org) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.traccar.processing; - -import io.netty.channel.ChannelHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.traccar.BaseDataHandler; -import org.traccar.Context; -import org.traccar.config.Config; -import org.traccar.config.Keys; -import org.traccar.helper.UnitsConverter; -import org.traccar.model.Position; - -@ChannelHandler.Sharable -public class FilterHandler extends BaseDataHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(FilterHandler.class); - - private boolean filterInvalid; - private boolean filterZero; - private boolean filterDuplicate; - private long filterFuture; - private boolean filterApproximate; - private int filterAccuracy; - private boolean filterStatic; - private int filterDistance; - private int filterMaxSpeed; - private long filterMinPeriod; - private long skipLimit; - private boolean skipAttributes; - - public FilterHandler(Config config) { - filterInvalid = config.getBoolean(Keys.FILTER_INVALID); - filterZero = config.getBoolean(Keys.FILTER_ZERO); - filterDuplicate = config.getBoolean(Keys.FILTER_DUPLICATE); - filterFuture = config.getLong(Keys.FILTER_FUTURE) * 1000; - filterAccuracy = config.getInteger(Keys.FILTER_ACCURACY); - filterApproximate = config.getBoolean(Keys.FILTER_APPROXIMATE); - filterStatic = config.getBoolean(Keys.FILTER_STATIC); - filterDistance = config.getInteger(Keys.FILTER_DISTANCE); - filterMaxSpeed = config.getInteger(Keys.FILTER_MAX_SPEED); - filterMinPeriod = config.getInteger(Keys.FILTER_MIN_PERIOD) * 1000; - skipLimit = config.getLong(Keys.FILTER_SKIP_LIMIT) * 1000; - skipAttributes = config.getBoolean(Keys.FILTER_SKIP_ATTRIBUTES_ENABLE); - } - - private boolean filterInvalid(Position position) { - return filterInvalid && (!position.getValid() - || position.getLatitude() > 90 || position.getLongitude() > 180 - || position.getLatitude() < -90 || position.getLongitude() < -180); - } - - private boolean filterZero(Position position) { - return filterZero && position.getLatitude() == 0.0 && position.getLongitude() == 0.0; - } - - private boolean filterDuplicate(Position position, Position last) { - if (filterDuplicate && last != null && position.getFixTime().equals(last.getFixTime())) { - for (String key : position.getAttributes().keySet()) { - if (!last.getAttributes().containsKey(key)) { - return false; - } - } - return true; - } - return false; - } - - private boolean filterFuture(Position position) { - return filterFuture != 0 && position.getFixTime().getTime() > System.currentTimeMillis() + filterFuture; - } - - private boolean filterAccuracy(Position position) { - return filterAccuracy != 0 && position.getAccuracy() > filterAccuracy; - } - - private boolean filterApproximate(Position position) { - return filterApproximate && position.getBoolean(Position.KEY_APPROXIMATE); - } - - private boolean filterStatic(Position position) { - return filterStatic && position.getSpeed() == 0.0; - } - - private boolean filterDistance(Position position, Position last) { - if (filterDistance != 0 && last != null) { - return position.getDouble(Position.KEY_DISTANCE) < filterDistance; - } - return false; - } - - private boolean filterMaxSpeed(Position position, Position last) { - if (filterMaxSpeed != 0 && last != null) { - double distance = position.getDouble(Position.KEY_DISTANCE); - double time = position.getFixTime().getTime() - last.getFixTime().getTime(); - return UnitsConverter.knotsFromMps(distance / (time / 1000)) > filterMaxSpeed; - } - return false; - } - - private boolean filterMinPeriod(Position position, Position last) { - if (filterMinPeriod != 0 && last != null) { - long time = position.getFixTime().getTime() - last.getFixTime().getTime(); - return time > 0 && time < filterMinPeriod; - } - return false; - } - - private boolean skipLimit(Position position, Position last) { - if (skipLimit != 0 && last != null) { - return (position.getServerTime().getTime() - last.getServerTime().getTime()) > skipLimit; - } - return false; - } - - private boolean skipAttributes(Position position) { - if (skipAttributes) { - String attributesString = Context.getIdentityManager().lookupAttributeString( - position.getDeviceId(), "filter.skipAttributes", "", true); - for (String attribute : attributesString.split("[ ,]")) { - if (position.getAttributes().containsKey(attribute)) { - return true; - } - } - } - return false; - } - - private boolean filter(Position position) { - - StringBuilder filterType = new StringBuilder(); - - Position last = null; - if (Context.getIdentityManager() != null) { - last = Context.getIdentityManager().getLastPosition(position.getDeviceId()); - } - - if (filterInvalid(position)) { - filterType.append("Invalid "); - } - if (filterZero(position)) { - filterType.append("Zero "); - } - if (filterDuplicate(position, last) && !skipLimit(position, last) && !skipAttributes(position)) { - filterType.append("Duplicate "); - } - if (filterFuture(position)) { - filterType.append("Future "); - } - if (filterAccuracy(position)) { - filterType.append("Accuracy "); - } - if (filterApproximate(position)) { - filterType.append("Approximate "); - } - if (filterStatic(position) && !skipLimit(position, last) && !skipAttributes(position)) { - filterType.append("Static "); - } - if (filterDistance(position, last) && !skipLimit(position, last) && !skipAttributes(position)) { - filterType.append("Distance "); - } - if (filterMaxSpeed(position, last)) { - filterType.append("MaxSpeed "); - } - if (filterMinPeriod(position, last)) { - filterType.append("MinPeriod "); - } - - if (filterType.length() > 0) { - - StringBuilder message = new StringBuilder(); - message.append("Position filtered by "); - message.append(filterType.toString()); - message.append("filters from device: "); - message.append(Context.getIdentityManager().getById(position.getDeviceId()).getUniqueId()); - - LOGGER.info(message.toString()); - return true; - } - - return false; - } - - @Override - protected Position handlePosition(Position position) { - if (filter(position)) { - return null; - } - return position; - } - -} -- cgit v1.2.3