From 6e1d43d86414281ca562c45488b82808936fc980 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Thu, 2 Jun 2022 17:11:17 -0700 Subject: Integrate cache manager --- src/main/java/org/traccar/BaseProtocolDecoder.java | 7 +++++- src/main/java/org/traccar/MainEventHandler.java | 2 +- .../org/traccar/session/ConnectionManager.java | 26 +++++++++++++++++++--- .../org/traccar/session/cache/CacheManager.java | 8 +++++-- 4 files changed, 36 insertions(+), 7 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/org/traccar/BaseProtocolDecoder.java b/src/main/java/org/traccar/BaseProtocolDecoder.java index d6c571b79..71ef686fa 100644 --- a/src/main/java/org/traccar/BaseProtocolDecoder.java +++ b/src/main/java/org/traccar/BaseProtocolDecoder.java @@ -30,6 +30,7 @@ import org.traccar.model.Device; import org.traccar.model.Position; import org.traccar.session.ConnectionManager; import org.traccar.session.DeviceSession; +import org.traccar.storage.StorageException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -140,7 +141,11 @@ public abstract class BaseProtocolDecoder extends ExtendedObjectDecoder { } public DeviceSession getDeviceSession(Channel channel, SocketAddress remoteAddress, String... uniqueIds) { - return connectionManager.getDeviceSession(protocol, channel, remoteAddress, uniqueIds); + try { + return connectionManager.getDeviceSession(protocol, channel, remoteAddress, uniqueIds); + } catch (StorageException e) { + throw new RuntimeException(e); + } } public void getLastLocation(Position position, Date deviceTime) { diff --git a/src/main/java/org/traccar/MainEventHandler.java b/src/main/java/org/traccar/MainEventHandler.java index 0a25b7547..f04307fdb 100644 --- a/src/main/java/org/traccar/MainEventHandler.java +++ b/src/main/java/org/traccar/MainEventHandler.java @@ -130,7 +130,7 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter { if (BasePipelineFactory.getHandler(ctx.pipeline(), HttpRequestDecoder.class) == null && !connectionlessProtocols.contains(ctx.pipeline().get(BaseProtocolDecoder.class).getProtocolName())) { - Context.getConnectionManager().removeDeviceSessions(ctx.channel()); + Context.getConnectionManager().deviceDisconnected(ctx.channel()); } } diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java index 5d8a8c606..e01a568aa 100644 --- a/src/main/java/org/traccar/session/ConnectionManager.java +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -29,6 +29,7 @@ import org.traccar.handler.events.OverspeedEventHandler; import org.traccar.model.Device; import org.traccar.model.Event; import org.traccar.model.Position; +import org.traccar.session.cache.CacheManager; import org.traccar.storage.StorageException; import java.net.InetSocketAddress; @@ -51,6 +52,8 @@ public class ConnectionManager { private final Map sessionsByDeviceId = new ConcurrentHashMap<>(); private final Map> sessionsByEndpoint = new ConcurrentHashMap<>(); + private final CacheManager cacheManager; + private final Map> listeners = new ConcurrentHashMap<>(); private final Map timeouts = new ConcurrentHashMap<>(); @@ -60,6 +63,7 @@ public class ConnectionManager { deviceTimeout = Context.getConfig().getLong(Keys.STATUS_TIMEOUT) * 1000; updateDeviceState = Context.getConfig().getBoolean(Keys.STATUS_UPDATE_DEVICE_STATE); timer = Main.getInjector().getInstance(Timer.class); + cacheManager = Main.getInjector().getInstance(CacheManager.class); } public DeviceSession getDeviceSession(long deviceId) { @@ -67,7 +71,8 @@ public class ConnectionManager { } public DeviceSession getDeviceSession( - Protocol protocol, Channel channel, SocketAddress remoteAddress, String... uniqueIds) { + Protocol protocol, Channel channel, SocketAddress remoteAddress, + String... uniqueIds) throws StorageException { Endpoint endpoint = new Endpoint(channel, remoteAddress); Map endpointSessions = sessionsByEndpoint.getOrDefault( @@ -116,6 +121,7 @@ public class ConnectionManager { endpointSessions.put(device.getUniqueId(), deviceSession); sessionsByEndpoint.put(endpoint, endpointSessions); sessionsByDeviceId.put(device.getId(), deviceSession); + cacheManager.addDevice(device.getId()); return deviceSession; } else { @@ -125,17 +131,31 @@ public class ConnectionManager { } } - public void removeDeviceSessions(Channel channel) { + public void deviceDisconnected(Channel channel) { Endpoint endpoint = new Endpoint(channel, channel.remoteAddress()); Map endpointSessions = sessionsByEndpoint.remove(endpoint); if (endpointSessions != null) { for (DeviceSession deviceSession : endpointSessions.values()) { updateDevice(deviceSession.getDeviceId(), Device.STATUS_OFFLINE, null); sessionsByDeviceId.remove(deviceSession.getDeviceId()); + cacheManager.removeDevice(deviceSession.getDeviceId()); } } } + public void deviceUnknown(long deviceId) { + updateDevice(deviceId, Device.STATUS_UNKNOWN, null); + DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId); + cacheManager.removeDevice(deviceId); + if (deviceSession != null) { + Endpoint endpoint = new Endpoint(deviceSession.getChannel(), deviceSession.getRemoteAddress()); + sessionsByEndpoint.computeIfPresent(endpoint, (e, sessions) -> { + sessions.remove(deviceSession.getUniqueId()); + return sessions.isEmpty() ? null : sessions; + }); + } + } + public void updateDevice(final long deviceId, String status, Date time) { Device device = Context.getIdentityManager().getById(deviceId); if (device == null) { @@ -181,7 +201,7 @@ public class ConnectionManager { if (status.equals(Device.STATUS_ONLINE)) { timeouts.put(deviceId, timer.newTimeout(timeout1 -> { if (!timeout1.isCancelled()) { - updateDevice(deviceId, Device.STATUS_UNKNOWN, null); + deviceUnknown(deviceId); } }, deviceTimeout, TimeUnit.MILLISECONDS)); } diff --git a/src/main/java/org/traccar/session/cache/CacheManager.java b/src/main/java/org/traccar/session/cache/CacheManager.java index ae514ce8b..d019b072b 100644 --- a/src/main/java/org/traccar/session/cache/CacheManager.java +++ b/src/main/java/org/traccar/session/cache/CacheManager.java @@ -99,7 +99,9 @@ public class CacheManager { public void addDevice(long deviceId) throws StorageException { try { lock.writeLock().lock(); - unsafeAddDevice(deviceId); + if (!deviceLinks.containsKey(deviceId)) { + unsafeAddDevice(deviceId); + } } finally { lock.writeLock().unlock(); } @@ -108,7 +110,9 @@ public class CacheManager { public void removeDevice(long deviceId) { try { lock.writeLock().lock(); - unsafeRemoveDevice(deviceId); + if (deviceLinks.containsKey(deviceId)) { + unsafeRemoveDevice(deviceId); + } } finally { lock.writeLock().unlock(); } -- cgit v1.2.3