diff options
Diffstat (limited to 'src/main/java/org/traccar/session/ConnectionManager.java')
-rw-r--r-- | src/main/java/org/traccar/session/ConnectionManager.java | 66 |
1 files changed, 46 insertions, 20 deletions
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java index 74427c08b..0e0ea1eb8 100644 --- a/src/main/java/org/traccar/session/ConnectionManager.java +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -30,6 +30,7 @@ import org.traccar.database.NotificationManager; import org.traccar.handler.events.MotionEventHandler; import org.traccar.handler.events.OverspeedEventHandler; import org.traccar.helper.model.AttributeUtil; +import org.traccar.model.BaseModel; import org.traccar.model.Device; import org.traccar.model.Event; import org.traccar.model.Position; @@ -45,6 +46,7 @@ import javax.inject.Inject; import javax.inject.Singleton; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -52,6 +54,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Singleton public class ConnectionManager implements BroadcastInterface { @@ -74,7 +77,10 @@ public class ConnectionManager implements BroadcastInterface { private final Timer timer; private final BroadcastService broadcastService; - private final Map<Long, Set<UpdateListener>> listeners = new ConcurrentHashMap<>(); + private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>(); + private final Map<Long, Set<Long>> userDevices = new HashMap<>(); + private final Map<Long, Set<Long>> deviceUsers = new HashMap<>(); + private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>(); @Inject @@ -197,6 +203,10 @@ public class ConnectionManager implements BroadcastInterface { public void deviceUnknown(long deviceId) { updateDevice(deviceId, Device.STATUS_UNKNOWN, null); + removeDeviceSession(deviceId); + } + + private void removeDeviceSession(long deviceId) { DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId); if (deviceSession != null) { cacheManager.removeDevice(deviceId); @@ -274,8 +284,8 @@ public class ConnectionManager implements BroadcastInterface { LOGGER.warn("Update device status error", e); } - updateDevice(device); - broadcastService.updateDevice(device); + updateDevice(true, device); + broadcastService.updateDevice(true, device); } public DeviceState getDeviceState(long deviceId) { @@ -313,10 +323,14 @@ public class ConnectionManager implements BroadcastInterface { } @Override - public synchronized void updateDevice(Device device) { - for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) { - if (listeners.containsKey(user.getId())) { - for (UpdateListener listener : listeners.get(user.getId())) { + public synchronized void updateDevice(boolean local, Device device) { + if (!local && Device.STATUS_ONLINE.equals(device.getStatus())) { + timeouts.remove(device.getId()); + removeDeviceSession(device.getId()); + } + for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { listener.onUpdateDevice(device); } } @@ -324,11 +338,10 @@ public class ConnectionManager implements BroadcastInterface { } @Override - public synchronized void updatePosition(Position position) { - long deviceId = position.getDeviceId(); - for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) { - if (listeners.containsKey(user.getId())) { - for (UpdateListener listener : listeners.get(user.getId())) { + public synchronized void updatePosition(boolean local, Position position) { + for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { listener.onUpdatePosition(position); } } @@ -336,7 +349,7 @@ public class ConnectionManager implements BroadcastInterface { } @Override - public synchronized void updateEvent(long userId, Event event) { + public synchronized void updateEvent(boolean local, long userId, Event event) { if (listeners.containsKey(userId)) { for (UpdateListener listener : listeners.get(userId)) { listener.onUpdateEvent(event); @@ -351,18 +364,31 @@ public class ConnectionManager implements BroadcastInterface { void onUpdateEvent(Event event); } - public synchronized void addListener(long userId, UpdateListener listener) { - if (!listeners.containsKey(userId)) { - listeners.put(userId, new HashSet<>()); + public synchronized void addListener(long userId, UpdateListener listener) throws StorageException { + var set = listeners.get(userId); + if (set == null) { + set = new HashSet<>(); + listeners.put(userId, set); + + var devices = storage.getObjects(Device.class, new Request( + new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class))); + userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toUnmodifiableSet())); + devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId)); } - listeners.get(userId).add(listener); + set.add(listener); } public synchronized void removeListener(long userId, UpdateListener listener) { - if (!listeners.containsKey(userId)) { - listeners.put(userId, new HashSet<>()); + var set = listeners.get(userId); + set.remove(listener); + if (set.isEmpty()) { + listeners.remove(userId); + + userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> { + userIds.remove(userId); + return userIds.isEmpty() ? null : userIds; + })); } - listeners.get(userId).remove(listener); } } |