diff options
Diffstat (limited to 'src/main/java/org/traccar/session/ConnectionManager.java')
-rw-r--r-- | src/main/java/org/traccar/session/ConnectionManager.java | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java new file mode 100644 index 000000000..37a42d827 --- /dev/null +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -0,0 +1,374 @@ +/* + * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session; + +import io.netty.channel.Channel; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.Protocol; +import org.traccar.broadcast.BroadcastInterface; +import org.traccar.broadcast.BroadcastService; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.database.DeviceLookupService; +import org.traccar.database.NotificationManager; +import org.traccar.model.BaseModel; +import org.traccar.model.Device; +import org.traccar.model.Event; +import org.traccar.model.Position; +import org.traccar.model.User; +import org.traccar.session.cache.CacheManager; +import org.traccar.storage.Storage; +import org.traccar.storage.StorageException; +import org.traccar.storage.query.Columns; +import org.traccar.storage.query.Condition; +import org.traccar.storage.query.Request; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Singleton +public class ConnectionManager implements BroadcastInterface { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); + + private final long deviceTimeout; + + private final Map<Long, DeviceSession> sessionsByDeviceId = new ConcurrentHashMap<>(); + private final Map<Endpoint, Map<String, DeviceSession>> sessionsByEndpoint = new ConcurrentHashMap<>(); + + private final Config config; + private final CacheManager cacheManager; + private final Storage storage; + private final NotificationManager notificationManager; + private final Timer timer; + private final BroadcastService broadcastService; + private final DeviceLookupService deviceLookupService; + + private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>(); + private final Map<Long, Set<Long>> userDevices = new HashMap<>(); + private final Map<Long, Set<Long>> deviceUsers = new HashMap<>(); + + private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>(); + + @Inject + public ConnectionManager( + Config config, CacheManager cacheManager, Storage storage, + NotificationManager notificationManager, Timer timer, BroadcastService broadcastService, + DeviceLookupService deviceLookupService) { + this.config = config; + this.cacheManager = cacheManager; + this.storage = storage; + this.notificationManager = notificationManager; + this.timer = timer; + this.broadcastService = broadcastService; + this.deviceLookupService = deviceLookupService; + deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT); + broadcastService.registerListener(this); + } + + public DeviceSession getDeviceSession(long deviceId) { + return sessionsByDeviceId.get(deviceId); + } + + public DeviceSession getDeviceSession( + Protocol protocol, Channel channel, SocketAddress remoteAddress, + String... uniqueIds) throws StorageException { + + Endpoint endpoint = new Endpoint(channel, remoteAddress); + Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.getOrDefault( + endpoint, new ConcurrentHashMap<>()); + + uniqueIds = Arrays.stream(uniqueIds).filter(Objects::nonNull).toArray(String[]::new); + if (uniqueIds.length > 0) { + for (String uniqueId : uniqueIds) { + DeviceSession deviceSession = endpointSessions.get(uniqueId); + if (deviceSession != null) { + return deviceSession; + } + } + } else { + return endpointSessions.values().stream().findAny().orElse(null); + } + + Device device = deviceLookupService.lookup(uniqueIds); + + if (device == null && config.getBoolean(Keys.DATABASE_REGISTER_UNKNOWN)) { + device = addUnknownDevice(uniqueIds[0]); + } + + if (device != null) { + device.checkDisabled(); + + DeviceSession oldSession = sessionsByDeviceId.remove(device.getId()); + if (oldSession != null) { + Endpoint oldEndpoint = new Endpoint(oldSession.getChannel(), oldSession.getRemoteAddress()); + Map<String, DeviceSession> oldEndpointSessions = sessionsByEndpoint.get(oldEndpoint); + if (oldEndpointSessions != null && oldEndpointSessions.size() > 1) { + oldEndpointSessions.remove(device.getUniqueId()); + } else { + sessionsByEndpoint.remove(oldEndpoint); + } + } + + DeviceSession deviceSession = new DeviceSession( + device.getId(), device.getUniqueId(), protocol, channel, remoteAddress); + endpointSessions.put(device.getUniqueId(), deviceSession); + sessionsByEndpoint.put(endpoint, endpointSessions); + sessionsByDeviceId.put(device.getId(), deviceSession); + + if (oldSession == null) { + cacheManager.addDevice(device.getId()); + } + + return deviceSession; + } else { + LOGGER.warn("Unknown device - " + String.join(" ", uniqueIds) + + " (" + ((InetSocketAddress) remoteAddress).getHostString() + ")"); + return null; + } + } + + private Device addUnknownDevice(String uniqueId) { + Device device = new Device(); + device.setName(uniqueId); + device.setUniqueId(uniqueId); + device.setCategory(config.getString(Keys.DATABASE_REGISTER_UNKNOWN_DEFAULT_CATEGORY)); + + long defaultGroupId = config.getLong(Keys.DATABASE_REGISTER_UNKNOWN_DEFAULT_GROUP_ID); + if (defaultGroupId != 0) { + device.setGroupId(defaultGroupId); + } + + try { + device.setId(storage.addObject(device, new Request(new Columns.Exclude("id")))); + LOGGER.info("Automatically registered " + uniqueId); + return device; + } catch (StorageException e) { + LOGGER.warn("Automatic registration failed", e); + return null; + } + } + + public void deviceDisconnected(Channel channel, boolean supportsOffline) { + Endpoint endpoint = new Endpoint(channel, channel.remoteAddress()); + Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.remove(endpoint); + if (endpointSessions != null) { + for (DeviceSession deviceSession : endpointSessions.values()) { + if (supportsOffline) { + updateDevice(deviceSession.getDeviceId(), Device.STATUS_OFFLINE, null); + } + sessionsByDeviceId.remove(deviceSession.getDeviceId()); + cacheManager.removeDevice(deviceSession.getDeviceId()); + } + } + } + + public void deviceUnknown(long deviceId) { + updateDevice(deviceId, Device.STATUS_UNKNOWN, null); + removeDeviceSession(deviceId); + } + + private void removeDeviceSession(long deviceId) { + DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId); + if (deviceSession != null) { + cacheManager.removeDevice(deviceId); + Endpoint endpoint = new Endpoint(deviceSession.getChannel(), deviceSession.getRemoteAddress()); + sessionsByEndpoint.computeIfPresent(endpoint, (e, sessions) -> { + sessions.remove(deviceSession.getUniqueId()); + return sessions.isEmpty() ? null : sessions; + }); + } + } + + public void updateDevice(long deviceId, String status, Date time) { + Device device = cacheManager.getObject(Device.class, deviceId); + if (device == null) { + try { + device = storage.getObject(Device.class, new Request( + new Columns.All(), new Condition.Equals("id", deviceId))); + } catch (StorageException e) { + LOGGER.warn("Failed to get device", e); + } + if (device == null) { + return; + } + } + + String oldStatus = device.getStatus(); + device.setStatus(status); + + if (!status.equals(oldStatus)) { + String eventType; + Map<Event, Position> events = new HashMap<>(); + switch (status) { + case Device.STATUS_ONLINE: + eventType = Event.TYPE_DEVICE_ONLINE; + break; + case Device.STATUS_UNKNOWN: + eventType = Event.TYPE_DEVICE_UNKNOWN; + break; + default: + eventType = Event.TYPE_DEVICE_OFFLINE; + break; + } + events.put(new Event(eventType, deviceId), null); + notificationManager.updateEvents(events); + } + + if (time != null) { + device.setLastUpdate(time); + } + + Timeout timeout = timeouts.remove(deviceId); + if (timeout != null) { + timeout.cancel(); + } + + if (status.equals(Device.STATUS_ONLINE)) { + timeouts.put(deviceId, timer.newTimeout(timeout1 -> { + if (!timeout1.isCancelled()) { + deviceUnknown(deviceId); + } + }, deviceTimeout, TimeUnit.SECONDS)); + } + + try { + storage.updateObject(device, new Request( + new Columns.Include("status", "lastUpdate"), + new Condition.Equals("id", deviceId))); + } catch (StorageException e) { + LOGGER.warn("Update device status error", e); + } + + updateDevice(true, device); + } + + public synchronized void sendKeepalive() { + for (Set<UpdateListener> userListeners : listeners.values()) { + for (UpdateListener listener : userListeners) { + listener.onKeepalive(); + } + } + } + + @Override + public synchronized void updateDevice(boolean local, Device device) { + if (local) { + broadcastService.updateDevice(true, device); + } else if (Device.STATUS_ONLINE.equals(device.getStatus())) { + timeouts.remove(device.getId()); + removeDeviceSession(device.getId()); + } + for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { + listener.onUpdateDevice(device); + } + } + } + } + + @Override + public synchronized void updatePosition(boolean local, Position position) { + if (local) { + broadcastService.updatePosition(true, position); + } + for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { + listener.onUpdatePosition(position); + } + } + } + } + + @Override + public synchronized void updateEvent(boolean local, long userId, Event event) { + if (local) { + broadcastService.updateEvent(true, userId, event); + } + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { + listener.onUpdateEvent(event); + } + } + } + + @Override + public synchronized void invalidatePermission( + boolean local, + Class<? extends BaseModel> clazz1, long id1, + Class<? extends BaseModel> clazz2, long id2) { + if (clazz1.equals(User.class) && clazz2.equals(Device.class)) { + if (listeners.containsKey(id1)) { + userDevices.get(id1).add(id2); + deviceUsers.put(id2, new HashSet<>(List.of(id1))); + } + } + } + + public interface UpdateListener { + void onKeepalive(); + void onUpdateDevice(Device device); + void onUpdatePosition(Position position); + void onUpdateEvent(Event event); + } + + public synchronized void addListener(long userId, UpdateListener listener) throws StorageException { + var set = listeners.get(userId); + if (set == null) { + set = new HashSet<>(); + listeners.put(userId, set); + + var devices = storage.getObjects(Device.class, new Request( + new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class))); + userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toSet())); + devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId)); + } + set.add(listener); + } + + public synchronized void removeListener(long userId, UpdateListener listener) { + var set = listeners.get(userId); + set.remove(listener); + if (set.isEmpty()) { + listeners.remove(userId); + + userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> { + userIds.remove(userId); + return userIds.isEmpty() ? null : userIds; + })); + } + } + +} |