aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/PositionForwardingHandler.java
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2022-11-13 11:48:43 -0800
committerAnton Tananaev <anton@traccar.org>2022-11-13 11:48:43 -0800
commit5877cb1b3f1fa7331c4310b9754a3ec442586497 (patch)
tree6cc3e7193412aa152c1b593bf0ad7e4119687a1e /src/main/java/org/traccar/PositionForwardingHandler.java
parentff793b2e2872f8ba076e748fab41d944f92b64d4 (diff)
downloadtrackermap-server-5877cb1b3f1fa7331c4310b9754a3ec442586497.tar.gz
trackermap-server-5877cb1b3f1fa7331c4310b9754a3ec442586497.tar.bz2
trackermap-server-5877cb1b3f1fa7331c4310b9754a3ec442586497.zip
Refactor position forwarding
Diffstat (limited to 'src/main/java/org/traccar/PositionForwardingHandler.java')
-rw-r--r--src/main/java/org/traccar/PositionForwardingHandler.java141
1 files changed, 141 insertions, 0 deletions
diff --git a/src/main/java/org/traccar/PositionForwardingHandler.java b/src/main/java/org/traccar/PositionForwardingHandler.java
new file mode 100644
index 000000000..83f91e937
--- /dev/null
+++ b/src/main/java/org/traccar/PositionForwardingHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+import org.traccar.forward.PositionData;
+import org.traccar.forward.PositionForwarder;
+import org.traccar.forward.ResultHandler;
+import org.traccar.model.Device;
+import org.traccar.model.Position;
+import org.traccar.session.cache.CacheManager;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+@ChannelHandler.Sharable
+public class PositionForwardingHandler extends BaseDataHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PositionForwardingHandler.class);
+
+ private final CacheManager cacheManager;
+ private final Timer timer;
+
+ private final PositionForwarder positionForwarder;
+
+ private final boolean retryEnabled;
+ private final int retryDelay;
+ private final int retryCount;
+ private final int retryLimit;
+
+ private final AtomicInteger deliveryPending;
+
+ @Inject
+ public PositionForwardingHandler(
+ Config config, CacheManager cacheManager, Timer timer, @Nullable PositionForwarder positionForwarder) {
+
+ this.cacheManager = cacheManager;
+ this.timer = timer;
+ this.positionForwarder = positionForwarder;
+
+ this.retryEnabled = config.getBoolean(Keys.FORWARD_RETRY_ENABLE);
+ this.retryDelay = config.getInteger(Keys.FORWARD_RETRY_DELAY);
+ this.retryCount = config.getInteger(Keys.FORWARD_RETRY_COUNT);
+ this.retryLimit = config.getInteger(Keys.FORWARD_RETRY_LIMIT);
+
+ this.deliveryPending = new AtomicInteger();
+ }
+
+ class AsyncRequestAndCallback implements ResultHandler, TimerTask {
+
+ private final PositionData positionData;
+
+ private int retries = 0;
+
+ AsyncRequestAndCallback(PositionData positionData) {
+ this.positionData = positionData;
+ deliveryPending.incrementAndGet();
+ }
+
+ private void send() {
+ positionForwarder.forward(positionData, this);
+ }
+
+ private void retry(Throwable throwable) {
+ boolean scheduled = false;
+ try {
+ if (retryEnabled && deliveryPending.get() <= retryLimit && retries < retryCount) {
+ schedule();
+ scheduled = true;
+ }
+ } finally {
+ int pending = scheduled ? deliveryPending.get() : deliveryPending.decrementAndGet();
+ LOGGER.warn("Position forwarding failed: " + pending + " pending", throwable);
+ }
+ }
+
+ private void schedule() {
+ timer.newTimeout(this, retryDelay * (long) Math.pow(2, retries++), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void onResult(boolean success, Throwable throwable) {
+ if (success) {
+ deliveryPending.decrementAndGet();
+ } else {
+ retry(throwable);
+ }
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ boolean sent = false;
+ try {
+ if (!timeout.isCancelled()) {
+ send();
+ sent = true;
+ }
+ } finally {
+ if (!sent) {
+ deliveryPending.decrementAndGet();
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Position handlePosition(Position position) {
+ if (positionForwarder != null) {
+ PositionData positionData = new PositionData();
+ positionData.setPosition(position);
+ positionData.setDevice(cacheManager.getObject(Device.class, position.getDeviceId()));
+ new AsyncRequestAndCallback(positionData).send();
+ }
+ return position;
+ }
+
+}