diff options
Diffstat (limited to 'src')
4 files changed, 86 insertions, 39 deletions
diff --git a/src/main/java/org/traccar/ProcessingHandler.java b/src/main/java/org/traccar/ProcessingHandler.java index 7627c719b..09ec79f9d 100644 --- a/src/main/java/org/traccar/ProcessingHandler.java +++ b/src/main/java/org/traccar/ProcessingHandler.java @@ -26,6 +26,7 @@ import org.traccar.handler.BasePositionHandler; import org.traccar.handler.ComputedAttributesHandler; import org.traccar.handler.CopyAttributesHandler; import org.traccar.handler.DatabaseHandler; +import org.traccar.handler.PostProcessHandler; import org.traccar.handler.DistanceHandler; import org.traccar.handler.EngineHoursHandler; import org.traccar.handler.FilterHandler; @@ -65,6 +66,7 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { private final NotificationManager notificationManager; private final List<BasePositionHandler> positionHandlers; private final List<BaseEventHandler> eventHandlers; + private final PostProcessHandler postProcessHandler; @Inject public ProcessingHandler(Injector injector, NotificationManager notificationManager) { @@ -104,14 +106,17 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { .map((clazz) -> (BaseEventHandler) injector.getInstance(clazz)) .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); + + postProcessHandler = injector.getInstance(PostProcessHandler.class); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Position) { processPositionHandlers(ctx, (Position) msg); + } else { + super.channelRead(ctx, msg); } - super.channelRead(ctx, msg); } private void processPositionHandlers(ChannelHandlerContext ctx, Position position) { @@ -139,7 +144,10 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter { } private void finishedProcessing(ChannelHandlerContext ctx, Position position) { - ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); + postProcessHandler.handlePosition(position, p -> { + ctx.fireChannelRead(p); + ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p)); + }); } } diff --git a/src/main/java/org/traccar/handler/DatabaseHandler.java b/src/main/java/org/traccar/handler/DatabaseHandler.java index b1f218a1e..0c8d2717d 100644 --- a/src/main/java/org/traccar/handler/DatabaseHandler.java +++ b/src/main/java/org/traccar/handler/DatabaseHandler.java @@ -18,6 +18,7 @@ package org.traccar.handler; import jakarta.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.traccar.database.StatisticsManager; import org.traccar.model.Position; import org.traccar.storage.Storage; import org.traccar.storage.query.Columns; @@ -28,10 +29,12 @@ public class DatabaseHandler extends BasePositionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHandler.class); private final Storage storage; + private final StatisticsManager statisticsManager; @Inject - public DatabaseHandler(Storage storage) { + public DatabaseHandler(Storage storage, StatisticsManager statisticsManager) { this.storage = storage; + this.statisticsManager = statisticsManager; } @Override @@ -39,6 +42,7 @@ public class DatabaseHandler extends BasePositionHandler { try { position.setId(storage.addObject(position, new Request(new Columns.Exclude("id")))); + statisticsManager.messageStoredCount(position.getDeviceId()); } catch (Exception error) { LOGGER.warn("Failed to store position", error); } diff --git a/src/main/java/org/traccar/handler/PostProcessHandler.java b/src/main/java/org/traccar/handler/PostProcessHandler.java new file mode 100644 index 000000000..2d6bc03ad --- /dev/null +++ b/src/main/java/org/traccar/handler/PostProcessHandler.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.helper.model.PositionUtil; +import org.traccar.model.Device; +import org.traccar.model.Position; +import org.traccar.session.ConnectionManager; +import org.traccar.session.cache.CacheManager; +import org.traccar.storage.Storage; +import org.traccar.storage.StorageException; +import org.traccar.storage.query.Columns; +import org.traccar.storage.query.Condition; +import org.traccar.storage.query.Request; + +public class PostProcessHandler extends BasePositionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostProcessHandler.class); + + private final CacheManager cacheManager; + private final Storage storage; + private final ConnectionManager connectionManager; + + @Inject + public PostProcessHandler(CacheManager cacheManager, Storage storage, ConnectionManager connectionManager) { + this.cacheManager = cacheManager; + this.storage = storage; + this.connectionManager = connectionManager; + } + + @Override + public void handlePosition(Position position, Callback callback) { + try { + if (PositionUtil.isLatest(cacheManager, position)) { + Device updatedDevice = new Device(); + updatedDevice.setId(position.getDeviceId()); + updatedDevice.setPositionId(position.getId()); + storage.updateObject(updatedDevice, new Request( + new Columns.Include("positionId"), + new Condition.Equals("id", updatedDevice.getId()))); + + cacheManager.updatePosition(position); + connectionManager.updatePosition(true, position); + } + } catch (StorageException error) { + LOGGER.warn("Failed to update device", error); + } + callback.processed(position); + } + +} diff --git a/src/main/java/org/traccar/handler/network/MainEventHandler.java b/src/main/java/org/traccar/handler/network/MainEventHandler.java index 901036d44..d6407f1e5 100644 --- a/src/main/java/org/traccar/handler/network/MainEventHandler.java +++ b/src/main/java/org/traccar/handler/network/MainEventHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 - 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2012 - 2024 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,28 +22,21 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.socket.DatagramChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.timeout.IdleStateEvent; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.BasePipelineFactory; import org.traccar.BaseProtocolDecoder; import org.traccar.config.Config; import org.traccar.config.Keys; -import org.traccar.database.StatisticsManager; import org.traccar.helper.DateUtil; import org.traccar.helper.NetworkUtil; -import org.traccar.helper.model.PositionUtil; import org.traccar.model.Device; import org.traccar.model.Position; import org.traccar.session.ConnectionManager; import org.traccar.session.cache.CacheManager; -import org.traccar.storage.Storage; -import org.traccar.storage.StorageException; -import org.traccar.storage.query.Columns; -import org.traccar.storage.query.Condition; -import org.traccar.storage.query.Request; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; import java.util.Arrays; import java.util.HashSet; import java.util.LinkedHashSet; @@ -59,18 +52,13 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { private final Set<String> logAttributes = new LinkedHashSet<>(); private final CacheManager cacheManager; - private final Storage storage; private final ConnectionManager connectionManager; - private final StatisticsManager statisticsManager; @Inject public MainEventHandler( - Config config, CacheManager cacheManager, Storage storage, ConnectionManager connectionManager, - StatisticsManager statisticsManager) { + Config config, CacheManager cacheManager, ConnectionManager connectionManager) { this.cacheManager = cacheManager; - this.storage = storage; this.connectionManager = connectionManager; - this.statisticsManager = statisticsManager; String connectionlessProtocolList = config.getString(Keys.STATUS_IGNORE_OFFLINE); if (connectionlessProtocolList != null) { connectionlessProtocols.addAll(Arrays.asList(connectionlessProtocolList.split("[, ]"))); @@ -85,22 +73,6 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { Position position = (Position) msg; Device device = cacheManager.getObject(Device.class, position.getDeviceId()); - try { - if (PositionUtil.isLatest(cacheManager, position)) { - Device updatedDevice = new Device(); - updatedDevice.setId(position.getDeviceId()); - updatedDevice.setPositionId(position.getId()); - storage.updateObject(updatedDevice, new Request( - new Columns.Include("positionId"), - new Condition.Equals("id", updatedDevice.getId()))); - - cacheManager.updatePosition(position); - connectionManager.updatePosition(true, position); - } - } catch (StorageException error) { - LOGGER.warn("Failed to update device", error); - } - StringBuilder builder = new StringBuilder(); builder.append("[").append(NetworkUtil.session(ctx.channel())).append("] "); builder.append("id: ").append(device.getUniqueId()); @@ -145,10 +117,6 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { } } LOGGER.info(builder.toString()); - - statisticsManager.registerMessageStored(position.getDeviceId(), position.getProtocol()); - - ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(position)); } } |