aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/database/CommandsManager.java
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2022-09-29 17:43:30 -0700
committerAnton Tananaev <anton@traccar.org>2022-09-29 17:43:30 -0700
commite7bd758824386e4f77f78ceca57675ef7934435a (patch)
treed88f9129447599bd87c8d24de0eade8af0a9548e /src/main/java/org/traccar/database/CommandsManager.java
parent7246644f393f9a373f4e3f2be846500e8456a286 (diff)
downloadtrackermap-server-e7bd758824386e4f77f78ceca57675ef7934435a.tar.gz
trackermap-server-e7bd758824386e4f77f78ceca57675ef7934435a.tar.bz2
trackermap-server-e7bd758824386e4f77f78ceca57675ef7934435a.zip
Use DB command queue
Diffstat (limited to 'src/main/java/org/traccar/database/CommandsManager.java')
-rw-r--r--src/main/java/org/traccar/database/CommandsManager.java62
1 files changed, 17 insertions, 45 deletions
diff --git a/src/main/java/org/traccar/database/CommandsManager.java b/src/main/java/org/traccar/database/CommandsManager.java
index 945fb13af..d56b4d472 100644
--- a/src/main/java/org/traccar/database/CommandsManager.java
+++ b/src/main/java/org/traccar/database/CommandsManager.java
@@ -21,33 +21,27 @@ import org.traccar.ServerManager;
import org.traccar.model.Command;
import org.traccar.model.Device;
import org.traccar.model.Position;
+import org.traccar.model.QueuedCommand;
import org.traccar.session.ConnectionManager;
import org.traccar.session.DeviceSession;
import org.traccar.sms.SmsManager;
import org.traccar.storage.Storage;
+import org.traccar.storage.StorageException;
import org.traccar.storage.query.Columns;
import org.traccar.storage.query.Condition;
+import org.traccar.storage.query.Limit;
+import org.traccar.storage.query.Order;
import org.traccar.storage.query.Request;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
@Singleton
public class CommandsManager {
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private final Map<Long, Queue<Command>> deviceQueues = new ConcurrentHashMap<>();
-
private final Storage storage;
private final ServerManager serverManager;
private final SmsManager smsManager;
@@ -86,54 +80,32 @@ public class CommandsManager {
if (deviceSession != null && deviceSession.supportsLiveCommands()) {
deviceSession.sendCommand(command);
} else {
- getDeviceQueue(deviceId).add(command);
+ storage.addObject(QueuedCommand.fromCommand(command), new Request(new Columns.Exclude("id")));
return false;
}
}
return true;
}
- private Queue<Command> getDeviceQueue(long deviceId) {
- Queue<Command> deviceQueue;
- try {
- lock.readLock().lock();
- deviceQueue = deviceQueues.get(deviceId);
- } finally {
- lock.readLock().unlock();
- }
- if (deviceQueue != null) {
- return deviceQueue;
- } else {
- try {
- lock.writeLock().lock();
- return deviceQueues.computeIfAbsent(deviceId, key -> new ConcurrentLinkedQueue<>());
- } finally {
- lock.writeLock().unlock();
- }
- }
- }
-
public Collection<Command> readQueuedCommands(long deviceId) {
return readQueuedCommands(deviceId, Integer.MAX_VALUE);
}
public Collection<Command> readQueuedCommands(long deviceId, int count) {
- Queue<Command> deviceQueue;
try {
- lock.readLock().lock();
- deviceQueue = deviceQueues.get(deviceId);
- } finally {
- lock.readLock().unlock();
- }
- Collection<Command> result = new ArrayList<>();
- if (deviceQueue != null) {
- Command command = deviceQueue.poll();
- while (command != null && result.size() < count) {
- result.add(command);
- command = deviceQueue.poll();
+ var commands = storage.getObjects(QueuedCommand.class, new Request(
+ new Columns.All(),
+ new Condition.Equals("deviceId", "deviceId", deviceId),
+ new Order(false, "id"),
+ new Limit(count)));
+ for (var command : commands) {
+ storage.removeObject(QueuedCommand.class, new Request(
+ new Condition.Equals("id", "id", command.getId())));
}
+ return commands.stream().map(QueuedCommand::toCommand).collect(Collectors.toList());
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
}
- return result;
}
}