aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/session/ConnectionManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/traccar/session/ConnectionManager.java')
-rw-r--r--src/main/java/org/traccar/session/ConnectionManager.java66
1 files changed, 46 insertions, 20 deletions
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java
index 74427c08b..0e0ea1eb8 100644
--- a/src/main/java/org/traccar/session/ConnectionManager.java
+++ b/src/main/java/org/traccar/session/ConnectionManager.java
@@ -30,6 +30,7 @@ import org.traccar.database.NotificationManager;
import org.traccar.handler.events.MotionEventHandler;
import org.traccar.handler.events.OverspeedEventHandler;
import org.traccar.helper.model.AttributeUtil;
+import org.traccar.model.BaseModel;
import org.traccar.model.Device;
import org.traccar.model.Event;
import org.traccar.model.Position;
@@ -45,6 +46,7 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -52,6 +54,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Singleton
public class ConnectionManager implements BroadcastInterface {
@@ -74,7 +77,10 @@ public class ConnectionManager implements BroadcastInterface {
private final Timer timer;
private final BroadcastService broadcastService;
- private final Map<Long, Set<UpdateListener>> listeners = new ConcurrentHashMap<>();
+ private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>();
+ private final Map<Long, Set<Long>> userDevices = new HashMap<>();
+ private final Map<Long, Set<Long>> deviceUsers = new HashMap<>();
+
private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>();
@Inject
@@ -197,6 +203,10 @@ public class ConnectionManager implements BroadcastInterface {
public void deviceUnknown(long deviceId) {
updateDevice(deviceId, Device.STATUS_UNKNOWN, null);
+ removeDeviceSession(deviceId);
+ }
+
+ private void removeDeviceSession(long deviceId) {
DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId);
if (deviceSession != null) {
cacheManager.removeDevice(deviceId);
@@ -274,8 +284,8 @@ public class ConnectionManager implements BroadcastInterface {
LOGGER.warn("Update device status error", e);
}
- updateDevice(device);
- broadcastService.updateDevice(device);
+ updateDevice(true, device);
+ broadcastService.updateDevice(true, device);
}
public DeviceState getDeviceState(long deviceId) {
@@ -313,10 +323,14 @@ public class ConnectionManager implements BroadcastInterface {
}
@Override
- public synchronized void updateDevice(Device device) {
- for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) {
- if (listeners.containsKey(user.getId())) {
- for (UpdateListener listener : listeners.get(user.getId())) {
+ public synchronized void updateDevice(boolean local, Device device) {
+ if (!local && Device.STATUS_ONLINE.equals(device.getStatus())) {
+ timeouts.remove(device.getId());
+ removeDeviceSession(device.getId());
+ }
+ for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) {
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
listener.onUpdateDevice(device);
}
}
@@ -324,11 +338,10 @@ public class ConnectionManager implements BroadcastInterface {
}
@Override
- public synchronized void updatePosition(Position position) {
- long deviceId = position.getDeviceId();
- for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) {
- if (listeners.containsKey(user.getId())) {
- for (UpdateListener listener : listeners.get(user.getId())) {
+ public synchronized void updatePosition(boolean local, Position position) {
+ for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) {
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
listener.onUpdatePosition(position);
}
}
@@ -336,7 +349,7 @@ public class ConnectionManager implements BroadcastInterface {
}
@Override
- public synchronized void updateEvent(long userId, Event event) {
+ public synchronized void updateEvent(boolean local, long userId, Event event) {
if (listeners.containsKey(userId)) {
for (UpdateListener listener : listeners.get(userId)) {
listener.onUpdateEvent(event);
@@ -351,18 +364,31 @@ public class ConnectionManager implements BroadcastInterface {
void onUpdateEvent(Event event);
}
- public synchronized void addListener(long userId, UpdateListener listener) {
- if (!listeners.containsKey(userId)) {
- listeners.put(userId, new HashSet<>());
+ public synchronized void addListener(long userId, UpdateListener listener) throws StorageException {
+ var set = listeners.get(userId);
+ if (set == null) {
+ set = new HashSet<>();
+ listeners.put(userId, set);
+
+ var devices = storage.getObjects(Device.class, new Request(
+ new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class)));
+ userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toUnmodifiableSet()));
+ devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId));
}
- listeners.get(userId).add(listener);
+ set.add(listener);
}
public synchronized void removeListener(long userId, UpdateListener listener) {
- if (!listeners.containsKey(userId)) {
- listeners.put(userId, new HashSet<>());
+ var set = listeners.get(userId);
+ set.remove(listener);
+ if (set.isEmpty()) {
+ listeners.remove(userId);
+
+ userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> {
+ userIds.remove(userId);
+ return userIds.isEmpty() ? null : userIds;
+ }));
}
- listeners.get(userId).remove(listener);
}
}