From 563243a8da888244e910a4a7a10fb86ad525fdd4 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sat, 23 Feb 2019 15:23:35 -0800 Subject: Handler refactoring --- .../traccar/handler/ComputedAttributesHandler.java | 129 +++++++++++++ src/org/traccar/handler/CopyAttributesHandler.java | 54 ++++++ src/org/traccar/handler/FilterHandler.java | 206 +++++++++++++++++++++ src/org/traccar/handler/NetworkMessageHandler.java | 57 ++++++ src/org/traccar/handler/OpenChannelHandler.java | 42 +++++ .../traccar/handler/StandardLoggingHandler.java | 81 ++++++++ 6 files changed, 569 insertions(+) create mode 100644 src/org/traccar/handler/ComputedAttributesHandler.java create mode 100644 src/org/traccar/handler/CopyAttributesHandler.java create mode 100644 src/org/traccar/handler/FilterHandler.java create mode 100644 src/org/traccar/handler/NetworkMessageHandler.java create mode 100644 src/org/traccar/handler/OpenChannelHandler.java create mode 100644 src/org/traccar/handler/StandardLoggingHandler.java (limited to 'src/org/traccar/handler') diff --git a/src/org/traccar/handler/ComputedAttributesHandler.java b/src/org/traccar/handler/ComputedAttributesHandler.java new file mode 100644 index 000000000..4b5b8c20d --- /dev/null +++ b/src/org/traccar/handler/ComputedAttributesHandler.java @@ -0,0 +1,129 @@ +/* + * Copyright 2017 Anton Tananaev (anton@traccar.org) + * Copyright 2017 Andrey Kunitsyn (andrey@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import io.netty.channel.ChannelHandler; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.JexlException; +import org.apache.commons.jexl2.MapContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.BaseDataHandler; +import org.traccar.Context; +import org.traccar.model.Attribute; +import org.traccar.model.Device; +import org.traccar.model.Position; + +@ChannelHandler.Sharable +public class ComputedAttributesHandler extends BaseDataHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(ComputedAttributesHandler.class); + + private JexlEngine engine; + + private boolean mapDeviceAttributes; + + public ComputedAttributesHandler() { + engine = new JexlEngine(); + engine.setStrict(true); + engine.setFunctions(Collections.singletonMap("math", (Object) Math.class)); + if (Context.getConfig() != null) { + mapDeviceAttributes = Context.getConfig().getBoolean("handler.computedAttributes.deviceAttributes"); + } + } + + private MapContext prepareContext(Position position) { + MapContext result = new MapContext(); + if (mapDeviceAttributes) { + Device device = Context.getIdentityManager().getById(position.getDeviceId()); + if (device != null) { + for (Object key : device.getAttributes().keySet()) { + result.set((String) key, device.getAttributes().get(key)); + } + } + } + Set methods = new HashSet<>(Arrays.asList(position.getClass().getMethods())); + methods.removeAll(Arrays.asList(Object.class.getMethods())); + for (Method method : methods) { + if (method.getName().startsWith("get") && method.getParameterTypes().length == 0) { + String name = Character.toLowerCase(method.getName().charAt(3)) + method.getName().substring(4); + + try { + if (!method.getReturnType().equals(Map.class)) { + result.set(name, method.invoke(position)); + } else { + for (Object key : ((Map) method.invoke(position)).keySet()) { + result.set((String) key, ((Map) method.invoke(position)).get(key)); + } + } + } catch (IllegalAccessException | InvocationTargetException error) { + LOGGER.warn("Attribute reflection error", error); + } + } + } + return result; + } + + public Object computeAttribute(Attribute attribute, Position position) throws JexlException { + return engine.createExpression(attribute.getExpression()).evaluate(prepareContext(position)); + } + + @Override + protected Position handlePosition(Position position) { + Collection attributes = Context.getAttributesManager().getItems( + Context.getAttributesManager().getAllDeviceItems(position.getDeviceId())); + for (Attribute attribute : attributes) { + if (attribute.getAttribute() != null) { + Object result = null; + try { + result = computeAttribute(attribute, position); + } catch (JexlException error) { + LOGGER.warn("Attribute computation error", error); + } + if (result != null) { + try { + switch (attribute.getType()) { + case "number": + Number numberValue = (Number) result; + position.getAttributes().put(attribute.getAttribute(), numberValue); + break; + case "boolean": + Boolean booleanValue = (Boolean) result; + position.getAttributes().put(attribute.getAttribute(), booleanValue); + break; + default: + position.getAttributes().put(attribute.getAttribute(), result.toString()); + } + } catch (ClassCastException error) { + LOGGER.warn("Attribute cast error", error); + } + } + } + } + return position; + } + +} diff --git a/src/org/traccar/handler/CopyAttributesHandler.java b/src/org/traccar/handler/CopyAttributesHandler.java new file mode 100644 index 000000000..ce37e09cc --- /dev/null +++ b/src/org/traccar/handler/CopyAttributesHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016 - 2017 Anton Tananaev (anton@traccar.org) + * Copyright 2016 - 2017 Andrey Kunitsyn (andrey@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelHandler; +import org.traccar.BaseDataHandler; +import org.traccar.Context; +import org.traccar.model.Position; + +@ChannelHandler.Sharable +public class CopyAttributesHandler extends BaseDataHandler { + + private Position getLastPosition(long deviceId) { + if (Context.getIdentityManager() != null) { + return Context.getIdentityManager().getLastPosition(deviceId); + } + return null; + } + + @Override + protected Position handlePosition(Position position) { + String attributesString = Context.getDeviceManager().lookupAttributeString( + position.getDeviceId(), "handler.copyAttributes", "", true); + Position last = getLastPosition(position.getDeviceId()); + if (attributesString.isEmpty()) { + attributesString = Position.KEY_DRIVER_UNIQUE_ID; + } else { + attributesString += "," + Position.KEY_DRIVER_UNIQUE_ID; + } + if (last != null) { + for (String attribute : attributesString.split("[ ,]")) { + if (last.getAttributes().containsKey(attribute) && !position.getAttributes().containsKey(attribute)) { + position.getAttributes().put(attribute, last.getAttributes().get(attribute)); + } + } + } + return position; + } + +} diff --git a/src/org/traccar/handler/FilterHandler.java b/src/org/traccar/handler/FilterHandler.java new file mode 100644 index 000000000..dceaede01 --- /dev/null +++ b/src/org/traccar/handler/FilterHandler.java @@ -0,0 +1,206 @@ +/* + * Copyright 2014 - 2018 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.BaseDataHandler; +import org.traccar.Context; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.helper.UnitsConverter; +import org.traccar.model.Position; + +@ChannelHandler.Sharable +public class FilterHandler extends BaseDataHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(FilterHandler.class); + + private boolean filterInvalid; + private boolean filterZero; + private boolean filterDuplicate; + private long filterFuture; + private boolean filterApproximate; + private int filterAccuracy; + private boolean filterStatic; + private int filterDistance; + private int filterMaxSpeed; + private long filterMinPeriod; + private long skipLimit; + private boolean skipAttributes; + + public FilterHandler(Config config) { + filterInvalid = config.getBoolean(Keys.FILTER_INVALID); + filterZero = config.getBoolean(Keys.FILTER_ZERO); + filterDuplicate = config.getBoolean(Keys.FILTER_DUPLICATE); + filterFuture = config.getLong(Keys.FILTER_FUTURE) * 1000; + filterAccuracy = config.getInteger(Keys.FILTER_ACCURACY); + filterApproximate = config.getBoolean(Keys.FILTER_APPROXIMATE); + filterStatic = config.getBoolean(Keys.FILTER_STATIC); + filterDistance = config.getInteger(Keys.FILTER_DISTANCE); + filterMaxSpeed = config.getInteger(Keys.FILTER_MAX_SPEED); + filterMinPeriod = config.getInteger(Keys.FILTER_MIN_PERIOD) * 1000; + skipLimit = config.getLong(Keys.FILTER_SKIP_LIMIT) * 1000; + skipAttributes = config.getBoolean(Keys.FILTER_SKIP_ATTRIBUTES_ENABLE); + } + + private boolean filterInvalid(Position position) { + return filterInvalid && (!position.getValid() + || position.getLatitude() > 90 || position.getLongitude() > 180 + || position.getLatitude() < -90 || position.getLongitude() < -180); + } + + private boolean filterZero(Position position) { + return filterZero && position.getLatitude() == 0.0 && position.getLongitude() == 0.0; + } + + private boolean filterDuplicate(Position position, Position last) { + if (filterDuplicate && last != null && position.getFixTime().equals(last.getFixTime())) { + for (String key : position.getAttributes().keySet()) { + if (!last.getAttributes().containsKey(key)) { + return false; + } + } + return true; + } + return false; + } + + private boolean filterFuture(Position position) { + return filterFuture != 0 && position.getFixTime().getTime() > System.currentTimeMillis() + filterFuture; + } + + private boolean filterAccuracy(Position position) { + return filterAccuracy != 0 && position.getAccuracy() > filterAccuracy; + } + + private boolean filterApproximate(Position position) { + return filterApproximate && position.getBoolean(Position.KEY_APPROXIMATE); + } + + private boolean filterStatic(Position position) { + return filterStatic && position.getSpeed() == 0.0; + } + + private boolean filterDistance(Position position, Position last) { + if (filterDistance != 0 && last != null) { + return position.getDouble(Position.KEY_DISTANCE) < filterDistance; + } + return false; + } + + private boolean filterMaxSpeed(Position position, Position last) { + if (filterMaxSpeed != 0 && last != null) { + double distance = position.getDouble(Position.KEY_DISTANCE); + double time = position.getFixTime().getTime() - last.getFixTime().getTime(); + return UnitsConverter.knotsFromMps(distance / (time / 1000)) > filterMaxSpeed; + } + return false; + } + + private boolean filterMinPeriod(Position position, Position last) { + if (filterMinPeriod != 0 && last != null) { + long time = position.getFixTime().getTime() - last.getFixTime().getTime(); + return time > 0 && time < filterMinPeriod; + } + return false; + } + + private boolean skipLimit(Position position, Position last) { + if (skipLimit != 0 && last != null) { + return (position.getServerTime().getTime() - last.getServerTime().getTime()) > skipLimit; + } + return false; + } + + private boolean skipAttributes(Position position) { + if (skipAttributes) { + String attributesString = Context.getIdentityManager().lookupAttributeString( + position.getDeviceId(), "filter.skipAttributes", "", true); + for (String attribute : attributesString.split("[ ,]")) { + if (position.getAttributes().containsKey(attribute)) { + return true; + } + } + } + return false; + } + + private boolean filter(Position position) { + + StringBuilder filterType = new StringBuilder(); + + Position last = null; + if (Context.getIdentityManager() != null) { + last = Context.getIdentityManager().getLastPosition(position.getDeviceId()); + } + + if (filterInvalid(position)) { + filterType.append("Invalid "); + } + if (filterZero(position)) { + filterType.append("Zero "); + } + if (filterDuplicate(position, last) && !skipLimit(position, last) && !skipAttributes(position)) { + filterType.append("Duplicate "); + } + if (filterFuture(position)) { + filterType.append("Future "); + } + if (filterAccuracy(position)) { + filterType.append("Accuracy "); + } + if (filterApproximate(position)) { + filterType.append("Approximate "); + } + if (filterStatic(position) && !skipLimit(position, last) && !skipAttributes(position)) { + filterType.append("Static "); + } + if (filterDistance(position, last) && !skipLimit(position, last) && !skipAttributes(position)) { + filterType.append("Distance "); + } + if (filterMaxSpeed(position, last)) { + filterType.append("MaxSpeed "); + } + if (filterMinPeriod(position, last)) { + filterType.append("MinPeriod "); + } + + if (filterType.length() > 0) { + + StringBuilder message = new StringBuilder(); + message.append("Position filtered by "); + message.append(filterType.toString()); + message.append("filters from device: "); + message.append(Context.getIdentityManager().getById(position.getDeviceId()).getUniqueId()); + + LOGGER.info(message.toString()); + return true; + } + + return false; + } + + @Override + protected Position handlePosition(Position position) { + if (filter(position)) { + return null; + } + return position; + } + +} diff --git a/src/org/traccar/handler/NetworkMessageHandler.java b/src/org/traccar/handler/NetworkMessageHandler.java new file mode 100644 index 000000000..b1d926bfa --- /dev/null +++ b/src/org/traccar/handler/NetworkMessageHandler.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import org.traccar.NetworkMessage; + +import java.net.InetSocketAddress; + +public class NetworkMessageHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (ctx.channel() instanceof DatagramChannel) { + DatagramPacket packet = (DatagramPacket) msg; + ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender())); + } else if (msg instanceof ByteBuf) { + ByteBuf buffer = (ByteBuf) msg; + ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress())); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof NetworkMessage) { + NetworkMessage message = (NetworkMessage) msg; + if (ctx.channel() instanceof DatagramChannel) { + InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress(); + InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress(); + ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise); + } else { + ctx.write(message.getMessage(), promise); + } + } else { + ctx.write(msg, promise); + } + } + +} diff --git a/src/org/traccar/handler/OpenChannelHandler.java b/src/org/traccar/handler/OpenChannelHandler.java new file mode 100644 index 000000000..d09d617ab --- /dev/null +++ b/src/org/traccar/handler/OpenChannelHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import org.traccar.TrackerServer; + +public class OpenChannelHandler extends ChannelDuplexHandler { + + private final TrackerServer server; + + public OpenChannelHandler(TrackerServer server) { + this.server = server; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + server.getChannelGroup().add(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + server.getChannelGroup().remove(ctx.channel()); + } + +} diff --git a/src/org/traccar/handler/StandardLoggingHandler.java b/src/org/traccar/handler/StandardLoggingHandler.java new file mode 100644 index 000000000..88010458f --- /dev/null +++ b/src/org/traccar/handler/StandardLoggingHandler.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.NetworkMessage; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class StandardLoggingHandler extends ChannelDuplexHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardLoggingHandler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log(ctx, false, msg); + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log(ctx, true, msg); + super.write(ctx, msg, promise); + } + + public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { + if (o instanceof NetworkMessage) { + NetworkMessage networkMessage = (NetworkMessage) o; + if (networkMessage.getMessage() instanceof ByteBuf) { + log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); + } + } else if (o instanceof ByteBuf) { + log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); + } + } + + public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { + StringBuilder message = new StringBuilder(); + + message.append("[").append(ctx.channel().id().asShortText()).append(": "); + message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort()); + if (downstream) { + message.append(" > "); + } else { + message.append(" < "); + } + + if (remoteAddress instanceof InetSocketAddress) { + message.append(((InetSocketAddress) remoteAddress).getHostString()); + } else { + message.append("unknown"); + } + message.append("]"); + + message.append(" HEX: "); + message.append(ByteBufUtil.hexDump(buf)); + + LOGGER.info(message.toString()); + } + +} -- cgit v1.2.3