aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/session
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2022-06-25 13:33:35 -0700
committerAnton Tananaev <anton@traccar.org>2022-06-25 13:33:35 -0700
commitc53d98c668af9c79767e22964f05c7bf7dc866f2 (patch)
tree42d664009487232e3a21514bb5ab52c7571262a6 /src/main/java/org/traccar/session
parent5a732a26c85785a9b801583f2fff0ce47314aa03 (diff)
downloadtrackermap-server-c53d98c668af9c79767e22964f05c7bf7dc866f2.tar.gz
trackermap-server-c53d98c668af9c79767e22964f05c7bf7dc866f2.tar.bz2
trackermap-server-c53d98c668af9c79767e22964f05c7bf7dc866f2.zip
Integrate broadcast service
Diffstat (limited to 'src/main/java/org/traccar/session')
-rw-r--r--src/main/java/org/traccar/session/ConnectionManager.java13
-rw-r--r--src/main/java/org/traccar/session/cache/CacheManager.java37
2 files changed, 36 insertions, 14 deletions
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java
index fe6521d18..74427c08b 100644
--- a/src/main/java/org/traccar/session/ConnectionManager.java
+++ b/src/main/java/org/traccar/session/ConnectionManager.java
@@ -22,6 +22,8 @@ import io.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.Protocol;
+import org.traccar.broadcast.BroadcastInterface;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.NotificationManager;
@@ -52,7 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Singleton
-public class ConnectionManager {
+public class ConnectionManager implements BroadcastInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
@@ -70,6 +72,7 @@ public class ConnectionManager {
private final Storage storage;
private final NotificationManager notificationManager;
private final Timer timer;
+ private final BroadcastService broadcastService;
private final Map<Long, Set<UpdateListener>> listeners = new ConcurrentHashMap<>();
private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>();
@@ -77,15 +80,17 @@ public class ConnectionManager {
@Inject
public ConnectionManager(
Injector injector, Config config, CacheManager cacheManager, Storage storage,
- NotificationManager notificationManager, Timer timer) {
+ NotificationManager notificationManager, Timer timer, BroadcastService broadcastService) {
this.injector = injector;
this.config = config;
this.cacheManager = cacheManager;
this.storage = storage;
this.notificationManager = notificationManager;
this.timer = timer;
+ this.broadcastService = broadcastService;
deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT) * 1000;
updateDeviceState = config.getBoolean(Keys.STATUS_UPDATE_DEVICE_STATE);
+ broadcastService.registerListener(this);
}
public DeviceSession getDeviceSession(long deviceId) {
@@ -270,6 +275,7 @@ public class ConnectionManager {
}
updateDevice(device);
+ broadcastService.updateDevice(device);
}
public DeviceState getDeviceState(long deviceId) {
@@ -306,6 +312,7 @@ public class ConnectionManager {
}
}
+ @Override
public synchronized void updateDevice(Device device) {
for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) {
if (listeners.containsKey(user.getId())) {
@@ -316,6 +323,7 @@ public class ConnectionManager {
}
}
+ @Override
public synchronized void updatePosition(Position position) {
long deviceId = position.getDeviceId();
for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) {
@@ -327,6 +335,7 @@ public class ConnectionManager {
}
}
+ @Override
public synchronized void updateEvent(long userId, Event event) {
if (listeners.containsKey(userId)) {
for (UpdateListener listener : listeners.get(userId)) {
diff --git a/src/main/java/org/traccar/session/cache/CacheManager.java b/src/main/java/org/traccar/session/cache/CacheManager.java
index abc8ca4c9..d2ada7d43 100644
--- a/src/main/java/org/traccar/session/cache/CacheManager.java
+++ b/src/main/java/org/traccar/session/cache/CacheManager.java
@@ -15,6 +15,8 @@
*/
package org.traccar.session.cache;
+import org.traccar.broadcast.BroadcastInterface;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.config.Config;
import org.traccar.helper.model.GeofenceUtil;
import org.traccar.model.Attribute;
@@ -49,7 +51,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@Singleton
-public class CacheManager {
+public class CacheManager implements BroadcastInterface {
private static final Collection<Class<? extends BaseModel>> CLASSES = Arrays.asList(
Attribute.class, Driver.class, Geofence.class, Maintenance.class, Notification.class);
@@ -68,9 +70,10 @@ public class CacheManager {
private final Map<Long, List<User>> notificationUsers = new HashMap<>();
@Inject
- public CacheManager(Config config, Storage storage) throws StorageException {
+ public CacheManager(Config config, Storage storage, BroadcastService broadcastService) throws StorageException {
this.config = config;
this.storage = storage;
+ broadcastService.registerListener(this);
invalidateServer();
invalidateUsers();
}
@@ -179,13 +182,18 @@ public class CacheManager {
}
}
- public <T extends BaseModel> void updateOrInvalidate(Class<T> clazz, long id) throws StorageException {
- var object = storage.getObject(clazz, new Request(
- new Columns.All(), new Condition.Equals("id", "id", id)));
- if (object != null) {
- updateOrInvalidate(object);
- } else {
- invalidate(clazz, id);
+ @Override
+ public void invalidateObject(Class<? extends BaseModel> clazz, long id) {
+ try {
+ var object = storage.getObject(clazz, new Request(
+ new Columns.All(), new Condition.Equals("id", "id", id)));
+ if (object != null) {
+ updateOrInvalidate(object);
+ } else {
+ invalidate(clazz, id);
+ }
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
}
}
@@ -219,10 +227,15 @@ public class CacheManager {
invalidate(new CacheKey(clazz, id));
}
- public void invalidate(
+ @Override
+ public void invalidatePermission(
Class<? extends BaseModel> clazz1, long id1,
- Class<? extends BaseModel> clazz2, long id2) throws StorageException {
- invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2));
+ Class<? extends BaseModel> clazz2, long id2) {
+ try {
+ invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2));
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
+ }
}
private void invalidateServer() throws StorageException {