aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/traccar/MainEventHandler.java4
-rw-r--r--src/main/java/org/traccar/api/AsyncSocket.java7
-rw-r--r--src/main/java/org/traccar/api/resource/DeviceResource.java4
-rw-r--r--src/main/java/org/traccar/broadcast/BroadcastInterface.java6
-rw-r--r--src/main/java/org/traccar/broadcast/MulticastBroadcastService.java12
-rw-r--r--src/main/java/org/traccar/handler/events/GeofenceEventHandler.java2
-rw-r--r--src/main/java/org/traccar/notificators/NotificatorWeb.java4
-rw-r--r--src/main/java/org/traccar/session/ConnectionManager.java66
8 files changed, 65 insertions, 40 deletions
diff --git a/src/main/java/org/traccar/MainEventHandler.java b/src/main/java/org/traccar/MainEventHandler.java
index 0a8c69b54..e2cad15c6 100644
--- a/src/main/java/org/traccar/MainEventHandler.java
+++ b/src/main/java/org/traccar/MainEventHandler.java
@@ -92,8 +92,8 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter {
new Condition.Equals("id", "id")));
cacheManager.updatePosition(position);
- connectionManager.updatePosition(position);
- broadcastService.updatePosition(position);
+ connectionManager.updatePosition(true, position);
+ broadcastService.updatePosition(true, position);
}
} catch (StorageException error) {
LOGGER.warn("Failed to update device", error);
diff --git a/src/main/java/org/traccar/api/AsyncSocket.java b/src/main/java/org/traccar/api/AsyncSocket.java
index 40aa68e88..5fc4b4412 100644
--- a/src/main/java/org/traccar/api/AsyncSocket.java
+++ b/src/main/java/org/traccar/api/AsyncSocket.java
@@ -58,15 +58,14 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
- Map<String, Collection<?>> data = new HashMap<>();
try {
+ Map<String, Collection<?>> data = new HashMap<>();
data.put(KEY_POSITIONS, PositionUtil.getLatestPositions(storage, userId));
+ sendData(data);
+ connectionManager.addListener(userId, this);
} catch (StorageException e) {
throw new RuntimeException(e);
}
- sendData(data);
-
- connectionManager.addListener(userId, this);
}
@Override
diff --git a/src/main/java/org/traccar/api/resource/DeviceResource.java b/src/main/java/org/traccar/api/resource/DeviceResource.java
index 2b673a108..e205f2d28 100644
--- a/src/main/java/org/traccar/api/resource/DeviceResource.java
+++ b/src/main/java/org/traccar/api/resource/DeviceResource.java
@@ -137,8 +137,8 @@ public class DeviceResource extends BaseObjectResource<Device> {
try {
cacheManager.addDevice(position.getDeviceId());
cacheManager.updatePosition(position);
- connectionManager.updatePosition(position);
- broadcastService.updatePosition(position);
+ connectionManager.updatePosition(true, position);
+ broadcastService.updatePosition(true, position);
} finally {
cacheManager.removeDevice(position.getDeviceId());
}
diff --git a/src/main/java/org/traccar/broadcast/BroadcastInterface.java b/src/main/java/org/traccar/broadcast/BroadcastInterface.java
index d5b49f213..69e610dc6 100644
--- a/src/main/java/org/traccar/broadcast/BroadcastInterface.java
+++ b/src/main/java/org/traccar/broadcast/BroadcastInterface.java
@@ -22,13 +22,13 @@ import org.traccar.model.Position;
public interface BroadcastInterface {
- default void updateDevice(Device device) {
+ default void updateDevice(boolean local, Device device) {
}
- default void updatePosition(Position position) {
+ default void updatePosition(boolean local, Position position) {
}
- default void updateEvent(long userId, Event event) {
+ default void updateEvent(boolean local, long userId, Event event) {
}
default void invalidateObject(Class<? extends BaseModel> clazz, long id) {
diff --git a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
index 0525fa742..ac0fcbd86 100644
--- a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
+++ b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
@@ -66,21 +66,21 @@ public class MulticastBroadcastService implements BroadcastService {
}
@Override
- public void updateDevice(Device device) {
+ public void updateDevice(boolean local, Device device) {
BroadcastMessage message = new BroadcastMessage();
message.setDevice(device);
sendMessage(message);
}
@Override
- public void updatePosition(Position position) {
+ public void updatePosition(boolean local, Position position) {
BroadcastMessage message = new BroadcastMessage();
message.setPosition(position);
sendMessage(message);
}
@Override
- public void updateEvent(long userId, Event event) {
+ public void updateEvent(boolean local, long userId, Event event) {
BroadcastMessage message = new BroadcastMessage();
message.setUserId(userId);
message.setEvent(event);
@@ -115,11 +115,11 @@ public class MulticastBroadcastService implements BroadcastService {
private void handleMessage(BroadcastMessage message) {
if (message.getDevice() != null) {
- listeners.forEach(listener -> listener.updateDevice(message.getDevice()));
+ listeners.forEach(listener -> listener.updateDevice(false, message.getDevice()));
} else if (message.getPosition() != null) {
- listeners.forEach(listener -> listener.updatePosition(message.getPosition()));
+ listeners.forEach(listener -> listener.updatePosition(false, message.getPosition()));
} else if (message.getUserId() != null && message.getEvent() != null) {
- listeners.forEach(listener -> listener.updateEvent(message.getUserId(), message.getEvent()));
+ listeners.forEach(listener -> listener.updateEvent(false, message.getUserId(), message.getEvent()));
} else if (message.getChanges() != null) {
var iterator = message.getChanges().entrySet().iterator();
if (iterator.hasNext()) {
diff --git a/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java b/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java
index 0a924cfc3..ca3fc3f89 100644
--- a/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java
+++ b/src/main/java/org/traccar/handler/events/GeofenceEventHandler.java
@@ -68,7 +68,7 @@ public class GeofenceEventHandler extends BaseEventHandler {
device.setGeofenceIds(currentGeofences);
if (!oldGeofences.isEmpty() || !newGeofences.isEmpty()) {
- connectionManager.updateDevice(device);
+ connectionManager.updateDevice(true, device);
}
Map<Event, Position> events = new HashMap<>();
diff --git a/src/main/java/org/traccar/notificators/NotificatorWeb.java b/src/main/java/org/traccar/notificators/NotificatorWeb.java
index efbbf24cc..061018ba9 100644
--- a/src/main/java/org/traccar/notificators/NotificatorWeb.java
+++ b/src/main/java/org/traccar/notificators/NotificatorWeb.java
@@ -56,8 +56,8 @@ public final class NotificatorWeb implements Notificator {
var message = notificationFormatter.formatMessage(user, event, position, "short");
copy.set("message", message.getBody());
- connectionManager.updateEvent(user.getId(), copy);
- broadcastService.updateEvent(user.getId(), copy);
+ connectionManager.updateEvent(true, user.getId(), copy);
+ broadcastService.updateEvent(true, user.getId(), copy);
}
}
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java
index 74427c08b..0e0ea1eb8 100644
--- a/src/main/java/org/traccar/session/ConnectionManager.java
+++ b/src/main/java/org/traccar/session/ConnectionManager.java
@@ -30,6 +30,7 @@ import org.traccar.database.NotificationManager;
import org.traccar.handler.events.MotionEventHandler;
import org.traccar.handler.events.OverspeedEventHandler;
import org.traccar.helper.model.AttributeUtil;
+import org.traccar.model.BaseModel;
import org.traccar.model.Device;
import org.traccar.model.Event;
import org.traccar.model.Position;
@@ -45,6 +46,7 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -52,6 +54,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Singleton
public class ConnectionManager implements BroadcastInterface {
@@ -74,7 +77,10 @@ public class ConnectionManager implements BroadcastInterface {
private final Timer timer;
private final BroadcastService broadcastService;
- private final Map<Long, Set<UpdateListener>> listeners = new ConcurrentHashMap<>();
+ private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>();
+ private final Map<Long, Set<Long>> userDevices = new HashMap<>();
+ private final Map<Long, Set<Long>> deviceUsers = new HashMap<>();
+
private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>();
@Inject
@@ -197,6 +203,10 @@ public class ConnectionManager implements BroadcastInterface {
public void deviceUnknown(long deviceId) {
updateDevice(deviceId, Device.STATUS_UNKNOWN, null);
+ removeDeviceSession(deviceId);
+ }
+
+ private void removeDeviceSession(long deviceId) {
DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId);
if (deviceSession != null) {
cacheManager.removeDevice(deviceId);
@@ -274,8 +284,8 @@ public class ConnectionManager implements BroadcastInterface {
LOGGER.warn("Update device status error", e);
}
- updateDevice(device);
- broadcastService.updateDevice(device);
+ updateDevice(true, device);
+ broadcastService.updateDevice(true, device);
}
public DeviceState getDeviceState(long deviceId) {
@@ -313,10 +323,14 @@ public class ConnectionManager implements BroadcastInterface {
}
@Override
- public synchronized void updateDevice(Device device) {
- for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) {
- if (listeners.containsKey(user.getId())) {
- for (UpdateListener listener : listeners.get(user.getId())) {
+ public synchronized void updateDevice(boolean local, Device device) {
+ if (!local && Device.STATUS_ONLINE.equals(device.getStatus())) {
+ timeouts.remove(device.getId());
+ removeDeviceSession(device.getId());
+ }
+ for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) {
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
listener.onUpdateDevice(device);
}
}
@@ -324,11 +338,10 @@ public class ConnectionManager implements BroadcastInterface {
}
@Override
- public synchronized void updatePosition(Position position) {
- long deviceId = position.getDeviceId();
- for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) {
- if (listeners.containsKey(user.getId())) {
- for (UpdateListener listener : listeners.get(user.getId())) {
+ public synchronized void updatePosition(boolean local, Position position) {
+ for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) {
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
listener.onUpdatePosition(position);
}
}
@@ -336,7 +349,7 @@ public class ConnectionManager implements BroadcastInterface {
}
@Override
- public synchronized void updateEvent(long userId, Event event) {
+ public synchronized void updateEvent(boolean local, long userId, Event event) {
if (listeners.containsKey(userId)) {
for (UpdateListener listener : listeners.get(userId)) {
listener.onUpdateEvent(event);
@@ -351,18 +364,31 @@ public class ConnectionManager implements BroadcastInterface {
void onUpdateEvent(Event event);
}
- public synchronized void addListener(long userId, UpdateListener listener) {
- if (!listeners.containsKey(userId)) {
- listeners.put(userId, new HashSet<>());
+ public synchronized void addListener(long userId, UpdateListener listener) throws StorageException {
+ var set = listeners.get(userId);
+ if (set == null) {
+ set = new HashSet<>();
+ listeners.put(userId, set);
+
+ var devices = storage.getObjects(Device.class, new Request(
+ new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class)));
+ userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toUnmodifiableSet()));
+ devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId));
}
- listeners.get(userId).add(listener);
+ set.add(listener);
}
public synchronized void removeListener(long userId, UpdateListener listener) {
- if (!listeners.containsKey(userId)) {
- listeners.put(userId, new HashSet<>());
+ var set = listeners.get(userId);
+ set.remove(listener);
+ if (set.isEmpty()) {
+ listeners.remove(userId);
+
+ userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> {
+ userIds.remove(userId);
+ return userIds.isEmpty() ? null : userIds;
+ }));
}
- listeners.get(userId).remove(listener);
}
}