diff options
Diffstat (limited to 'src/main/java/org/traccar/session')
10 files changed, 1403 insertions, 0 deletions
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java new file mode 100644 index 000000000..28214840d --- /dev/null +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -0,0 +1,376 @@ +/* + * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session; + +import io.netty.channel.Channel; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.Protocol; +import org.traccar.broadcast.BroadcastInterface; +import org.traccar.broadcast.BroadcastService; +import org.traccar.config.Config; +import org.traccar.config.Keys; +import org.traccar.database.DeviceLookupService; +import org.traccar.database.NotificationManager; +import org.traccar.model.BaseModel; +import org.traccar.model.Device; +import org.traccar.model.Event; +import org.traccar.model.Position; +import org.traccar.model.User; +import org.traccar.session.cache.CacheManager; +import org.traccar.storage.Storage; +import org.traccar.storage.StorageException; +import org.traccar.storage.query.Columns; +import org.traccar.storage.query.Condition; +import org.traccar.storage.query.Request; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Singleton +public class ConnectionManager implements BroadcastInterface { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); + + private final long deviceTimeout; + + private final Map<Long, DeviceSession> sessionsByDeviceId = new ConcurrentHashMap<>(); + private final Map<Endpoint, Map<String, DeviceSession>> sessionsByEndpoint = new ConcurrentHashMap<>(); + + private final Config config; + private final CacheManager cacheManager; + private final Storage storage; + private final NotificationManager notificationManager; + private final Timer timer; + private final BroadcastService broadcastService; + private final DeviceLookupService deviceLookupService; + + private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>(); + private final Map<Long, Set<Long>> userDevices = new HashMap<>(); + private final Map<Long, Set<Long>> deviceUsers = new HashMap<>(); + + private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>(); + + @Inject + public ConnectionManager( + Config config, CacheManager cacheManager, Storage storage, + NotificationManager notificationManager, Timer timer, BroadcastService broadcastService, + DeviceLookupService deviceLookupService) { + this.config = config; + this.cacheManager = cacheManager; + this.storage = storage; + this.notificationManager = notificationManager; + this.timer = timer; + this.broadcastService = broadcastService; + this.deviceLookupService = deviceLookupService; + deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT); + broadcastService.registerListener(this); + } + + public DeviceSession getDeviceSession(long deviceId) { + return sessionsByDeviceId.get(deviceId); + } + + public DeviceSession getDeviceSession( + Protocol protocol, Channel channel, SocketAddress remoteAddress, + String... uniqueIds) throws StorageException { + + Endpoint endpoint = new Endpoint(channel, remoteAddress); + Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.getOrDefault( + endpoint, new ConcurrentHashMap<>()); + + uniqueIds = Arrays.stream(uniqueIds).filter(Objects::nonNull).toArray(String[]::new); + if (uniqueIds.length > 0) { + for (String uniqueId : uniqueIds) { + DeviceSession deviceSession = endpointSessions.get(uniqueId); + if (deviceSession != null) { + return deviceSession; + } + } + } else { + return endpointSessions.values().stream().findAny().orElse(null); + } + + Device device = deviceLookupService.lookup(uniqueIds); + + if (device == null && config.getBoolean(Keys.DATABASE_REGISTER_UNKNOWN)) { + if (uniqueIds[0].matches(config.getString(Keys.DATABASE_REGISTER_UNKNOWN_REGEX))) { + device = addUnknownDevice(uniqueIds[0]); + } + } + + if (device != null) { + device.checkDisabled(); + + DeviceSession oldSession = sessionsByDeviceId.remove(device.getId()); + if (oldSession != null) { + Endpoint oldEndpoint = new Endpoint(oldSession.getChannel(), oldSession.getRemoteAddress()); + Map<String, DeviceSession> oldEndpointSessions = sessionsByEndpoint.get(oldEndpoint); + if (oldEndpointSessions != null && oldEndpointSessions.size() > 1) { + oldEndpointSessions.remove(device.getUniqueId()); + } else { + sessionsByEndpoint.remove(oldEndpoint); + } + } + + DeviceSession deviceSession = new DeviceSession( + device.getId(), device.getUniqueId(), protocol, channel, remoteAddress); + endpointSessions.put(device.getUniqueId(), deviceSession); + sessionsByEndpoint.put(endpoint, endpointSessions); + sessionsByDeviceId.put(device.getId(), deviceSession); + + if (oldSession == null) { + cacheManager.addDevice(device.getId()); + } + + return deviceSession; + } else { + LOGGER.warn("Unknown device - " + String.join(" ", uniqueIds) + + " (" + ((InetSocketAddress) remoteAddress).getHostString() + ")"); + return null; + } + } + + private Device addUnknownDevice(String uniqueId) { + Device device = new Device(); + device.setName(uniqueId); + device.setUniqueId(uniqueId); + device.setCategory(config.getString(Keys.DATABASE_REGISTER_UNKNOWN_DEFAULT_CATEGORY)); + + long defaultGroupId = config.getLong(Keys.DATABASE_REGISTER_UNKNOWN_DEFAULT_GROUP_ID); + if (defaultGroupId != 0) { + device.setGroupId(defaultGroupId); + } + + try { + device.setId(storage.addObject(device, new Request(new Columns.Exclude("id")))); + LOGGER.info("Automatically registered " + uniqueId); + return device; + } catch (StorageException e) { + LOGGER.warn("Automatic registration failed", e); + return null; + } + } + + public void deviceDisconnected(Channel channel, boolean supportsOffline) { + Endpoint endpoint = new Endpoint(channel, channel.remoteAddress()); + Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.remove(endpoint); + if (endpointSessions != null) { + for (DeviceSession deviceSession : endpointSessions.values()) { + if (supportsOffline) { + updateDevice(deviceSession.getDeviceId(), Device.STATUS_OFFLINE, null); + } + sessionsByDeviceId.remove(deviceSession.getDeviceId()); + cacheManager.removeDevice(deviceSession.getDeviceId()); + } + } + } + + public void deviceUnknown(long deviceId) { + updateDevice(deviceId, Device.STATUS_UNKNOWN, null); + removeDeviceSession(deviceId); + } + + private void removeDeviceSession(long deviceId) { + DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId); + if (deviceSession != null) { + cacheManager.removeDevice(deviceId); + Endpoint endpoint = new Endpoint(deviceSession.getChannel(), deviceSession.getRemoteAddress()); + sessionsByEndpoint.computeIfPresent(endpoint, (e, sessions) -> { + sessions.remove(deviceSession.getUniqueId()); + return sessions.isEmpty() ? null : sessions; + }); + } + } + + public void updateDevice(long deviceId, String status, Date time) { + Device device = cacheManager.getObject(Device.class, deviceId); + if (device == null) { + try { + device = storage.getObject(Device.class, new Request( + new Columns.All(), new Condition.Equals("id", deviceId))); + } catch (StorageException e) { + LOGGER.warn("Failed to get device", e); + } + if (device == null) { + return; + } + } + + String oldStatus = device.getStatus(); + device.setStatus(status); + + if (!status.equals(oldStatus)) { + String eventType; + Map<Event, Position> events = new HashMap<>(); + switch (status) { + case Device.STATUS_ONLINE: + eventType = Event.TYPE_DEVICE_ONLINE; + break; + case Device.STATUS_UNKNOWN: + eventType = Event.TYPE_DEVICE_UNKNOWN; + break; + default: + eventType = Event.TYPE_DEVICE_OFFLINE; + break; + } + events.put(new Event(eventType, deviceId), null); + notificationManager.updateEvents(events); + } + + if (time != null) { + device.setLastUpdate(time); + } + + Timeout timeout = timeouts.remove(deviceId); + if (timeout != null) { + timeout.cancel(); + } + + if (status.equals(Device.STATUS_ONLINE)) { + timeouts.put(deviceId, timer.newTimeout(timeout1 -> { + if (!timeout1.isCancelled()) { + deviceUnknown(deviceId); + } + }, deviceTimeout, TimeUnit.SECONDS)); + } + + try { + storage.updateObject(device, new Request( + new Columns.Include("status", "lastUpdate"), + new Condition.Equals("id", deviceId))); + } catch (StorageException e) { + LOGGER.warn("Update device status error", e); + } + + updateDevice(true, device); + } + + public synchronized void sendKeepalive() { + for (Set<UpdateListener> userListeners : listeners.values()) { + for (UpdateListener listener : userListeners) { + listener.onKeepalive(); + } + } + } + + @Override + public synchronized void updateDevice(boolean local, Device device) { + if (local) { + broadcastService.updateDevice(true, device); + } else if (Device.STATUS_ONLINE.equals(device.getStatus())) { + timeouts.remove(device.getId()); + removeDeviceSession(device.getId()); + } + for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { + listener.onUpdateDevice(device); + } + } + } + } + + @Override + public synchronized void updatePosition(boolean local, Position position) { + if (local) { + broadcastService.updatePosition(true, position); + } + for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) { + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { + listener.onUpdatePosition(position); + } + } + } + } + + @Override + public synchronized void updateEvent(boolean local, long userId, Event event) { + if (local) { + broadcastService.updateEvent(true, userId, event); + } + if (listeners.containsKey(userId)) { + for (UpdateListener listener : listeners.get(userId)) { + listener.onUpdateEvent(event); + } + } + } + + @Override + public synchronized void invalidatePermission( + boolean local, + Class<? extends BaseModel> clazz1, long id1, + Class<? extends BaseModel> clazz2, long id2) { + if (clazz1.equals(User.class) && clazz2.equals(Device.class)) { + if (listeners.containsKey(id1)) { + userDevices.get(id1).add(id2); + deviceUsers.put(id2, new HashSet<>(List.of(id1))); + } + } + } + + public interface UpdateListener { + void onKeepalive(); + void onUpdateDevice(Device device); + void onUpdatePosition(Position position); + void onUpdateEvent(Event event); + } + + public synchronized void addListener(long userId, UpdateListener listener) throws StorageException { + var set = listeners.get(userId); + if (set == null) { + set = new HashSet<>(); + listeners.put(userId, set); + + var devices = storage.getObjects(Device.class, new Request( + new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class))); + userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toSet())); + devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId)); + } + set.add(listener); + } + + public synchronized void removeListener(long userId, UpdateListener listener) { + var set = listeners.get(userId); + set.remove(listener); + if (set.isEmpty()) { + listeners.remove(userId); + + userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> { + userIds.remove(userId); + return userIds.isEmpty() ? null : userIds; + })); + } + } + +} diff --git a/src/main/java/org/traccar/session/DeviceSession.java b/src/main/java/org/traccar/session/DeviceSession.java new file mode 100644 index 000000000..009f90f5a --- /dev/null +++ b/src/main/java/org/traccar/session/DeviceSession.java @@ -0,0 +1,90 @@ +/* + * Copyright 2016 - 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session; + +import io.netty.channel.Channel; +import io.netty.handler.codec.http.HttpRequestDecoder; +import org.traccar.BasePipelineFactory; +import org.traccar.Protocol; +import org.traccar.model.Command; + +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; + +public class DeviceSession { + + private final long deviceId; + private final String uniqueId; + private final Protocol protocol; + private final Channel channel; + private final SocketAddress remoteAddress; + + public DeviceSession( + long deviceId, String uniqueId, Protocol protocol, Channel channel, SocketAddress remoteAddress) { + this.deviceId = deviceId; + this.uniqueId = uniqueId; + this.protocol = protocol; + this.channel = channel; + this.remoteAddress = remoteAddress; + } + + public long getDeviceId() { + return deviceId; + } + + public String getUniqueId() { + return uniqueId; + } + + public Channel getChannel() { + return channel; + } + + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + public boolean supportsLiveCommands() { + return BasePipelineFactory.getHandler(channel.pipeline(), HttpRequestDecoder.class) == null; + } + + public void sendCommand(Command command) { + protocol.sendDataCommand(channel, remoteAddress, command); + } + + public static final String KEY_TIMEZONE = "timezone"; + + private final Map<String, Object> locals = new HashMap<>(); + + public boolean contains(String key) { + return locals.containsKey(key); + } + + public void set(String key, Object value) { + if (value != null) { + locals.put(key, value); + } else { + locals.remove(key); + } + } + + @SuppressWarnings("unchecked") + public <T> T get(String key) { + return (T) locals.get(key); + } + +} diff --git a/src/main/java/org/traccar/session/Endpoint.java b/src/main/java/org/traccar/session/Endpoint.java new file mode 100644 index 000000000..76aac3444 --- /dev/null +++ b/src/main/java/org/traccar/session/Endpoint.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session; + +import io.netty.channel.Channel; + +import java.net.SocketAddress; +import java.util.Objects; + +public class Endpoint { + + private final Channel channel; + private final SocketAddress remoteAddress; + + public Endpoint(Channel channel, SocketAddress remoteAddress) { + this.channel = channel; + this.remoteAddress = remoteAddress; + } + + public Channel getChannel() { + return channel; + } + + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Endpoint endpoint = (Endpoint) o; + return channel.equals(endpoint.channel) && remoteAddress.equals(endpoint.remoteAddress); + } + + @Override + public int hashCode() { + return Objects.hash(channel, remoteAddress); + } + +} diff --git a/src/main/java/org/traccar/session/cache/CacheKey.java b/src/main/java/org/traccar/session/cache/CacheKey.java new file mode 100644 index 000000000..23145e34b --- /dev/null +++ b/src/main/java/org/traccar/session/cache/CacheKey.java @@ -0,0 +1,57 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.cache; + +import org.traccar.model.BaseModel; + +import java.util.Objects; + +class CacheKey { + + private final Class<? extends BaseModel> clazz; + private final long id; + + CacheKey(BaseModel object) { + this(object.getClass(), object.getId()); + } + + CacheKey(Class<? extends BaseModel> clazz, long id) { + this.clazz = clazz; + this.id = id; + } + + public boolean classIs(Class<? extends BaseModel> clazz) { + return clazz.equals(this.clazz); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return id == cacheKey.id && Objects.equals(clazz, cacheKey.clazz); + } + + @Override + public int hashCode() { + return Objects.hash(clazz, id); + } + +} diff --git a/src/main/java/org/traccar/session/cache/CacheManager.java b/src/main/java/org/traccar/session/cache/CacheManager.java new file mode 100644 index 000000000..58320cf29 --- /dev/null +++ b/src/main/java/org/traccar/session/cache/CacheManager.java @@ -0,0 +1,428 @@ +/* + * Copyright 2022 - 2023 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.cache; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.broadcast.BroadcastInterface; +import org.traccar.broadcast.BroadcastService; +import org.traccar.config.Config; +import org.traccar.model.Attribute; +import org.traccar.model.BaseModel; +import org.traccar.model.Calendar; +import org.traccar.model.Device; +import org.traccar.model.Driver; +import org.traccar.model.Geofence; +import org.traccar.model.Group; +import org.traccar.model.GroupedModel; +import org.traccar.model.Maintenance; +import org.traccar.model.Notification; +import org.traccar.model.Position; +import org.traccar.model.Schedulable; +import org.traccar.model.Server; +import org.traccar.model.User; +import org.traccar.storage.Storage; +import org.traccar.storage.StorageException; +import org.traccar.storage.query.Columns; +import org.traccar.storage.query.Condition; +import org.traccar.storage.query.Request; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +@Singleton +public class CacheManager implements BroadcastInterface { + + private static final Logger LOGGER = LoggerFactory.getLogger(CacheManager.class); + private static final int GROUP_DEPTH_LIMIT = 3; + private static final Collection<Class<? extends BaseModel>> CLASSES = Arrays.asList( + Attribute.class, Driver.class, Geofence.class, Maintenance.class, Notification.class); + + private final Config config; + private final Storage storage; + private final BroadcastService broadcastService; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final Map<CacheKey, CacheValue> deviceCache = new HashMap<>(); + private final Map<Long, Integer> deviceReferences = new HashMap<>(); + private final Map<Long, Map<Class<? extends BaseModel>, Set<Long>>> deviceLinks = new HashMap<>(); + private final Map<Long, Position> devicePositions = new HashMap<>(); + + private Server server; + private final Map<Long, List<User>> notificationUsers = new HashMap<>(); + + @Inject + public CacheManager(Config config, Storage storage, BroadcastService broadcastService) throws StorageException { + this.config = config; + this.storage = storage; + this.broadcastService = broadcastService; + invalidateServer(); + invalidateUsers(); + broadcastService.registerListener(this); + } + + public Config getConfig() { + return config; + } + + public <T extends BaseModel> T getObject(Class<T> clazz, long id) { + try { + lock.readLock().lock(); + var cacheValue = deviceCache.get(new CacheKey(clazz, id)); + return cacheValue != null ? cacheValue.getValue() : null; + } finally { + lock.readLock().unlock(); + } + } + + public <T extends BaseModel> List<T> getDeviceObjects(long deviceId, Class<T> clazz) { + try { + lock.readLock().lock(); + var links = deviceLinks.get(deviceId); + if (links != null) { + return links.getOrDefault(clazz, new LinkedHashSet<>()).stream() + .map(id -> { + var cacheValue = deviceCache.get(new CacheKey(clazz, id)); + return cacheValue != null ? cacheValue.<T>getValue() : null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } else { + LOGGER.warn("Device {} cache missing", deviceId); + return Collections.emptyList(); + } + } finally { + lock.readLock().unlock(); + } + } + + public Position getPosition(long deviceId) { + try { + lock.readLock().lock(); + return devicePositions.get(deviceId); + } finally { + lock.readLock().unlock(); + } + } + + public Server getServer() { + try { + lock.readLock().lock(); + return server; + } finally { + lock.readLock().unlock(); + } + } + + public List<User> getNotificationUsers(long notificationId, long deviceId) { + try { + lock.readLock().lock(); + var users = deviceLinks.get(deviceId).get(User.class).stream() + .collect(Collectors.toUnmodifiableSet()); + return notificationUsers.getOrDefault(notificationId, new LinkedList<>()).stream() + .filter(user -> users.contains(user.getId())) + .collect(Collectors.toUnmodifiableList()); + } finally { + lock.readLock().unlock(); + } + } + + public Driver findDriverByUniqueId(long deviceId, String driverUniqueId) { + return getDeviceObjects(deviceId, Driver.class).stream() + .filter(driver -> driver.getUniqueId().equals(driverUniqueId)) + .findFirst() + .orElse(null); + } + + public void addDevice(long deviceId) throws StorageException { + try { + lock.writeLock().lock(); + Integer references = deviceReferences.get(deviceId); + if (references != null) { + references += 1; + } else { + unsafeAddDevice(deviceId); + references = 1; + } + deviceReferences.put(deviceId, references); + } finally { + lock.writeLock().unlock(); + } + } + + public void removeDevice(long deviceId) { + try { + lock.writeLock().lock(); + Integer references = deviceReferences.get(deviceId); + if (references != null) { + references -= 1; + if (references <= 0) { + unsafeRemoveDevice(deviceId); + deviceReferences.remove(deviceId); + } else { + deviceReferences.put(deviceId, references); + } + } + } finally { + lock.writeLock().unlock(); + } + } + + public void updatePosition(Position position) { + try { + lock.writeLock().lock(); + if (deviceLinks.containsKey(position.getDeviceId())) { + devicePositions.put(position.getDeviceId(), position); + } + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void invalidateObject(boolean local, Class<? extends BaseModel> clazz, long id) { + try { + var object = storage.getObject(clazz, new Request( + new Columns.All(), new Condition.Equals("id", id))); + if (object != null) { + updateOrInvalidate(local, object); + } else { + invalidate(clazz, id); + } + } catch (StorageException e) { + throw new RuntimeException(e); + } + } + + public <T extends BaseModel> void updateOrInvalidate(boolean local, T object) throws StorageException { + if (local) { + broadcastService.invalidateObject(true, object.getClass(), object.getId()); + } + + if (object instanceof Server) { + invalidateServer(); + return; + } + if (object instanceof User) { + invalidateUsers(); + return; + } + + boolean invalidate = false; + var before = getObject(object.getClass(), object.getId()); + if (before == null) { + return; + } else if (object instanceof GroupedModel) { + if (((GroupedModel) before).getGroupId() != ((GroupedModel) object).getGroupId()) { + invalidate = true; + } + } else if (object instanceof Schedulable) { + if (((Schedulable) before).getCalendarId() != ((Schedulable) object).getCalendarId()) { + invalidate = true; + } + } + if (invalidate) { + invalidate(object.getClass(), object.getId()); + } else { + try { + lock.writeLock().lock(); + deviceCache.get(new CacheKey(object.getClass(), object.getId())).setValue(object); + } finally { + lock.writeLock().unlock(); + } + } + } + + public <T extends BaseModel> void invalidate(Class<T> clazz, long id) throws StorageException { + invalidate(new CacheKey(clazz, id)); + } + + @Override + public void invalidatePermission( + boolean local, + Class<? extends BaseModel> clazz1, long id1, + Class<? extends BaseModel> clazz2, long id2) { + if (local) { + broadcastService.invalidatePermission(true, clazz1, id1, clazz2, id2); + } + + try { + invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2)); + } catch (StorageException e) { + throw new RuntimeException(e); + } + } + + private void invalidateServer() throws StorageException { + server = storage.getObject(Server.class, new Request(new Columns.All())); + } + + private void invalidateUsers() throws StorageException { + notificationUsers.clear(); + Map<Long, User> users = new HashMap<>(); + storage.getObjects(User.class, new Request(new Columns.All())) + .forEach(user -> users.put(user.getId(), user)); + storage.getPermissions(User.class, Notification.class).forEach(permission -> { + long notificationId = permission.getPropertyId(); + var user = users.get(permission.getOwnerId()); + notificationUsers.computeIfAbsent(notificationId, k -> new LinkedList<>()).add(user); + }); + } + + private void addObject(long deviceId, BaseModel object) { + deviceCache.computeIfAbsent(new CacheKey(object), k -> new CacheValue(object)).retain(deviceId); + } + + private void unsafeAddDevice(long deviceId) throws StorageException { + Map<Class<? extends BaseModel>, Set<Long>> links = new HashMap<>(); + + Device device = storage.getObject(Device.class, new Request( + new Columns.All(), new Condition.Equals("id", deviceId))); + if (device != null) { + addObject(deviceId, device); + if (device.getCalendarId() > 0) { + var calendar = storage.getObject(Calendar.class, new Request( + new Columns.All(), new Condition.Equals("id", device.getCalendarId()))); + links.computeIfAbsent(Calendar.class, k -> new LinkedHashSet<>()).add(calendar.getId()); + addObject(deviceId, calendar); + } + + int groupDepth = 0; + long groupId = device.getGroupId(); + while (groupDepth < GROUP_DEPTH_LIMIT && groupId > 0) { + Group group = storage.getObject(Group.class, new Request( + new Columns.All(), new Condition.Equals("id", groupId))); + links.computeIfAbsent(Group.class, k -> new LinkedHashSet<>()).add(group.getId()); + addObject(deviceId, group); + groupId = group.getGroupId(); + groupDepth += 1; + } + + for (Class<? extends BaseModel> clazz : CLASSES) { + var objects = storage.getObjects(clazz, new Request( + new Columns.All(), new Condition.Permission(Device.class, deviceId, clazz))); + links.put(clazz, objects.stream().map(BaseModel::getId).collect(Collectors.toSet())); + for (var object : objects) { + addObject(deviceId, object); + if (object instanceof Schedulable) { + var scheduled = (Schedulable) object; + if (scheduled.getCalendarId() > 0) { + var calendar = storage.getObject(Calendar.class, new Request( + new Columns.All(), new Condition.Equals("id", scheduled.getCalendarId()))); + links.computeIfAbsent(Calendar.class, k -> new LinkedHashSet<>()).add(calendar.getId()); + addObject(deviceId, calendar); + } + } + } + } + + var users = storage.getObjects(User.class, new Request( + new Columns.All(), new Condition.Permission(User.class, Device.class, deviceId))); + links.put(User.class, users.stream().map(BaseModel::getId).collect(Collectors.toSet())); + for (var user : users) { + addObject(deviceId, user); + var notifications = storage.getObjects(Notification.class, new Request( + new Columns.All(), + new Condition.Permission(User.class, user.getId(), Notification.class))).stream() + .filter(Notification::getAlways) + .collect(Collectors.toList()); + for (var notification : notifications) { + links.computeIfAbsent(Notification.class, k -> new LinkedHashSet<>()).add(notification.getId()); + addObject(deviceId, notification); + if (notification.getCalendarId() > 0) { + var calendar = storage.getObject(Calendar.class, new Request( + new Columns.All(), new Condition.Equals("id", notification.getCalendarId()))); + links.computeIfAbsent(Calendar.class, k -> new LinkedHashSet<>()).add(calendar.getId()); + addObject(deviceId, calendar); + } + } + } + + deviceLinks.put(deviceId, links); + + if (device.getPositionId() > 0) { + devicePositions.put(deviceId, storage.getObject(Position.class, new Request( + new Columns.All(), new Condition.Equals("id", device.getPositionId())))); + } + } + } + + private void unsafeRemoveDevice(long deviceId) { + deviceCache.remove(new CacheKey(Device.class, deviceId)); + deviceLinks.remove(deviceId).forEach((clazz, ids) -> ids.forEach(id -> { + var key = new CacheKey(clazz, id); + deviceCache.computeIfPresent(key, (k, value) -> { + value.release(deviceId); + return value.getReferences().size() > 0 ? value : null; + }); + })); + devicePositions.remove(deviceId); + } + + private void invalidate(CacheKey... keys) throws StorageException { + try { + lock.writeLock().lock(); + unsafeInvalidate(keys); + } finally { + lock.writeLock().unlock(); + } + } + + private void unsafeInvalidate(CacheKey[] keys) throws StorageException { + boolean invalidateServer = false; + boolean invalidateUsers = false; + Set<Long> linkedDevices = new HashSet<>(); + for (var key : keys) { + if (key.classIs(Server.class)) { + invalidateServer = true; + } else { + if (key.classIs(User.class) || key.classIs(Notification.class)) { + invalidateUsers = true; + } + deviceCache.computeIfPresent(key, (k, value) -> { + linkedDevices.addAll(value.getReferences()); + return value; + }); + } + } + for (long deviceId : linkedDevices) { + unsafeRemoveDevice(deviceId); + unsafeAddDevice(deviceId); + } + if (invalidateServer) { + invalidateServer(); + } + if (invalidateUsers) { + invalidateUsers(); + } + } + +} diff --git a/src/main/java/org/traccar/session/cache/CacheValue.java b/src/main/java/org/traccar/session/cache/CacheValue.java new file mode 100644 index 000000000..1f0383ce5 --- /dev/null +++ b/src/main/java/org/traccar/session/cache/CacheValue.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.cache; + +import org.traccar.model.BaseModel; + +import java.util.HashSet; +import java.util.Set; + +class CacheValue { + + private BaseModel value; + private final Set<Long> references = new HashSet<>(); + + CacheValue(BaseModel value) { + this.value = value; + } + + public void retain(long deviceId) { + references.add(deviceId); + } + + public void release(long deviceId) { + references.remove(deviceId); + } + + @SuppressWarnings("unchecked") + public <T extends BaseModel> T getValue() { + return (T) value; + } + + public void setValue(BaseModel value) { + this.value = value; + } + + public Set<Long> getReferences() { + return references; + } + +} diff --git a/src/main/java/org/traccar/session/state/MotionProcessor.java b/src/main/java/org/traccar/session/state/MotionProcessor.java new file mode 100644 index 000000000..a1737a739 --- /dev/null +++ b/src/main/java/org/traccar/session/state/MotionProcessor.java @@ -0,0 +1,81 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.state; + +import org.traccar.model.Event; +import org.traccar.model.Position; +import org.traccar.reports.common.TripsConfig; + +public final class MotionProcessor { + + private MotionProcessor() { + } + + public static void updateState( + MotionState state, Position position, boolean newState, TripsConfig tripsConfig) { + + state.setEvent(null); + + boolean oldState = state.getMotionState(); + if (oldState == newState) { + if (state.getMotionTime() != null) { + long oldTime = state.getMotionTime().getTime(); + long newTime = position.getFixTime().getTime(); + + double distance = position.getDouble(Position.KEY_TOTAL_DISTANCE) - state.getMotionDistance(); + Boolean ignition = null; + if (tripsConfig.getUseIgnition() && position.hasAttribute(Position.KEY_IGNITION)) { + ignition = position.getBoolean(Position.KEY_IGNITION); + } + + boolean generateEvent = false; + if (newState) { + if (newTime - oldTime >= tripsConfig.getMinimalTripDuration() + || distance >= tripsConfig.getMinimalTripDistance()) { + generateEvent = true; + } + } else { + if (newTime - oldTime >= tripsConfig.getMinimalParkingDuration() + || ignition != null && !ignition) { + generateEvent = true; + } + } + + if (generateEvent) { + + String eventType = newState ? Event.TYPE_DEVICE_MOVING : Event.TYPE_DEVICE_STOPPED; + Event event = new Event(eventType, position); + + state.setMotionStreak(newState); + state.setMotionTime(null); + state.setMotionDistance(0); + state.setEvent(event); + + } + } + } else { + state.setMotionState(newState); + if (state.getMotionStreak() == newState) { + state.setMotionTime(null); + state.setMotionDistance(0); + } else { + state.setMotionTime(position.getFixTime()); + state.setMotionDistance(position.getDouble(Position.KEY_TOTAL_DISTANCE)); + } + } + } + +} diff --git a/src/main/java/org/traccar/session/state/MotionState.java b/src/main/java/org/traccar/session/state/MotionState.java new file mode 100644 index 000000000..6c917ad16 --- /dev/null +++ b/src/main/java/org/traccar/session/state/MotionState.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.state; + +import org.traccar.model.Device; +import org.traccar.model.Event; + +import java.util.Date; + +public class MotionState { + + public static MotionState fromDevice(Device device) { + MotionState state = new MotionState(); + state.motionStreak = device.getMotionStreak(); + state.motionState = device.getMotionState(); + state.motionTime = device.getMotionTime(); + state.motionDistance = device.getMotionDistance(); + return state; + } + + public void toDevice(Device device) { + device.setMotionStreak(motionStreak); + device.setMotionState(motionState); + device.setMotionTime(motionTime); + device.setMotionDistance(motionDistance); + } + + private boolean changed; + + public boolean isChanged() { + return changed; + } + + private boolean motionStreak; + + public boolean getMotionStreak() { + return motionStreak; + } + + public void setMotionStreak(boolean motionStreak) { + this.motionStreak = motionStreak; + changed = true; + } + + private boolean motionState; + + public boolean getMotionState() { + return motionState; + } + + public void setMotionState(boolean motionState) { + this.motionState = motionState; + changed = true; + } + + private Date motionTime; + + public Date getMotionTime() { + return motionTime; + } + + public void setMotionTime(Date motionTime) { + this.motionTime = motionTime; + changed = true; + } + + private double motionDistance; + + public double getMotionDistance() { + return motionDistance; + } + + public void setMotionDistance(double motionDistance) { + this.motionDistance = motionDistance; + changed = true; + } + + private Event event; + + public Event getEvent() { + return event; + } + + public void setEvent(Event event) { + this.event = event; + } + +} diff --git a/src/main/java/org/traccar/session/state/OverspeedProcessor.java b/src/main/java/org/traccar/session/state/OverspeedProcessor.java new file mode 100644 index 000000000..221b51ff5 --- /dev/null +++ b/src/main/java/org/traccar/session/state/OverspeedProcessor.java @@ -0,0 +1,71 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.state; + +import org.traccar.model.Event; +import org.traccar.model.Position; + +public final class OverspeedProcessor { + + public static final String ATTRIBUTE_SPEED = "speed"; + + private OverspeedProcessor() { + } + + public static void updateState( + OverspeedState state, Position position, + double speedLimit, double multiplier, long minimalDuration, long geofenceId) { + + state.setEvent(null); + + boolean oldState = state.getOverspeedState(); + if (oldState) { + boolean newState = position.getSpeed() > speedLimit * multiplier; + if (newState) { + checkEvent(state, position, speedLimit, minimalDuration); + } else { + state.setOverspeedState(false); + state.setOverspeedTime(null); + state.setOverspeedGeofenceId(0); + } + } else if (position != null && position.getSpeed() > speedLimit * multiplier) { + state.setOverspeedState(true); + state.setOverspeedTime(position.getFixTime()); + state.setOverspeedGeofenceId(geofenceId); + + checkEvent(state, position, speedLimit, minimalDuration); + } + } + + private static void checkEvent(OverspeedState state, Position position, double speedLimit, long minimalDuration) { + if (state.getOverspeedTime() != null) { + long oldTime = state.getOverspeedTime().getTime(); + long newTime = position.getFixTime().getTime(); + if (newTime - oldTime >= minimalDuration) { + + Event event = new Event(Event.TYPE_DEVICE_OVERSPEED, position); + event.set(ATTRIBUTE_SPEED, position.getSpeed()); + event.set(Position.KEY_SPEED_LIMIT, speedLimit); + event.setGeofenceId(state.getOverspeedGeofenceId()); + + state.setOverspeedTime(null); + state.setOverspeedGeofenceId(0); + state.setEvent(event); + + } + } + } +} diff --git a/src/main/java/org/traccar/session/state/OverspeedState.java b/src/main/java/org/traccar/session/state/OverspeedState.java new file mode 100644 index 000000000..340ede6d7 --- /dev/null +++ b/src/main/java/org/traccar/session/state/OverspeedState.java @@ -0,0 +1,88 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.session.state; + +import org.traccar.model.Device; +import org.traccar.model.Event; + +import java.util.Date; + +public class OverspeedState { + + public static OverspeedState fromDevice(Device device) { + OverspeedState state = new OverspeedState(); + state.overspeedState = device.getOverspeedState(); + state.overspeedTime = device.getOverspeedTime(); + state.overspeedGeofenceId = device.getOverspeedGeofenceId(); + return state; + } + + public void toDevice(Device device) { + device.setOverspeedState(overspeedState); + device.setOverspeedTime(overspeedTime); + device.setOverspeedGeofenceId(overspeedGeofenceId); + } + + private boolean changed; + + public boolean isChanged() { + return changed; + } + + private boolean overspeedState; + + public boolean getOverspeedState() { + return overspeedState; + } + + public void setOverspeedState(boolean overspeedState) { + this.overspeedState = overspeedState; + changed = true; + } + + private Date overspeedTime; + + public Date getOverspeedTime() { + return overspeedTime; + } + + public void setOverspeedTime(Date overspeedTime) { + this.overspeedTime = overspeedTime; + changed = true; + } + + private long overspeedGeofenceId; + + public long getOverspeedGeofenceId() { + return overspeedGeofenceId; + } + + public void setOverspeedGeofenceId(long overspeedGeofenceId) { + this.overspeedGeofenceId = overspeedGeofenceId; + changed = true; + } + + private Event event; + + public Event getEvent() { + return event; + } + + public void setEvent(Event event) { + this.event = event; + } + +} |