From e7bd758824386e4f77f78ceca57675ef7934435a Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Thu, 29 Sep 2022 17:43:30 -0700 Subject: Use DB command queue --- .../java/org/traccar/database/CommandsManager.java | 62 ++++++---------------- 1 file changed, 17 insertions(+), 45 deletions(-) (limited to 'src/main/java/org/traccar/database/CommandsManager.java') diff --git a/src/main/java/org/traccar/database/CommandsManager.java b/src/main/java/org/traccar/database/CommandsManager.java index 945fb13af..d56b4d472 100644 --- a/src/main/java/org/traccar/database/CommandsManager.java +++ b/src/main/java/org/traccar/database/CommandsManager.java @@ -21,33 +21,27 @@ import org.traccar.ServerManager; import org.traccar.model.Command; import org.traccar.model.Device; import org.traccar.model.Position; +import org.traccar.model.QueuedCommand; import org.traccar.session.ConnectionManager; import org.traccar.session.DeviceSession; import org.traccar.sms.SmsManager; import org.traccar.storage.Storage; +import org.traccar.storage.StorageException; import org.traccar.storage.query.Columns; import org.traccar.storage.query.Condition; +import org.traccar.storage.query.Limit; +import org.traccar.storage.query.Order; import org.traccar.storage.query.Request; import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Singleton; -import java.util.ArrayList; import java.util.Collection; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; @Singleton public class CommandsManager { - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - private final Map> deviceQueues = new ConcurrentHashMap<>(); - private final Storage storage; private final ServerManager serverManager; private final SmsManager smsManager; @@ -86,54 +80,32 @@ public class CommandsManager { if (deviceSession != null && deviceSession.supportsLiveCommands()) { deviceSession.sendCommand(command); } else { - getDeviceQueue(deviceId).add(command); + storage.addObject(QueuedCommand.fromCommand(command), new Request(new Columns.Exclude("id"))); return false; } } return true; } - private Queue getDeviceQueue(long deviceId) { - Queue deviceQueue; - try { - lock.readLock().lock(); - deviceQueue = deviceQueues.get(deviceId); - } finally { - lock.readLock().unlock(); - } - if (deviceQueue != null) { - return deviceQueue; - } else { - try { - lock.writeLock().lock(); - return deviceQueues.computeIfAbsent(deviceId, key -> new ConcurrentLinkedQueue<>()); - } finally { - lock.writeLock().unlock(); - } - } - } - public Collection readQueuedCommands(long deviceId) { return readQueuedCommands(deviceId, Integer.MAX_VALUE); } public Collection readQueuedCommands(long deviceId, int count) { - Queue deviceQueue; try { - lock.readLock().lock(); - deviceQueue = deviceQueues.get(deviceId); - } finally { - lock.readLock().unlock(); - } - Collection result = new ArrayList<>(); - if (deviceQueue != null) { - Command command = deviceQueue.poll(); - while (command != null && result.size() < count) { - result.add(command); - command = deviceQueue.poll(); + var commands = storage.getObjects(QueuedCommand.class, new Request( + new Columns.All(), + new Condition.Equals("deviceId", "deviceId", deviceId), + new Order(false, "id"), + new Limit(count))); + for (var command : commands) { + storage.removeObject(QueuedCommand.class, new Request( + new Condition.Equals("id", "id", command.getId()))); } + return commands.stream().map(QueuedCommand::toCommand).collect(Collectors.toList()); + } catch (StorageException e) { + throw new RuntimeException(e); } - return result; } } -- cgit v1.2.3