From f84e2710e05660822633ec9e61cde44c03a42d7e Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sat, 30 Mar 2024 18:46:37 -0700 Subject: Refactor position and event handlers --- src/main/java/org/traccar/ProcessingHandler.java | 145 +++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 src/main/java/org/traccar/ProcessingHandler.java (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java new file mode 100644 index 000000000..7627c719b --- /dev/null +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -0,0 +1,145 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar; + +import com.google.inject.Injector; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.traccar.database.NotificationManager; +import org.traccar.handler.BasePositionHandler; +import org.traccar.handler.ComputedAttributesHandler; +import org.traccar.handler.CopyAttributesHandler; +import org.traccar.handler.DatabaseHandler; +import org.traccar.handler.DistanceHandler; +import org.traccar.handler.EngineHoursHandler; +import org.traccar.handler.FilterHandler; +import org.traccar.handler.GeocoderHandler; +import org.traccar.handler.GeofenceHandler; +import org.traccar.handler.GeolocationHandler; +import org.traccar.handler.HemisphereHandler; +import org.traccar.handler.MotionHandler; +import org.traccar.handler.PositionForwardingHandler; +import org.traccar.handler.SpeedLimitHandler; +import org.traccar.handler.TimeHandler; +import org.traccar.handler.events.AlertEventHandler; +import org.traccar.handler.events.BaseEventHandler; +import org.traccar.handler.events.BehaviorEventHandler; +import org.traccar.handler.events.CommandResultEventHandler; +import org.traccar.handler.events.DriverEventHandler; +import org.traccar.handler.events.FuelEventHandler; +import org.traccar.handler.events.GeofenceEventHandler; +import org.traccar.handler.events.IgnitionEventHandler; +import org.traccar.handler.events.MaintenanceEventHandler; +import org.traccar.handler.events.MediaEventHandler; +import org.traccar.handler.events.MotionEventHandler; +import org.traccar.handler.events.OverspeedEventHandler; +import org.traccar.handler.network.AcknowledgementHandler; +import org.traccar.model.Position; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Singleton +@ChannelHandler.Sharable +public class ProcessingHandler extends ChannelInboundHandlerAdapter { + + private final NotificationManager notificationManager; + private final List positionHandlers; + private final List eventHandlers; + + @Inject + public ProcessingHandler(Injector injector, NotificationManager notificationManager) { + this.notificationManager = notificationManager; + + positionHandlers = Stream.of( + TimeHandler.class, + GeolocationHandler.class, + HemisphereHandler.class, + DistanceHandler.class, + FilterHandler.class, + GeofenceHandler.class, + GeocoderHandler.class, + SpeedLimitHandler.class, + MotionHandler.class, + EngineHoursHandler.class, + ComputedAttributesHandler.class, + CopyAttributesHandler.class, + PositionForwardingHandler.class, + DatabaseHandler.class) + .map((clazz) -> (BasePositionHandler) injector.getInstance(clazz)) + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableList()); + + eventHandlers = Stream.of( + MediaEventHandler.class, + CommandResultEventHandler.class, + OverspeedEventHandler.class, + BehaviorEventHandler.class, + FuelEventHandler.class, + MotionEventHandler.class, + GeofenceEventHandler.class, + AlertEventHandler.class, + IgnitionEventHandler.class, + MaintenanceEventHandler.class, + DriverEventHandler.class) + .map((clazz) -> (BaseEventHandler) injector.getInstance(clazz)) + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableList()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Position) { + processPositionHandlers(ctx, (Position) msg); + } + super.channelRead(ctx, msg); + } + + private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { + var iterator = positionHandlers.iterator(); + iterator.next().handlePosition(position, new BasePositionHandler.Callback() { + @Override + public void processed(Position position) { + if (position != null) { + if (iterator.hasNext()) { + iterator.next().handlePosition(position, this); + } else { + processEventHandlers(ctx, position); + } + } else { + finishedProcessing(ctx, null); + } + } + }); + } + + private void processEventHandlers(ChannelHandlerContext ctx, Position position) { + eventHandlers.forEach(handler -> handler.analyzePosition( + position, (event) -> notificationManager.updateEvents(Map.of(event, position)))); + finishedProcessing(ctx, position); + } + + private void finishedProcessing(ChannelHandlerContext ctx, Position position) { + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + } + +} -- cgit v1.2.3 From 05283e44dea22f7b928401f888289840581b28af Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 31 Mar 2024 07:14:34 -0700 Subject: Simplify main event handler --- src/main/java/org/traccar/ProcessingHandler.java | 12 +++- .../java/org/traccar/handler/DatabaseHandler.java | 6 +- .../org/traccar/handler/PostProcessHandler.java | 67 ++++++++++++++++++++++ .../traccar/handler/network/MainEventHandler.java | 40 ++----------- 4 files changed, 86 insertions(+), 39 deletions(-) create mode 100644 src/main/java/org/traccar/handler/PostProcessHandler.java (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 7627c719b..09ec79f9d 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -26,6 +26,7 @@ import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; +import org.traccar.handler.PostProcessHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; @@ -65,6 +66,7 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { private final NotificationManager notificationManager; private final List positionHandlers; private final List eventHandlers; + private final PostProcessHandler postProcessHandler; @Inject public ProcessingHandler(Injector injector, NotificationManager notificationManager) { @@ -104,14 +106,17 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { .map((clazz) -> (BaseEventHandler) injector.getInstance(clazz)) .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); + + postProcessHandler = injector.getInstance(PostProcessHandler.class); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position) { processPositionHandlers(ctx, (Position) msg); + } else { + super.channelRead(ctx, msg); } - super.channelRead(ctx, msg); } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { @@ -139,7 +144,10 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { } private void finishedProcessing(ChannelHandlerContext ctx, Position position) { - ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + postProcessHandler.handlePosition(position, p -> { + ctx.fireChannelRead(p); + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + }); } } diff --git a/src/main/java/org/traccar/handler/DatabaseHandler.java b/src/main/java/org/traccar/handler/DatabaseHandler.java index b1f218a1e..0c8d2717d 100644 --- a/src/main/java/org/traccar/handler/DatabaseHandler.java +++ b/src/main/java/org/traccar/handler/DatabaseHandler.java @@ -18,6 +18,7 @@ package org.traccar.handler; import jakarta.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.traccar.database.StatisticsManager; import org.traccar.model.Position; import org.traccar.storage.Storage; import org.traccar.storage.query.Columns; @@ -28,10 +29,12 @@ public class DatabaseHandler extends BasePositionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHandler.class); private final Storage storage; + private final StatisticsManager statisticsManager; @Inject - public DatabaseHandler(Storage storage) { + public DatabaseHandler(Storage storage, StatisticsManager statisticsManager) { this.storage = storage; + this.statisticsManager = statisticsManager; } @Override @@ -39,6 +42,7 @@ public class DatabaseHandler extends BasePositionHandler { try { position.setId(storage.addObject(position, new Request(new Columns.Exclude("id")))); + statisticsManager.messageStoredCount(position.getDeviceId()); } catch (Exception error) { LOGGER.warn("Failed to store position", error); } diff --git a/src/main/java/org/traccar/handler/PostProcessHandler.java b/src/main/java/org/traccar/handler/PostProcessHandler.java new file mode 100644 index 000000000..2d6bc03ad --- /dev/null +++ b/src/main/java/org/traccar/handler/PostProcessHandler.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.helper.model.PositionUtil; +import org.traccar.model.Device; +import org.traccar.model.Position; +import org.traccar.session.ConnectionManager; +import org.traccar.session.cache.CacheManager; +import org.traccar.storage.Storage; +import org.traccar.storage.StorageException; +import org.traccar.storage.query.Columns; +import org.traccar.storage.query.Condition; +import org.traccar.storage.query.Request; + +public class PostProcessHandler extends BasePositionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostProcessHandler.class); + + private final CacheManager cacheManager; + private final Storage storage; + private final ConnectionManager connectionManager; + + @Inject + public PostProcessHandler(CacheManager cacheManager, Storage storage, ConnectionManager connectionManager) { + this.cacheManager = cacheManager; + this.storage = storage; + this.connectionManager = connectionManager; + } + + @Override + public void handlePosition(Position position, Callback callback) { + try { + if (PositionUtil.isLatest(cacheManager, position)) { + Device updatedDevice = new Device(); + updatedDevice.setId(position.getDeviceId()); + updatedDevice.setPositionId(position.getId()); + storage.updateObject(updatedDevice, new Request( + new Columns.Include("positionId"), + new Condition.Equals("id", updatedDevice.getId()))); + + cacheManager.updatePosition(position); + connectionManager.updatePosition(true, position); + } + } catch (StorageException error) { + LOGGER.warn("Failed to update device", error); + } + callback.processed(position); + } + +} diff --git a/src/main/java/org/traccar/handler/network/MainEventHandler.java b/src/main/java/org/traccar/handler/network/MainEventHandler.java index 901036d44..d6407f1e5 100644 --- a/src/main/java/org/traccar/handler/network/MainEventHandler.java +++ b/src/main/java/org/traccar/handler/network/MainEventHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 - 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2012 - 2024 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,28 +22,21 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.socket.DatagramChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.timeout.IdleStateEvent; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.BasePipelineFactory; import org.traccar.BaseProtocolDecoder; import org.traccar.config.Config; import org.traccar.config.Keys; -import org.traccar.database.StatisticsManager; import org.traccar.helper.DateUtil; import org.traccar.helper.NetworkUtil; -import org.traccar.helper.model.PositionUtil; import org.traccar.model.Device; import org.traccar.model.Position; import org.traccar.session.ConnectionManager; import org.traccar.session.cache.CacheManager; -import org.traccar.storage.Storage; -import org.traccar.storage.StorageException; -import org.traccar.storage.query.Columns; -import org.traccar.storage.query.Condition; -import org.traccar.storage.query.Request; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; import java.util.Arrays; import java.util.HashSet; import java.util.LinkedHashSet; @@ -59,18 +52,13 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { private final Set logAttributes = new LinkedHashSet<>(); private final CacheManager cacheManager; - private final Storage storage; private final ConnectionManager connectionManager; - private final StatisticsManager statisticsManager; @Inject public MainEventHandler( - Config config, CacheManager cacheManager, Storage storage, ConnectionManager connectionManager, - StatisticsManager statisticsManager) { + Config config, CacheManager cacheManager, ConnectionManager connectionManager) { this.cacheManager = cacheManager; - this.storage = storage; this.connectionManager = connectionManager; - this.statisticsManager = statisticsManager; String connectionlessProtocolList = config.getString(Keys.STATUS_IGNORE_OFFLINE); if (connectionlessProtocolList != null) { connectionlessProtocols.addAll(Arrays.asList(connectionlessProtocolList.split("[, ]"))); @@ -85,22 +73,6 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { Position position = (Position) msg; Device device = cacheManager.getObject(Device.class, position.getDeviceId()); - try { - if (PositionUtil.isLatest(cacheManager, position)) { - Device updatedDevice = new Device(); - updatedDevice.setId(position.getDeviceId()); - updatedDevice.setPositionId(position.getId()); - storage.updateObject(updatedDevice, new Request( - new Columns.Include("positionId"), - new Condition.Equals("id", updatedDevice.getId()))); - - cacheManager.updatePosition(position); - connectionManager.updatePosition(true, position); - } - } catch (StorageException error) { - LOGGER.warn("Failed to update device", error); - } - StringBuilder builder = new StringBuilder(); builder.append("[").append(NetworkUtil.session(ctx.channel())).append("] "); builder.append("id: ").append(device.getUniqueId()); @@ -145,10 +117,6 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { } } LOGGER.info(builder.toString()); - - statisticsManager.registerMessageStored(position.getDeviceId(), position.getProtocol()); - - ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); } } -- cgit v1.2.3 From 301a643d62ccce053a1eaf10e5516a3d5bcc815f Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 31 Mar 2024 08:13:18 -0700 Subject: Extract position logger --- src/main/java/org/traccar/ProcessingHandler.java | 8 +- .../traccar/handler/network/MainEventHandler.java | 69 +--------------- .../java/org/traccar/helper/PositionLogger.java | 94 ++++++++++++++++++++++ 3 files changed, 102 insertions(+), 69 deletions(-) create mode 100644 src/main/java/org/traccar/helper/PositionLogger.java (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 09ec79f9d..688389d98 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -51,6 +51,7 @@ import org.traccar.handler.events.MediaEventHandler; import org.traccar.handler.events.MotionEventHandler; import org.traccar.handler.events.OverspeedEventHandler; import org.traccar.handler.network.AcknowledgementHandler; +import org.traccar.helper.PositionLogger; import org.traccar.model.Position; import java.util.List; @@ -64,13 +65,16 @@ import java.util.stream.Stream; public class ProcessingHandler extends ChannelInboundHandlerAdapter { private final NotificationManager notificationManager; + private final PositionLogger positionLogger; private final List positionHandlers; private final List eventHandlers; private final PostProcessHandler postProcessHandler; @Inject - public ProcessingHandler(Injector injector, NotificationManager notificationManager) { + public ProcessingHandler( + Injector injector, NotificationManager notificationManager, PositionLogger positionLogger) { this.notificationManager = notificationManager; + this.positionLogger = positionLogger; positionHandlers = Stream.of( TimeHandler.class, @@ -145,7 +149,7 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { private void finishedProcessing(ChannelHandlerContext ctx, Position position) { postProcessHandler.handlePosition(position, p -> { - ctx.fireChannelRead(p); + positionLogger.log(ctx, p); ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); }); } diff --git a/src/main/java/org/traccar/handler/network/MainEventHandler.java b/src/main/java/org/traccar/handler/network/MainEventHandler.java index d6407f1e5..f60004126 100644 --- a/src/main/java/org/traccar/handler/network/MainEventHandler.java +++ b/src/main/java/org/traccar/handler/network/MainEventHandler.java @@ -30,16 +30,11 @@ import org.traccar.BasePipelineFactory; import org.traccar.BaseProtocolDecoder; import org.traccar.config.Config; import org.traccar.config.Keys; -import org.traccar.helper.DateUtil; import org.traccar.helper.NetworkUtil; -import org.traccar.model.Device; -import org.traccar.model.Position; import org.traccar.session.ConnectionManager; -import org.traccar.session.cache.CacheManager; import java.util.Arrays; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.Set; @Singleton @@ -48,76 +43,16 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(MainEventHandler.class); - private final Set connectionlessProtocols = new HashSet<>(); - private final Set logAttributes = new LinkedHashSet<>(); - - private final CacheManager cacheManager; private final ConnectionManager connectionManager; + private final Set connectionlessProtocols = new HashSet<>(); @Inject - public MainEventHandler( - Config config, CacheManager cacheManager, ConnectionManager connectionManager) { - this.cacheManager = cacheManager; + public MainEventHandler(Config config, ConnectionManager connectionManager) { this.connectionManager = connectionManager; String connectionlessProtocolList = config.getString(Keys.STATUS_IGNORE_OFFLINE); if (connectionlessProtocolList != null) { connectionlessProtocols.addAll(Arrays.asList(connectionlessProtocolList.split("[, ]"))); } - logAttributes.addAll(Arrays.asList(config.getString(Keys.LOGGER_ATTRIBUTES).split("[, ]"))); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof Position) { - - Position position = (Position) msg; - Device device = cacheManager.getObject(Device.class, position.getDeviceId()); - - StringBuilder builder = new StringBuilder(); - builder.append("[").append(NetworkUtil.session(ctx.channel())).append("] "); - builder.append("id: ").append(device.getUniqueId()); - for (String attribute : logAttributes) { - switch (attribute) { - case "time": - builder.append(", time: ").append(DateUtil.formatDate(position.getFixTime(), false)); - break; - case "position": - builder.append(", lat: ").append(String.format("%.5f", position.getLatitude())); - builder.append(", lon: ").append(String.format("%.5f", position.getLongitude())); - break; - case "speed": - if (position.getSpeed() > 0) { - builder.append(", speed: ").append(String.format("%.1f", position.getSpeed())); - } - break; - case "course": - builder.append(", course: ").append(String.format("%.1f", position.getCourse())); - break; - case "accuracy": - if (position.getAccuracy() > 0) { - builder.append(", accuracy: ").append(String.format("%.1f", position.getAccuracy())); - } - break; - case "outdated": - if (position.getOutdated()) { - builder.append(", outdated"); - } - break; - case "invalid": - if (!position.getValid()) { - builder.append(", invalid"); - } - break; - default: - Object value = position.getAttributes().get(attribute); - if (value != null) { - builder.append(", ").append(attribute).append(": ").append(value); - } - break; - } - } - LOGGER.info(builder.toString()); - } } @Override diff --git a/src/main/java/org/traccar/helper/PositionLogger.java b/src/main/java/org/traccar/helper/PositionLogger.java new file mode 100644 index 000000000..9f149edf4 --- /dev/null +++ b/src/main/java/org/traccar/helper/PositionLogger.java @@ -0,0 +1,94 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.helper; + +import io.netty.channel.ChannelHandlerContext; +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.model.Device; +import org.traccar.model.Position; +import org.traccar.session.cache.CacheManager; + +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Set; + +public class PositionLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(PositionLogger.class); + + private final CacheManager cacheManager; + private final Set logAttributes = new LinkedHashSet<>(); + + @Inject + public PositionLogger(Config config, CacheManager cacheManager) { + this.cacheManager = cacheManager; + logAttributes.addAll(Arrays.asList(config.getString(Keys.LOGGER_ATTRIBUTES).split("[, ]"))); + } + + public void log(ChannelHandlerContext context, Position position) { + Device device = cacheManager.getObject(Device.class, position.getDeviceId()); + + StringBuilder builder = new StringBuilder(); + builder.append("[").append(NetworkUtil.session(context.channel())).append("] "); + builder.append("id: ").append(device.getUniqueId()); + for (String attribute : logAttributes) { + switch (attribute) { + case "time": + builder.append(", time: ").append(DateUtil.formatDate(position.getFixTime(), false)); + break; + case "position": + builder.append(", lat: ").append(String.format("%.5f", position.getLatitude())); + builder.append(", lon: ").append(String.format("%.5f", position.getLongitude())); + break; + case "speed": + if (position.getSpeed() > 0) { + builder.append(", speed: ").append(String.format("%.1f", position.getSpeed())); + } + break; + case "course": + builder.append(", course: ").append(String.format("%.1f", position.getCourse())); + break; + case "accuracy": + if (position.getAccuracy() > 0) { + builder.append(", accuracy: ").append(String.format("%.1f", position.getAccuracy())); + } + break; + case "outdated": + if (position.getOutdated()) { + builder.append(", outdated"); + } + break; + case "invalid": + if (!position.getValid()) { + builder.append(", invalid"); + } + break; + default: + Object value = position.getAttributes().get(attribute); + if (value != null) { + builder.append(", ").append(attribute).append(": ").append(value); + } + break; + } + } + LOGGER.info(builder.toString()); + } + +} -- cgit v1.2.3 From ae91b2f07e88d2dea97aa4bb316cb00116b4a702 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 31 Mar 2024 08:16:28 -0700 Subject: Implement position buffering --- debug.xml | 3 + src/main/java/org/traccar/ProcessingHandler.java | 17 +++- src/main/java/org/traccar/config/Keys.java | 8 ++ .../org/traccar/database/BufferingManager.java | 103 +++++++++++++++++++++ 4 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 src/main/java/org/traccar/database/BufferingManager.java (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/debug.xml b/debug.xml index 028e9210b..f3abbe9a9 100644 --- a/debug.xml +++ b/debug.xml @@ -25,6 +25,9 @@ sa + + true 6037 diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 688389d98..cbb375a72 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -21,12 +21,13 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.traccar.config.Config; +import org.traccar.database.BufferingManager; import org.traccar.database.NotificationManager; import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; -import org.traccar.handler.PostProcessHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; @@ -36,6 +37,7 @@ import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; import org.traccar.handler.PositionForwardingHandler; +import org.traccar.handler.PostProcessHandler; import org.traccar.handler.SpeedLimitHandler; import org.traccar.handler.TimeHandler; import org.traccar.handler.events.AlertEventHandler; @@ -62,19 +64,21 @@ import java.util.stream.Stream; @Singleton @ChannelHandler.Sharable -public class ProcessingHandler extends ChannelInboundHandlerAdapter { +public class ProcessingHandler extends ChannelInboundHandlerAdapter implements BufferingManager.Callback { private final NotificationManager notificationManager; private final PositionLogger positionLogger; + private final BufferingManager bufferingManager; private final List positionHandlers; private final List eventHandlers; private final PostProcessHandler postProcessHandler; @Inject public ProcessingHandler( - Injector injector, NotificationManager notificationManager, PositionLogger positionLogger) { + Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { this.notificationManager = notificationManager; this.positionLogger = positionLogger; + bufferingManager = new BufferingManager(config, this); positionHandlers = Stream.of( TimeHandler.class, @@ -117,12 +121,17 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position) { - processPositionHandlers(ctx, (Position) msg); + bufferingManager.accept(ctx, (Position) msg); } else { super.channelRead(ctx, msg); } } + @Override + public void onReleased(ChannelHandlerContext context, Position position) { + processPositionHandlers(context, position); + } + private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 4aacb2cd8..d346084bd 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -292,6 +292,14 @@ public final class Keys { List.of(KeyType.CONFIG, KeyType.DEVICE), false); + /** + * If not zero, enable buffering of incoming data to handle ordering locations. The value is threshold for + * buffering in milliseconds. + */ + public static final ConfigKey SERVER_BUFFERING_THRESHOLD = new LongConfigKey( + "server.buffering.threshold", + List.of(KeyType.CONFIG)); + /** * Server wide connection timeout value in seconds. See protocol timeout for more information. */ diff --git a/src/main/java/org/traccar/database/BufferingManager.java b/src/main/java/org/traccar/database/BufferingManager.java new file mode 100644 index 000000000..ab71b2860 --- /dev/null +++ b/src/main/java/org/traccar/database/BufferingManager.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.database; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.model.Position; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +public class BufferingManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(BufferingManager.class); + + public interface Callback { + void onReleased(ChannelHandlerContext context, Position position); + } + + private static class Holder implements Comparable { + + private static final Comparator COMPARATOR = Comparator + .comparing(Position::getFixTime) + .thenComparing(Position::getDeviceTime) + .thenComparing(Position::getServerTime); + + private final ChannelHandlerContext context; + private final Position position; + private Timeout timeout; + + private Holder(ChannelHandlerContext context, Position position) { + this.context = context; + this.position = position; + } + + @Override + public int compareTo(Holder other) { + return COMPARATOR.compare(position, other.position); + } + } + + private final Timer timer = new HashedWheelTimer(); + private final Callback callback; + private final long threshold; + + private final Map> buffer = new HashMap<>(); + + public BufferingManager(Config config, Callback callback) { + this.callback = callback; + threshold = config.getLong(Keys.SERVER_BUFFERING_THRESHOLD); + } + + private Timeout scheduleTimeout(Holder holder) { + return timer.newTimeout( + timeout -> { + LOGGER.info("released {}", holder.position.getFixTime()); + buffer.get(holder.position.getDeviceId()).remove(holder); + callback.onReleased(holder.context, holder.position); + }, + threshold, TimeUnit.MILLISECONDS); + } + + public void accept(ChannelHandlerContext context, Position position) { + if (threshold > 0) { + synchronized (buffer) { + LOGGER.info("queued {}", position.getFixTime()); + var queue = buffer.computeIfAbsent(position.getDeviceId(), k -> new TreeSet<>()); + Holder holder = new Holder(context, position); + holder.timeout = scheduleTimeout(holder); + queue.add(holder); + queue.tailSet(holder).forEach(h -> { + h.timeout.cancel(); + h.timeout = scheduleTimeout(h); + }); + } + } else { + callback.onReleased(context, position); + } + } + +} -- cgit v1.2.3 From e4134911f17cbe6f556a8bc4d1b3f5073474a940 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 31 Mar 2024 13:39:13 -0700 Subject: Queue positions for processing --- src/main/java/org/traccar/ProcessingHandler.java | 29 +++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index cbb375a72..6a97b9dea 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -56,9 +56,12 @@ import org.traccar.handler.network.AcknowledgementHandler; import org.traccar.helper.PositionLogger; import org.traccar.model.Position; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -73,6 +76,12 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B private final List eventHandlers; private final PostProcessHandler postProcessHandler; + private final Map> queues = new HashMap<>(); + + private synchronized Queue getQueue(long deviceId) { + return queues.computeIfAbsent(deviceId, k -> new LinkedList<>()); + } + @Inject public ProcessingHandler( Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) { @@ -129,7 +138,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B @Override public void onReleased(ChannelHandlerContext context, Position position) { - processPositionHandlers(context, position); + Queue queue = getQueue(position.getDeviceId()); + boolean queued; + synchronized (queue) { + queued = !queue.isEmpty(); + queue.offer(position); + } + if (!queued) { + processPositionHandlers(context, position); + } } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { @@ -160,6 +177,16 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B postProcessHandler.handlePosition(position, p -> { positionLogger.log(ctx, p); ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + + Queue queue = getQueue(position.getDeviceId()); + Position nextPosition; + synchronized (queue) { + queue.poll(); // remove current position + nextPosition = queue.peek(); + } + if (nextPosition != null) { + processPositionHandlers(ctx, nextPosition); + } }); } -- cgit v1.2.3 From fc3a24148526b3d41ba40ed1f55cf40b54c6a255 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Wed, 3 Apr 2024 21:17:52 -0700 Subject: Handle outdated positions in buffer --- src/main/java/org/traccar/BaseProtocolDecoder.java | 19 +------- src/main/java/org/traccar/ProcessingHandler.java | 2 + .../org/traccar/database/BufferingManager.java | 26 +++++++--- .../java/org/traccar/handler/OutdatedHandler.java | 56 ++++++++++++++++++++++ 4 files changed, 78 insertions(+), 25 deletions(-) create mode 100644 src/main/java/org/traccar/handler/OutdatedHandler.java (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/BaseProtocolDecoder.java b/src/main/java/org/traccar/BaseProtocolDecoder.java index 495a866c0..b764e5cdf 100644 --- a/src/main/java/org/traccar/BaseProtocolDecoder.java +++ b/src/main/java/org/traccar/BaseProtocolDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 - 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2012 - 2024 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -154,25 +154,8 @@ public abstract class BaseProtocolDecoder extends ExtendedObjectDecoder { public void getLastLocation(Position position, Date deviceTime) { if (position.getDeviceId() != 0) { position.setOutdated(true); - - Position last = cacheManager.getPosition(position.getDeviceId()); - if (last != null) { - position.setFixTime(last.getFixTime()); - position.setValid(last.getValid()); - position.setLatitude(last.getLatitude()); - position.setLongitude(last.getLongitude()); - position.setAltitude(last.getAltitude()); - position.setSpeed(last.getSpeed()); - position.setCourse(last.getCourse()); - position.setAccuracy(last.getAccuracy()); - } else { - position.setFixTime(new Date(0)); - } - if (deviceTime != null) { position.setDeviceTime(deviceTime); - } else { - position.setDeviceTime(new Date()); } } } diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 6a97b9dea..76a698a0a 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -36,6 +36,7 @@ import org.traccar.handler.GeofenceHandler; import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; +import org.traccar.handler.OutdatedHandler; import org.traccar.handler.PositionForwardingHandler; import org.traccar.handler.PostProcessHandler; import org.traccar.handler.SpeedLimitHandler; @@ -90,6 +91,7 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B bufferingManager = new BufferingManager(config, this); positionHandlers = Stream.of( + OutdatedHandler.class, TimeHandler.class, GeolocationHandler.class, HemisphereHandler.class, diff --git a/src/main/java/org/traccar/database/BufferingManager.java b/src/main/java/org/traccar/database/BufferingManager.java index 3e3cf587a..4d288c8d0 100644 --- a/src/main/java/org/traccar/database/BufferingManager.java +++ b/src/main/java/org/traccar/database/BufferingManager.java @@ -25,7 +25,7 @@ import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.model.Position; -import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.TreeSet; @@ -41,11 +41,6 @@ public class BufferingManager { private static final class Holder implements Comparable { - private static final Comparator COMPARATOR = Comparator - .comparing(Position::getFixTime) - .thenComparing(Position::getDeviceTime) - .thenComparing(Position::getServerTime); - private final ChannelHandlerContext context; private final Position position; private Timeout timeout; @@ -55,9 +50,26 @@ public class BufferingManager { this.position = position; } + private int compareTime(Date left, Date right) { + if (left != null && right != null) { + return left.compareTo(right); + } + return 0; + } + @Override public int compareTo(Holder other) { - return COMPARATOR.compare(position, other.position); + int fixTimeResult = compareTime(position.getFixTime(), other.position.getFixTime()); + if (fixTimeResult != 0) { + return fixTimeResult; + } + + int deviceTimeResult = compareTime(position.getDeviceTime(), other.position.getDeviceTime()); + if (deviceTimeResult != 0) { + return deviceTimeResult; + } + + return position.getServerTime().compareTo(other.position.getServerTime()); } } diff --git a/src/main/java/org/traccar/handler/OutdatedHandler.java b/src/main/java/org/traccar/handler/OutdatedHandler.java new file mode 100644 index 000000000..88f1c4a0c --- /dev/null +++ b/src/main/java/org/traccar/handler/OutdatedHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import jakarta.inject.Inject; +import org.traccar.model.Position; +import org.traccar.session.cache.CacheManager; + +import java.util.Date; + +public class OutdatedHandler extends BasePositionHandler { + + private final CacheManager cacheManager; + + @Inject + public OutdatedHandler(CacheManager cacheManager) { + this.cacheManager = cacheManager; + } + + @Override + public void handlePosition(Position position, Callback callback) { + if (position.getOutdated()) { + Position last = cacheManager.getPosition(position.getDeviceId()); + if (last != null) { + position.setFixTime(last.getFixTime()); + position.setValid(last.getValid()); + position.setLatitude(last.getLatitude()); + position.setLongitude(last.getLongitude()); + position.setAltitude(last.getAltitude()); + position.setSpeed(last.getSpeed()); + position.setCourse(last.getCourse()); + position.setAccuracy(last.getAccuracy()); + } else { + position.setFixTime(new Date(315964819000L)); // gps epoch 1980-01-06 + } + if (position.getDeviceTime() == null) { + position.setDeviceTime(position.getServerTime()); + } + } + callback.processed(position); + } + +} -- cgit v1.2.3 From 9a285e59e580994dc9c3f80935f766f3dafdcd46 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sun, 7 Apr 2024 07:17:09 -0700 Subject: Fix filtering handler --- src/main/java/org/traccar/ProcessingHandler.java | 44 +++++++++++++--------- .../org/traccar/handler/BasePositionHandler.java | 2 +- .../traccar/handler/ComputedAttributesHandler.java | 2 +- .../org/traccar/handler/CopyAttributesHandler.java | 2 +- .../java/org/traccar/handler/DatabaseHandler.java | 2 +- .../java/org/traccar/handler/DistanceHandler.java | 2 +- .../org/traccar/handler/EngineHoursHandler.java | 2 +- .../java/org/traccar/handler/FilterHandler.java | 6 +-- .../java/org/traccar/handler/GeocoderHandler.java | 10 ++--- .../java/org/traccar/handler/GeofenceHandler.java | 2 +- .../org/traccar/handler/GeolocationHandler.java | 8 ++-- .../org/traccar/handler/HemisphereHandler.java | 2 +- .../java/org/traccar/handler/MotionHandler.java | 2 +- .../java/org/traccar/handler/OutdatedHandler.java | 2 +- .../traccar/handler/PositionForwardingHandler.java | 2 +- .../org/traccar/handler/PostProcessHandler.java | 4 +- .../org/traccar/handler/SpeedLimitHandler.java | 4 +- src/main/java/org/traccar/handler/TimeHandler.java | 2 +- 18 files changed, 52 insertions(+), 48 deletions(-) (limited to 'src/main/java/org/traccar/ProcessingHandler.java') diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 76a698a0a..fd048d127 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -155,15 +155,15 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B var iterator = positionHandlers.iterator(); iterator.next().handlePosition(position, new BasePositionHandler.Callback() { @Override - public void processed(Position position) { - if (position != null) { + public void processed(boolean filtered) { + if (!filtered) { if (iterator.hasNext()) { iterator.next().handlePosition(position, this); } else { processEventHandlers(ctx, position); } } else { - finishedProcessing(ctx, null); + finishedProcessing(ctx, position, true); } } }); @@ -172,24 +172,32 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B private void processEventHandlers(ChannelHandlerContext ctx, Position position) { eventHandlers.forEach(handler -> handler.analyzePosition( position, (event) -> notificationManager.updateEvents(Map.of(event, position)))); - finishedProcessing(ctx, position); + finishedProcessing(ctx, position, false); } - private void finishedProcessing(ChannelHandlerContext ctx, Position position) { - postProcessHandler.handlePosition(position, p -> { - positionLogger.log(ctx, p); - ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + private void finishedProcessing(ChannelHandlerContext ctx, Position position, boolean filtered) { + if (!filtered) { + postProcessHandler.handlePosition(position, ignore -> { + positionLogger.log(ctx, position); + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + processNextPosition(ctx, position.getDeviceId()); + }); + } else { + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + processNextPosition(ctx, position.getDeviceId()); + } + } - Queue queue = getQueue(position.getDeviceId()); - Position nextPosition; - synchronized (queue) { - queue.poll(); // remove current position - nextPosition = queue.peek(); - } - if (nextPosition != null) { - processPositionHandlers(ctx, nextPosition); - } - }); + private void processNextPosition(ChannelHandlerContext ctx, long deviceId) { + Queue queue = getQueue(deviceId); + Position nextPosition; + synchronized (queue) { + queue.poll(); // remove current position + nextPosition = queue.peek(); + } + if (nextPosition != null) { + processPositionHandlers(ctx, nextPosition); + } } } diff --git a/src/main/java/org/traccar/handler/BasePositionHandler.java b/src/main/java/org/traccar/handler/BasePositionHandler.java index 2fee5c652..0a82e96b7 100644 --- a/src/main/java/org/traccar/handler/BasePositionHandler.java +++ b/src/main/java/org/traccar/handler/BasePositionHandler.java @@ -20,7 +20,7 @@ import org.traccar.model.Position; public abstract class BasePositionHandler { public interface Callback { - void processed(Position position); + void processed(boolean filtered); } public abstract void handlePosition(Position position, Callback callback); diff --git a/src/main/java/org/traccar/handler/ComputedAttributesHandler.java b/src/main/java/org/traccar/handler/ComputedAttributesHandler.java index 8d6fb39c3..4293bd1fc 100644 --- a/src/main/java/org/traccar/handler/ComputedAttributesHandler.java +++ b/src/main/java/org/traccar/handler/ComputedAttributesHandler.java @@ -196,7 +196,7 @@ public class ComputedAttributesHandler extends BasePositionHandler { } } } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/CopyAttributesHandler.java b/src/main/java/org/traccar/handler/CopyAttributesHandler.java index f682c99c9..c7452e58c 100644 --- a/src/main/java/org/traccar/handler/CopyAttributesHandler.java +++ b/src/main/java/org/traccar/handler/CopyAttributesHandler.java @@ -44,7 +44,7 @@ public class CopyAttributesHandler extends BasePositionHandler { } } } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/DatabaseHandler.java b/src/main/java/org/traccar/handler/DatabaseHandler.java index 0c8d2717d..5d96ebb34 100644 --- a/src/main/java/org/traccar/handler/DatabaseHandler.java +++ b/src/main/java/org/traccar/handler/DatabaseHandler.java @@ -47,7 +47,7 @@ public class DatabaseHandler extends BasePositionHandler { LOGGER.warn("Failed to store position", error); } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/DistanceHandler.java b/src/main/java/org/traccar/handler/DistanceHandler.java index ee5d64894..e8ae7753a 100644 --- a/src/main/java/org/traccar/handler/DistanceHandler.java +++ b/src/main/java/org/traccar/handler/DistanceHandler.java @@ -71,7 +71,7 @@ public class DistanceHandler extends BasePositionHandler { position.set(Position.KEY_DISTANCE, distance); position.set(Position.KEY_TOTAL_DISTANCE, totalDistance + distance); - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/EngineHoursHandler.java b/src/main/java/org/traccar/handler/EngineHoursHandler.java index ed5f9b509..5aae6f673 100644 --- a/src/main/java/org/traccar/handler/EngineHoursHandler.java +++ b/src/main/java/org/traccar/handler/EngineHoursHandler.java @@ -43,7 +43,7 @@ public class EngineHoursHandler extends BasePositionHandler { } } } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/FilterHandler.java b/src/main/java/org/traccar/handler/FilterHandler.java index a9e6024c8..796c302fb 100644 --- a/src/main/java/org/traccar/handler/FilterHandler.java +++ b/src/main/java/org/traccar/handler/FilterHandler.java @@ -270,11 +270,7 @@ public class FilterHandler extends BasePositionHandler { @Override public void handlePosition(Position position, Callback callback) { - if (filter(position)) { - callback.processed(null); - } else { - callback.processed(position); - } + callback.processed(filter(position)); } } diff --git a/src/main/java/org/traccar/handler/GeocoderHandler.java b/src/main/java/org/traccar/handler/GeocoderHandler.java index c62bcb6f8..b84237856 100644 --- a/src/main/java/org/traccar/handler/GeocoderHandler.java +++ b/src/main/java/org/traccar/handler/GeocoderHandler.java @@ -44,7 +44,7 @@ public class GeocoderHandler extends BasePositionHandler { @Override public void handlePosition(Position position, Callback callback) { if (!ignorePositions) { - callback.processed(position); + callback.processed(false); } if (processInvalidPositions || position.getValid()) { @@ -53,7 +53,7 @@ public class GeocoderHandler extends BasePositionHandler { if (lastPosition != null && lastPosition.getAddress() != null && position.getDouble(Position.KEY_DISTANCE) <= reuseDistance) { position.setAddress(lastPosition.getAddress()); - callback.processed(position); + callback.processed(false); return; } } @@ -63,17 +63,17 @@ public class GeocoderHandler extends BasePositionHandler { @Override public void onSuccess(String address) { position.setAddress(address); - callback.processed(position); + callback.processed(false); } @Override public void onFailure(Throwable e) { LOGGER.warn("Geocoding failed", e); - callback.processed(position); + callback.processed(false); } }); } else { - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/GeofenceHandler.java b/src/main/java/org/traccar/handler/GeofenceHandler.java index 33b46f058..8b363057a 100644 --- a/src/main/java/org/traccar/handler/GeofenceHandler.java +++ b/src/main/java/org/traccar/handler/GeofenceHandler.java @@ -41,7 +41,7 @@ public class GeofenceHandler extends BasePositionHandler { if (!geofenceIds.isEmpty()) { position.setGeofenceIds(geofenceIds); } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/GeolocationHandler.java b/src/main/java/org/traccar/handler/GeolocationHandler.java index cb9c04808..c46bd3250 100644 --- a/src/main/java/org/traccar/handler/GeolocationHandler.java +++ b/src/main/java/org/traccar/handler/GeolocationHandler.java @@ -57,7 +57,7 @@ public class GeolocationHandler extends BasePositionHandler { updatePosition( position, lastPosition.getLatitude(), lastPosition.getLongitude(), lastPosition.getAccuracy()); - callback.processed(position); + callback.processed(false); return; } } @@ -71,17 +71,17 @@ public class GeolocationHandler extends BasePositionHandler { @Override public void onSuccess(double latitude, double longitude, double accuracy) { updatePosition(position, latitude, longitude, accuracy); - callback.processed(position); + callback.processed(false); } @Override public void onFailure(Throwable e) { LOGGER.warn("Geolocation network error", e); - callback.processed(position); + callback.processed(false); } }); } else { - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/HemisphereHandler.java b/src/main/java/org/traccar/handler/HemisphereHandler.java index 6b64177e4..48929538f 100644 --- a/src/main/java/org/traccar/handler/HemisphereHandler.java +++ b/src/main/java/org/traccar/handler/HemisphereHandler.java @@ -53,7 +53,7 @@ public class HemisphereHandler extends BasePositionHandler { if (longitudeFactor != 0) { position.setLongitude(Math.abs(position.getLongitude()) * longitudeFactor); } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/MotionHandler.java b/src/main/java/org/traccar/handler/MotionHandler.java index bb7ff2a65..804ffdc25 100644 --- a/src/main/java/org/traccar/handler/MotionHandler.java +++ b/src/main/java/org/traccar/handler/MotionHandler.java @@ -38,7 +38,7 @@ public class MotionHandler extends BasePositionHandler { cacheManager, Keys.EVENT_MOTION_SPEED_THRESHOLD, position.getDeviceId()); position.set(Position.KEY_MOTION, position.getSpeed() > threshold); } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/OutdatedHandler.java b/src/main/java/org/traccar/handler/OutdatedHandler.java index 88f1c4a0c..536440f66 100644 --- a/src/main/java/org/traccar/handler/OutdatedHandler.java +++ b/src/main/java/org/traccar/handler/OutdatedHandler.java @@ -50,7 +50,7 @@ public class OutdatedHandler extends BasePositionHandler { position.setDeviceTime(position.getServerTime()); } } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/PositionForwardingHandler.java b/src/main/java/org/traccar/handler/PositionForwardingHandler.java index be62fff37..8512d4552 100644 --- a/src/main/java/org/traccar/handler/PositionForwardingHandler.java +++ b/src/main/java/org/traccar/handler/PositionForwardingHandler.java @@ -131,7 +131,7 @@ public class PositionForwardingHandler extends BasePositionHandler { positionData.setDevice(cacheManager.getObject(Device.class, position.getDeviceId())); new AsyncRequestAndCallback(positionData).send(); } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/PostProcessHandler.java b/src/main/java/org/traccar/handler/PostProcessHandler.java index e1d833e46..5b1b2ef86 100644 --- a/src/main/java/org/traccar/handler/PostProcessHandler.java +++ b/src/main/java/org/traccar/handler/PostProcessHandler.java @@ -47,7 +47,7 @@ public class PostProcessHandler extends BasePositionHandler { @Override public void handlePosition(Position position, Callback callback) { try { - if (position != null && PositionUtil.isLatest(cacheManager, position)) { + if (PositionUtil.isLatest(cacheManager, position)) { Device updatedDevice = new Device(); updatedDevice.setId(position.getDeviceId()); updatedDevice.setPositionId(position.getId()); @@ -61,7 +61,7 @@ public class PostProcessHandler extends BasePositionHandler { } catch (StorageException error) { LOGGER.warn("Failed to update device", error); } - callback.processed(position); + callback.processed(false); } } diff --git a/src/main/java/org/traccar/handler/SpeedLimitHandler.java b/src/main/java/org/traccar/handler/SpeedLimitHandler.java index 604c10ca7..4c0922d01 100644 --- a/src/main/java/org/traccar/handler/SpeedLimitHandler.java +++ b/src/main/java/org/traccar/handler/SpeedLimitHandler.java @@ -40,13 +40,13 @@ public class SpeedLimitHandler extends BasePositionHandler { @Override public void onSuccess(double speedLimit) { position.set(Position.KEY_SPEED_LIMIT, speedLimit); - callback.processed(position); + callback.processed(false); } @Override public void onFailure(Throwable e) { LOGGER.warn("Speed limit provider failed", e); - callback.processed(position); + callback.processed(false); } }); } diff --git a/src/main/java/org/traccar/handler/TimeHandler.java b/src/main/java/org/traccar/handler/TimeHandler.java index 052ad41c3..f6c67bb76 100644 --- a/src/main/java/org/traccar/handler/TimeHandler.java +++ b/src/main/java/org/traccar/handler/TimeHandler.java @@ -51,7 +51,7 @@ public class TimeHandler extends BasePositionHandler { position.setFixTime(position.getDeviceTime()); } } - callback.processed(position); + callback.processed(false); } } -- cgit v1.2.3