diff options
8 files changed, 65 insertions, 40 deletions
diff --git a/src/main/java/org/traccar/MainEventHandler.java b/src/main/java/org/traccar/MainEventHandler.java index 0a8c69b54..e2cad15c6 100644 --- a/src/main/java/org/traccar/MainEventHandler.java +++ b/src/main/java/org/traccar/MainEventHandler.java @@ -92,8 +92,8 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { new Condition.Equals("id", "id"))); cacheManager.updatePosition(position); - connectionManager.updatePosition(position); - broadcastService.updatePosition(position); + connectionManager.updatePosition(true, position); + broadcastService.updatePosition(true, position); } } catch (StorageException error) { LOGGER.warn("Failed to update device", error); diff --git a/src/main/java/org/traccar/api/AsyncSocket.java b/src/main/java/org/traccar/api/AsyncSocket.java index 40aa68e88..5fc4b4412 100644 --- a/src/main/java/org/traccar/api/AsyncSocket.java +++ b/src/main/java/org/traccar/api/AsyncSocket.java @@ -58,15 +58,14 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U public void onWebSocketConnect(Session session) { super.onWebSocketConnect(session); - Map<String, Collection<?>> data = new HashMap<>(); try { + Map<String, Collection<?>> data = new HashMap<>(); data.put(KEY_POSITIONS, PositionUtil.getLatestPositions(storage, userId)); + sendData(data); + connectionManager.addListener(userId, this); } catch (StorageException e) { throw new RuntimeException(e); } - sendData(data); - - connectionManager.addListener(userId, this); } @Override diff --git a/src/main/java/org/traccar/api/resource/DeviceResource.java b/src/main/java/org/traccar/api/resource/DeviceResource.java index 2b673a108..e205f2d28 100644 --- a/src/main/java/org/traccar/api/resource/DeviceResource.java +++ b/src/main/java/org/traccar/api/resource/DeviceResource.java @@ -137,8 +137,8 @@ public class DeviceResource extends BaseObjectResource<Device> { try { cacheManager.addDevice(position.getDeviceId()); cacheManager.updatePosition(position); - connectionManager.updatePosition(position); - broadcastService.updatePosition(position); + connectionManager.updatePosition(true, position); + broadcastService.updatePosition(true, position); } finally { cacheManager.removeDevice(position.getDeviceId()); } diff --git a/src/main/java/org/traccar/broadcast/BroadcastInterface.java b/src/main/java/org/traccar/broadcast/BroadcastInterface.java index d5b49f213..69e610dc6 100644 --- a/src/main/java/org/traccar/broadcast/BroadcastInterface.java +++ b/src/main/java/org/traccar/broadcast/BroadcastInterface.java @@ -22,13 +22,13 @@ import org.traccar.model.Position; public interface BroadcastInterface { - default void updateDevice(Device device) { + default void updateDevice(boolean local, Device device) { } - default void updatePosition(Position position) { + default void updatePosition(boolean local, Position position) { } - default void updateEvent(long userId, Event event) { + default void updateEvent(boolean local, long userId, Event event) { } default void invalidateObject(Class<? extends BaseModel> clazz, long id) { diff --git a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java index 0525fa742..ac0fcbd86 100644 --- a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java +++ b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java @@ -66,21 +66,21 @@ public class MulticastBroadcastService implements BroadcastService { } @Override - public void updateDevice(Device device) { + public void updateDevice(boolean local, Device device) { BroadcastMessage message = new BroadcastMessage(); message.setDevice(device); sendMessage(message); } @Override - public void updatePosition(Position position) { + public void updatePosition(boolean local, Position position) { BroadcastMessage message = new BroadcastMessage(); message.setPosition(position); sendMessage(message); } @Override - public void updateEvent(long userId, Event event) { + public void updateEvent(boolean local, long userId, Event event) { BroadcastMessage message = new BroadcastMessage(); message.setUserId(userId); message.setEvent(event); @@ -115,11 +115,11 @@ public class MulticastBroadcastService implements BroadcastService { private void handleMessage(BroadcastMessage message) { if (message.getDevice() != null) { - listeners.forEach(listener -> listener.updateDevice(message.getDevice())); + listeners.forEach(listener -> listener.updateDevice(false, message.getDevice())); } else if (message.getPosition() != null) { - listeners.forEach(listener -> listener.updatePosition(message.getPosition())); + listeners.forEach(listener -> listener.updatePosition(false, message.getPosition())); } else if (message.getUserId() != null && message.getEvent() != null) { - listeners.forEach(listener -> listener.updateEvent(message.getUserId(), message.getEvent())); + listeners.forEach(listener -> listener.updateEvent(false, message.getUserId(), message.getEvent())); } else if (message.getChanges() != null) { var iterator = message.getChanges().entrySet().iterator(); if (iterator.hasNext()) { diff --git a/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java b/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java index 0a924cfc3..ca3fc3f89 100644 --- a/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java +++ b/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java @@ -68,7 +68,7 @@ public class GeofenceEventHandler extends BaseEventHandler { device.setGeofenceIds(currentGeofences); if (!oldGeofences.isEmpty() || !newGeofences.isEmpty()) { - connectionManager.updateDevice(device); + connectionManager.updateDevice(true, device); } Map<Event, Position> events = new HashMap<>(); diff --git a/src/main/java/org/traccar/notificators/NotificatorWeb.java b/src/main/java/org/traccar/notificators/NotificatorWeb.java index efbbf24cc..061018ba9 100644 --- a/src/main/java/org/traccar/notificators/NotificatorWeb.java +++ b/src/main/java/org/traccar/notificators/NotificatorWeb.java @@ -56,8 +56,8 @@ public final class NotificatorWeb implements Notificator { var message = notificationFormatter.formatMessage(user, event, position, "short"); copy.set("message", message.getBody()); - connectionManager.updateEvent(user.getId(), copy); - broadcastService.updateEvent(user.getId(), copy); + connectionManager.updateEvent(true, user.getId(), copy); + broadcastService.updateEvent(true, user.getId(), copy); } } diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java index 74427c08b..0e0ea1eb8 100644 --- a/src/main/java/org/traccar/session/ConnectionManager.java +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -30,6 +30,7 @@ import org.traccar.database.NotificationManager; import org.traccar.handler.events.MotionEventHandler; import org.traccar.handler.events.OverspeedEventHandler; import org.traccar.helper.model.AttributeUtil; +import org.traccar.model.BaseModel; import org.traccar.model.Device; import org.traccar.model.Event; import org.traccar.model.Position; @@ -45,6 +46,7 @@ import javax.inject.Inject; import javax.inject.Singleton; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -52,6 +54,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Singleton public class ConnectionManager implements BroadcastInterface { @@ -74,7 +77,10 @@ public class ConnectionManager implements BroadcastInterface { private final Timer timer; private final BroadcastService broadcastService; - private final Map<Long, Set<UpdateListener>> listeners = new ConcurrentHashMap<>(); + private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>(); + private final Map<Long, Set<Long>> userDevices = new HashMap<>(); + private final Map<Long, Set<Long>> deviceUsers = new HashMap<>(); + private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>(); @Inject @@ -197,6 +203,10 @@ public class ConnectionManager implements BroadcastInterface { public void deviceUnknown(long deviceId) { updateDevice(deviceId, Device.STATUS_UNKNOWN, null); + removeDeviceSession(deviceId); + } + + private void removeDeviceSession(long deviceId) { DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId); if (deviceSession != null) { cacheManager.removeDevice(deviceId); @@ -274,8 +284,8 @@ public class ConnectionManager implements BroadcastInterface { LOGGER.warn("Update device status error", e); } - updateDevice(device); - broadcastService.updateDevice(device); + updateDevice(true, device); + broadcastService.updateDevice(true, device); } public DeviceState getDeviceState(long deviceId) { @@ -313,10 +323,14 @@ public class ConnectionManager implements BroadcastInterface { } @Override - public synchronized void updateDevice(Device device) { - for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) { - if (listeners.containsKey(user.getId())) { - for (UpdateListener listener : listeners.get(user.getId())) { + public synchronized void updateDevice(boolean local, Device device) { + if (!local && Device.STATUS_ONLINE.equals(device.getStatus())) { + timeouts.remove(device.getId()); + removeDeviceSession(device.getId()); + } + for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { listener.onUpdateDevice(device); } } @@ -324,11 +338,10 @@ public class ConnectionManager implements BroadcastInterface { } @Override - public synchronized void updatePosition(Position position) { - long deviceId = position.getDeviceId(); - for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) { - if (listeners.containsKey(user.getId())) { - for (UpdateListener listener : listeners.get(user.getId())) { + public synchronized void updatePosition(boolean local, Position position) { + for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { listener.onUpdatePosition(position); } } @@ -336,7 +349,7 @@ public class ConnectionManager implements BroadcastInterface { } @Override - public synchronized void updateEvent(long userId, Event event) { + public synchronized void updateEvent(boolean local, long userId, Event event) { if (listeners.containsKey(userId)) { for (UpdateListener listener : listeners.get(userId)) { listener.onUpdateEvent(event); @@ -351,18 +364,31 @@ public class ConnectionManager implements BroadcastInterface { void onUpdateEvent(Event event); } - public synchronized void addListener(long userId, UpdateListener listener) { - if (!listeners.containsKey(userId)) { - listeners.put(userId, new HashSet<>()); + public synchronized void addListener(long userId, UpdateListener listener) throws StorageException { + var set = listeners.get(userId); + if (set == null) { + set = new HashSet<>(); + listeners.put(userId, set); + + var devices = storage.getObjects(Device.class, new Request( + new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class))); + userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toUnmodifiableSet())); + devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId)); } - listeners.get(userId).add(listener); + set.add(listener); } public synchronized void removeListener(long userId, UpdateListener listener) { - if (!listeners.containsKey(userId)) { - listeners.put(userId, new HashSet<>()); + var set = listeners.get(userId); + set.remove(listener); + if (set.isEmpty()) { + listeners.remove(userId); + + userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> { + userIds.remove(userId); + return userIds.isEmpty() ? null : userIds; + })); } - listeners.get(userId).remove(listener); } } |