From c53d98c668af9c79767e22964f05c7bf7dc866f2 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sat, 25 Jun 2022 13:33:35 -0700 Subject: Integrate broadcast service --- .../org/traccar/session/ConnectionManager.java | 13 ++++++-- .../org/traccar/session/cache/CacheManager.java | 37 +++++++++++++++------- 2 files changed, 36 insertions(+), 14 deletions(-) (limited to 'src/main/java/org/traccar/session') diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java index fe6521d18..74427c08b 100644 --- a/src/main/java/org/traccar/session/ConnectionManager.java +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -22,6 +22,8 @@ import io.netty.util.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.Protocol; +import org.traccar.broadcast.BroadcastInterface; +import org.traccar.broadcast.BroadcastService; import org.traccar.config.Config; import org.traccar.config.Keys; import org.traccar.database.NotificationManager; @@ -52,7 +54,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @Singleton -public class ConnectionManager { +public class ConnectionManager implements BroadcastInterface { private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); @@ -70,6 +72,7 @@ public class ConnectionManager { private final Storage storage; private final NotificationManager notificationManager; private final Timer timer; + private final BroadcastService broadcastService; private final Map> listeners = new ConcurrentHashMap<>(); private final Map timeouts = new ConcurrentHashMap<>(); @@ -77,15 +80,17 @@ public class ConnectionManager { @Inject public ConnectionManager( Injector injector, Config config, CacheManager cacheManager, Storage storage, - NotificationManager notificationManager, Timer timer) { + NotificationManager notificationManager, Timer timer, BroadcastService broadcastService) { this.injector = injector; this.config = config; this.cacheManager = cacheManager; this.storage = storage; this.notificationManager = notificationManager; this.timer = timer; + this.broadcastService = broadcastService; deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT) * 1000; updateDeviceState = config.getBoolean(Keys.STATUS_UPDATE_DEVICE_STATE); + broadcastService.registerListener(this); } public DeviceSession getDeviceSession(long deviceId) { @@ -270,6 +275,7 @@ public class ConnectionManager { } updateDevice(device); + broadcastService.updateDevice(device); } public DeviceState getDeviceState(long deviceId) { @@ -306,6 +312,7 @@ public class ConnectionManager { } } + @Override public synchronized void updateDevice(Device device) { for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) { if (listeners.containsKey(user.getId())) { @@ -316,6 +323,7 @@ public class ConnectionManager { } } + @Override public synchronized void updatePosition(Position position) { long deviceId = position.getDeviceId(); for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) { @@ -327,6 +335,7 @@ public class ConnectionManager { } } + @Override public synchronized void updateEvent(long userId, Event event) { if (listeners.containsKey(userId)) { for (UpdateListener listener : listeners.get(userId)) { diff --git a/src/main/java/org/traccar/session/cache/CacheManager.java b/src/main/java/org/traccar/session/cache/CacheManager.java index abc8ca4c9..d2ada7d43 100644 --- a/src/main/java/org/traccar/session/cache/CacheManager.java +++ b/src/main/java/org/traccar/session/cache/CacheManager.java @@ -15,6 +15,8 @@ */ package org.traccar.session.cache; +import org.traccar.broadcast.BroadcastInterface; +import org.traccar.broadcast.BroadcastService; import org.traccar.config.Config; import org.traccar.helper.model.GeofenceUtil; import org.traccar.model.Attribute; @@ -49,7 +51,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @Singleton -public class CacheManager { +public class CacheManager implements BroadcastInterface { private static final Collection> CLASSES = Arrays.asList( Attribute.class, Driver.class, Geofence.class, Maintenance.class, Notification.class); @@ -68,9 +70,10 @@ public class CacheManager { private final Map> notificationUsers = new HashMap<>(); @Inject - public CacheManager(Config config, Storage storage) throws StorageException { + public CacheManager(Config config, Storage storage, BroadcastService broadcastService) throws StorageException { this.config = config; this.storage = storage; + broadcastService.registerListener(this); invalidateServer(); invalidateUsers(); } @@ -179,13 +182,18 @@ public class CacheManager { } } - public void updateOrInvalidate(Class clazz, long id) throws StorageException { - var object = storage.getObject(clazz, new Request( - new Columns.All(), new Condition.Equals("id", "id", id))); - if (object != null) { - updateOrInvalidate(object); - } else { - invalidate(clazz, id); + @Override + public void invalidateObject(Class clazz, long id) { + try { + var object = storage.getObject(clazz, new Request( + new Columns.All(), new Condition.Equals("id", "id", id))); + if (object != null) { + updateOrInvalidate(object); + } else { + invalidate(clazz, id); + } + } catch (StorageException e) { + throw new RuntimeException(e); } } @@ -219,10 +227,15 @@ public class CacheManager { invalidate(new CacheKey(clazz, id)); } - public void invalidate( + @Override + public void invalidatePermission( Class clazz1, long id1, - Class clazz2, long id2) throws StorageException { - invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2)); + Class clazz2, long id2) { + try { + invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2)); + } catch (StorageException e) { + throw new RuntimeException(e); + } } private void invalidateServer() throws StorageException { -- cgit v1.2.3