path: root/src/main/java/org/traccar/session
diff options
Diffstat (limited to 'src/main/java/org/traccar/session')
10 files changed, 1403 insertions, 0 deletions
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java
new file mode 100644
index 000000000..28214840d
--- /dev/null
+++ b/src/main/java/org/traccar/session/ConnectionManager.java
@@ -0,0 +1,376 @@
+ * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session;
+import io.netty.channel.Channel;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.traccar.Protocol;
+import org.traccar.broadcast.BroadcastInterface;
+import org.traccar.broadcast.BroadcastService;
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+import org.traccar.database.DeviceLookupService;
+import org.traccar.database.NotificationManager;
+import org.traccar.model.BaseModel;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
+import org.traccar.model.Position;
+import org.traccar.model.User;
+import org.traccar.session.cache.CacheManager;
+import org.traccar.storage.Storage;
+import org.traccar.storage.StorageException;
+import org.traccar.storage.query.Columns;
+import org.traccar.storage.query.Condition;
+import org.traccar.storage.query.Request;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+public class ConnectionManager implements BroadcastInterface {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
+ private final long deviceTimeout;
+ private final Map<Long, DeviceSession> sessionsByDeviceId = new ConcurrentHashMap<>();
+ private final Map<Endpoint, Map<String, DeviceSession>> sessionsByEndpoint = new ConcurrentHashMap<>();
+ private final Config config;
+ private final CacheManager cacheManager;
+ private final Storage storage;
+ private final NotificationManager notificationManager;
+ private final Timer timer;
+ private final BroadcastService broadcastService;
+ private final DeviceLookupService deviceLookupService;
+ private final Map<Long, Set<UpdateListener>> listeners = new HashMap<>();
+ private final Map<Long, Set<Long>> userDevices = new HashMap<>();
+ private final Map<Long, Set<Long>> deviceUsers = new HashMap<>();
+ private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>();
+ @Inject
+ public ConnectionManager(
+ Config config, CacheManager cacheManager, Storage storage,
+ NotificationManager notificationManager, Timer timer, BroadcastService broadcastService,
+ DeviceLookupService deviceLookupService) {
+ this.config = config;
+ this.cacheManager = cacheManager;
+ this.storage = storage;
+ this.notificationManager = notificationManager;
+ this.timer = timer;
+ this.broadcastService = broadcastService;
+ this.deviceLookupService = deviceLookupService;
+ deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT);
+ broadcastService.registerListener(this);
+ }
+ public DeviceSession getDeviceSession(long deviceId) {
+ return sessionsByDeviceId.get(deviceId);
+ }
+ public DeviceSession getDeviceSession(
+ Protocol protocol, Channel channel, SocketAddress remoteAddress,
+ String... uniqueIds) throws StorageException {
+ Endpoint endpoint = new Endpoint(channel, remoteAddress);
+ Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.getOrDefault(
+ endpoint, new ConcurrentHashMap<>());
+ uniqueIds = Arrays.stream(uniqueIds).filter(Objects::nonNull).toArray(String[]::new);
+ if (uniqueIds.length > 0) {
+ for (String uniqueId : uniqueIds) {
+ DeviceSession deviceSession = endpointSessions.get(uniqueId);
+ if (deviceSession != null) {
+ return deviceSession;
+ }
+ }
+ } else {
+ return endpointSessions.values().stream().findAny().orElse(null);
+ }
+ Device device = deviceLookupService.lookup(uniqueIds);
+ if (device == null && config.getBoolean(Keys.DATABASE_REGISTER_UNKNOWN)) {
+ if (uniqueIds[0].matches(config.getString(Keys.DATABASE_REGISTER_UNKNOWN_REGEX))) {
+ device = addUnknownDevice(uniqueIds[0]);
+ }
+ }
+ if (device != null) {
+ device.checkDisabled();
+ DeviceSession oldSession = sessionsByDeviceId.remove(device.getId());
+ if (oldSession != null) {
+ Endpoint oldEndpoint = new Endpoint(oldSession.getChannel(), oldSession.getRemoteAddress());
+ Map<String, DeviceSession> oldEndpointSessions = sessionsByEndpoint.get(oldEndpoint);
+ if (oldEndpointSessions != null && oldEndpointSessions.size() > 1) {
+ oldEndpointSessions.remove(device.getUniqueId());
+ } else {
+ sessionsByEndpoint.remove(oldEndpoint);
+ }
+ }
+ DeviceSession deviceSession = new DeviceSession(
+ device.getId(), device.getUniqueId(), protocol, channel, remoteAddress);
+ endpointSessions.put(device.getUniqueId(), deviceSession);
+ sessionsByEndpoint.put(endpoint, endpointSessions);
+ sessionsByDeviceId.put(device.getId(), deviceSession);
+ if (oldSession == null) {
+ cacheManager.addDevice(device.getId());
+ }
+ return deviceSession;
+ } else {
+ LOGGER.warn("Unknown device - " + String.join(" ", uniqueIds)
+ + " (" + ((InetSocketAddress) remoteAddress).getHostString() + ")");
+ return null;
+ }
+ }
+ private Device addUnknownDevice(String uniqueId) {
+ Device device = new Device();
+ device.setName(uniqueId);
+ device.setUniqueId(uniqueId);
+ device.setCategory(config.getString(Keys.DATABASE_REGISTER_UNKNOWN_DEFAULT_CATEGORY));
+ long defaultGroupId = config.getLong(Keys.DATABASE_REGISTER_UNKNOWN_DEFAULT_GROUP_ID);
+ if (defaultGroupId != 0) {
+ device.setGroupId(defaultGroupId);
+ }
+ try {
+ device.setId(storage.addObject(device, new Request(new Columns.Exclude("id"))));
+ LOGGER.info("Automatically registered " + uniqueId);
+ return device;
+ } catch (StorageException e) {
+ LOGGER.warn("Automatic registration failed", e);
+ return null;
+ }
+ }
+ public void deviceDisconnected(Channel channel, boolean supportsOffline) {
+ Endpoint endpoint = new Endpoint(channel, channel.remoteAddress());
+ Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.remove(endpoint);
+ if (endpointSessions != null) {
+ for (DeviceSession deviceSession : endpointSessions.values()) {
+ if (supportsOffline) {
+ updateDevice(deviceSession.getDeviceId(), Device.STATUS_OFFLINE, null);
+ }
+ sessionsByDeviceId.remove(deviceSession.getDeviceId());
+ cacheManager.removeDevice(deviceSession.getDeviceId());
+ }
+ }
+ }
+ public void deviceUnknown(long deviceId) {
+ updateDevice(deviceId, Device.STATUS_UNKNOWN, null);
+ removeDeviceSession(deviceId);
+ }
+ private void removeDeviceSession(long deviceId) {
+ DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId);
+ if (deviceSession != null) {
+ cacheManager.removeDevice(deviceId);
+ Endpoint endpoint = new Endpoint(deviceSession.getChannel(), deviceSession.getRemoteAddress());
+ sessionsByEndpoint.computeIfPresent(endpoint, (e, sessions) -> {
+ sessions.remove(deviceSession.getUniqueId());
+ return sessions.isEmpty() ? null : sessions;
+ });
+ }
+ }
+ public void updateDevice(long deviceId, String status, Date time) {
+ Device device = cacheManager.getObject(Device.class, deviceId);
+ if (device == null) {
+ try {
+ device = storage.getObject(Device.class, new Request(
+ new Columns.All(), new Condition.Equals("id", deviceId)));
+ } catch (StorageException e) {
+ LOGGER.warn("Failed to get device", e);
+ }
+ if (device == null) {
+ return;
+ }
+ }
+ String oldStatus = device.getStatus();
+ device.setStatus(status);
+ if (!status.equals(oldStatus)) {
+ String eventType;
+ Map<Event, Position> events = new HashMap<>();
+ switch (status) {
+ case Device.STATUS_ONLINE:
+ eventType = Event.TYPE_DEVICE_ONLINE;
+ break;
+ case Device.STATUS_UNKNOWN:
+ eventType = Event.TYPE_DEVICE_UNKNOWN;
+ break;
+ default:
+ eventType = Event.TYPE_DEVICE_OFFLINE;
+ break;
+ }
+ events.put(new Event(eventType, deviceId), null);
+ notificationManager.updateEvents(events);
+ }
+ if (time != null) {
+ device.setLastUpdate(time);
+ }
+ Timeout timeout = timeouts.remove(deviceId);
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ if (status.equals(Device.STATUS_ONLINE)) {
+ timeouts.put(deviceId, timer.newTimeout(timeout1 -> {
+ if (!timeout1.isCancelled()) {
+ deviceUnknown(deviceId);
+ }
+ }, deviceTimeout, TimeUnit.SECONDS));
+ }
+ try {
+ storage.updateObject(device, new Request(
+ new Columns.Include("status", "lastUpdate"),
+ new Condition.Equals("id", deviceId)));
+ } catch (StorageException e) {
+ LOGGER.warn("Update device status error", e);
+ }
+ updateDevice(true, device);
+ }
+ public synchronized void sendKeepalive() {
+ for (Set<UpdateListener> userListeners : listeners.values()) {
+ for (UpdateListener listener : userListeners) {
+ listener.onKeepalive();
+ }
+ }
+ }
+ @Override
+ public synchronized void updateDevice(boolean local, Device device) {
+ if (local) {
+ broadcastService.updateDevice(true, device);
+ } else if (Device.STATUS_ONLINE.equals(device.getStatus())) {
+ timeouts.remove(device.getId());
+ removeDeviceSession(device.getId());
+ }
+ for (long userId : deviceUsers.getOrDefault(device.getId(), Collections.emptySet())) {
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
+ listener.onUpdateDevice(device);
+ }
+ }
+ }
+ }
+ @Override
+ public synchronized void updatePosition(boolean local, Position position) {
+ if (local) {
+ broadcastService.updatePosition(true, position);
+ }
+ for (long userId : deviceUsers.getOrDefault(position.getDeviceId(), Collections.emptySet())) {
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
+ listener.onUpdatePosition(position);
+ }
+ }
+ }
+ }
+ @Override
+ public synchronized void updateEvent(boolean local, long userId, Event event) {
+ if (local) {
+ broadcastService.updateEvent(true, userId, event);
+ }
+ if (listeners.containsKey(userId)) {
+ for (UpdateListener listener : listeners.get(userId)) {
+ listener.onUpdateEvent(event);
+ }
+ }
+ }
+ @Override
+ public synchronized void invalidatePermission(
+ boolean local,
+ Class<? extends BaseModel> clazz1, long id1,
+ Class<? extends BaseModel> clazz2, long id2) {
+ if (clazz1.equals(User.class) && clazz2.equals(Device.class)) {
+ if (listeners.containsKey(id1)) {
+ userDevices.get(id1).add(id2);
+ deviceUsers.put(id2, new HashSet<>(List.of(id1)));
+ }
+ }
+ }
+ public interface UpdateListener {
+ void onKeepalive();
+ void onUpdateDevice(Device device);
+ void onUpdatePosition(Position position);
+ void onUpdateEvent(Event event);
+ }
+ public synchronized void addListener(long userId, UpdateListener listener) throws StorageException {
+ var set = listeners.get(userId);
+ if (set == null) {
+ set = new HashSet<>();
+ listeners.put(userId, set);
+ var devices = storage.getObjects(Device.class, new Request(
+ new Columns.Include("id"), new Condition.Permission(User.class, userId, Device.class)));
+ userDevices.put(userId, devices.stream().map(BaseModel::getId).collect(Collectors.toSet()));
+ devices.forEach(device -> deviceUsers.computeIfAbsent(device.getId(), id -> new HashSet<>()).add(userId));
+ }
+ set.add(listener);
+ }
+ public synchronized void removeListener(long userId, UpdateListener listener) {
+ var set = listeners.get(userId);
+ set.remove(listener);
+ if (set.isEmpty()) {
+ listeners.remove(userId);
+ userDevices.remove(userId).forEach(deviceId -> deviceUsers.computeIfPresent(deviceId, (x, userIds) -> {
+ userIds.remove(userId);
+ return userIds.isEmpty() ? null : userIds;
+ }));
+ }
+ }
diff --git a/src/main/java/org/traccar/session/DeviceSession.java b/src/main/java/org/traccar/session/DeviceSession.java
new file mode 100644
index 000000000..009f90f5a
--- /dev/null
+++ b/src/main/java/org/traccar/session/DeviceSession.java
@@ -0,0 +1,90 @@
+ * Copyright 2016 - 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import org.traccar.BasePipelineFactory;
+import org.traccar.Protocol;
+import org.traccar.model.Command;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+public class DeviceSession {
+ private final long deviceId;
+ private final String uniqueId;
+ private final Protocol protocol;
+ private final Channel channel;
+ private final SocketAddress remoteAddress;
+ public DeviceSession(
+ long deviceId, String uniqueId, Protocol protocol, Channel channel, SocketAddress remoteAddress) {
+ this.deviceId = deviceId;
+ this.uniqueId = uniqueId;
+ this.protocol = protocol;
+ this.channel = channel;
+ this.remoteAddress = remoteAddress;
+ }
+ public long getDeviceId() {
+ return deviceId;
+ }
+ public String getUniqueId() {
+ return uniqueId;
+ }
+ public Channel getChannel() {
+ return channel;
+ }
+ public SocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+ public boolean supportsLiveCommands() {
+ return BasePipelineFactory.getHandler(channel.pipeline(), HttpRequestDecoder.class) == null;
+ }
+ public void sendCommand(Command command) {
+ protocol.sendDataCommand(channel, remoteAddress, command);
+ }
+ public static final String KEY_TIMEZONE = "timezone";
+ private final Map<String, Object> locals = new HashMap<>();
+ public boolean contains(String key) {
+ return locals.containsKey(key);
+ }
+ public void set(String key, Object value) {
+ if (value != null) {
+ locals.put(key, value);
+ } else {
+ locals.remove(key);
+ }
+ }
+ @SuppressWarnings("unchecked")
+ public <T> T get(String key) {
+ return (T) locals.get(key);
+ }
diff --git a/src/main/java/org/traccar/session/Endpoint.java b/src/main/java/org/traccar/session/Endpoint.java
new file mode 100644
index 000000000..76aac3444
--- /dev/null
+++ b/src/main/java/org/traccar/session/Endpoint.java
@@ -0,0 +1,58 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session;
+import io.netty.channel.Channel;
+import java.net.SocketAddress;
+import java.util.Objects;
+public class Endpoint {
+ private final Channel channel;
+ private final SocketAddress remoteAddress;
+ public Endpoint(Channel channel, SocketAddress remoteAddress) {
+ this.channel = channel;
+ this.remoteAddress = remoteAddress;
+ }
+ public Channel getChannel() {
+ return channel;
+ }
+ public SocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Endpoint endpoint = (Endpoint) o;
+ return channel.equals(endpoint.channel) && remoteAddress.equals(endpoint.remoteAddress);
+ }
+ @Override
+ public int hashCode() {
+ return Objects.hash(channel, remoteAddress);
+ }
diff --git a/src/main/java/org/traccar/session/cache/CacheKey.java b/src/main/java/org/traccar/session/cache/CacheKey.java
new file mode 100644
index 000000000..23145e34b
--- /dev/null
+++ b/src/main/java/org/traccar/session/cache/CacheKey.java
@@ -0,0 +1,57 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.cache;
+import org.traccar.model.BaseModel;
+import java.util.Objects;
+class CacheKey {
+ private final Class<? extends BaseModel> clazz;
+ private final long id;
+ CacheKey(BaseModel object) {
+ this(object.getClass(), object.getId());
+ }
+ CacheKey(Class<? extends BaseModel> clazz, long id) {
+ this.clazz = clazz;
+ this.id = id;
+ }
+ public boolean classIs(Class<? extends BaseModel> clazz) {
+ return clazz.equals(this.clazz);
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CacheKey cacheKey = (CacheKey) o;
+ return id == cacheKey.id && Objects.equals(clazz, cacheKey.clazz);
+ }
+ @Override
+ public int hashCode() {
+ return Objects.hash(clazz, id);
+ }
diff --git a/src/main/java/org/traccar/session/cache/CacheManager.java b/src/main/java/org/traccar/session/cache/CacheManager.java
new file mode 100644
index 000000000..58320cf29
--- /dev/null
+++ b/src/main/java/org/traccar/session/cache/CacheManager.java
@@ -0,0 +1,428 @@
+ * Copyright 2022 - 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.cache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.traccar.broadcast.BroadcastInterface;
+import org.traccar.broadcast.BroadcastService;
+import org.traccar.config.Config;
+import org.traccar.model.Attribute;
+import org.traccar.model.BaseModel;
+import org.traccar.model.Calendar;
+import org.traccar.model.Device;
+import org.traccar.model.Driver;
+import org.traccar.model.Geofence;
+import org.traccar.model.Group;
+import org.traccar.model.GroupedModel;
+import org.traccar.model.Maintenance;
+import org.traccar.model.Notification;
+import org.traccar.model.Position;
+import org.traccar.model.Schedulable;
+import org.traccar.model.Server;
+import org.traccar.model.User;
+import org.traccar.storage.Storage;
+import org.traccar.storage.StorageException;
+import org.traccar.storage.query.Columns;
+import org.traccar.storage.query.Condition;
+import org.traccar.storage.query.Request;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+public class CacheManager implements BroadcastInterface {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CacheManager.class);
+ private static final int GROUP_DEPTH_LIMIT = 3;
+ private static final Collection<Class<? extends BaseModel>> CLASSES = Arrays.asList(
+ Attribute.class, Driver.class, Geofence.class, Maintenance.class, Notification.class);
+ private final Config config;
+ private final Storage storage;
+ private final BroadcastService broadcastService;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Map<CacheKey, CacheValue> deviceCache = new HashMap<>();
+ private final Map<Long, Integer> deviceReferences = new HashMap<>();
+ private final Map<Long, Map<Class<? extends BaseModel>, Set<Long>>> deviceLinks = new HashMap<>();
+ private final Map<Long, Position> devicePositions = new HashMap<>();
+ private Server server;
+ private final Map<Long, List<User>> notificationUsers = new HashMap<>();
+ @Inject
+ public CacheManager(Config config, Storage storage, BroadcastService broadcastService) throws StorageException {
+ this.config = config;
+ this.storage = storage;
+ this.broadcastService = broadcastService;
+ invalidateServer();
+ invalidateUsers();
+ broadcastService.registerListener(this);
+ }
+ public Config getConfig() {
+ return config;
+ }
+ public <T extends BaseModel> T getObject(Class<T> clazz, long id) {
+ try {
+ lock.readLock().lock();
+ var cacheValue = deviceCache.get(new CacheKey(clazz, id));
+ return cacheValue != null ? cacheValue.getValue() : null;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ public <T extends BaseModel> List<T> getDeviceObjects(long deviceId, Class<T> clazz) {
+ try {
+ lock.readLock().lock();
+ var links = deviceLinks.get(deviceId);
+ if (links != null) {
+ return links.getOrDefault(clazz, new LinkedHashSet<>()).stream()
+ .map(id -> {
+ var cacheValue = deviceCache.get(new CacheKey(clazz, id));
+ return cacheValue != null ? cacheValue.<T>getValue() : null;
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ } else {
+ LOGGER.warn("Device {} cache missing", deviceId);
+ return Collections.emptyList();
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ public Position getPosition(long deviceId) {
+ try {
+ lock.readLock().lock();
+ return devicePositions.get(deviceId);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ public Server getServer() {
+ try {
+ lock.readLock().lock();
+ return server;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ public List<User> getNotificationUsers(long notificationId, long deviceId) {
+ try {
+ lock.readLock().lock();
+ var users = deviceLinks.get(deviceId).get(User.class).stream()
+ .collect(Collectors.toUnmodifiableSet());
+ return notificationUsers.getOrDefault(notificationId, new LinkedList<>()).stream()
+ .filter(user -> users.contains(user.getId()))
+ .collect(Collectors.toUnmodifiableList());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ public Driver findDriverByUniqueId(long deviceId, String driverUniqueId) {
+ return getDeviceObjects(deviceId, Driver.class).stream()
+ .filter(driver -> driver.getUniqueId().equals(driverUniqueId))
+ .findFirst()
+ .orElse(null);
+ }
+ public void addDevice(long deviceId) throws StorageException {
+ try {
+ lock.writeLock().lock();
+ Integer references = deviceReferences.get(deviceId);
+ if (references != null) {
+ references += 1;
+ } else {
+ unsafeAddDevice(deviceId);
+ references = 1;
+ }
+ deviceReferences.put(deviceId, references);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ public void removeDevice(long deviceId) {
+ try {
+ lock.writeLock().lock();
+ Integer references = deviceReferences.get(deviceId);
+ if (references != null) {
+ references -= 1;
+ if (references <= 0) {
+ unsafeRemoveDevice(deviceId);
+ deviceReferences.remove(deviceId);
+ } else {
+ deviceReferences.put(deviceId, references);
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ public void updatePosition(Position position) {
+ try {
+ lock.writeLock().lock();
+ if (deviceLinks.containsKey(position.getDeviceId())) {
+ devicePositions.put(position.getDeviceId(), position);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ @Override
+ public void invalidateObject(boolean local, Class<? extends BaseModel> clazz, long id) {
+ try {
+ var object = storage.getObject(clazz, new Request(
+ new Columns.All(), new Condition.Equals("id", id)));
+ if (object != null) {
+ updateOrInvalidate(local, object);
+ } else {
+ invalidate(clazz, id);
+ }
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ public <T extends BaseModel> void updateOrInvalidate(boolean local, T object) throws StorageException {
+ if (local) {
+ broadcastService.invalidateObject(true, object.getClass(), object.getId());
+ }
+ if (object instanceof Server) {
+ invalidateServer();
+ return;
+ }
+ if (object instanceof User) {
+ invalidateUsers();
+ return;
+ }
+ boolean invalidate = false;
+ var before = getObject(object.getClass(), object.getId());
+ if (before == null) {
+ return;
+ } else if (object instanceof GroupedModel) {
+ if (((GroupedModel) before).getGroupId() != ((GroupedModel) object).getGroupId()) {
+ invalidate = true;
+ }
+ } else if (object instanceof Schedulable) {
+ if (((Schedulable) before).getCalendarId() != ((Schedulable) object).getCalendarId()) {
+ invalidate = true;
+ }
+ }
+ if (invalidate) {
+ invalidate(object.getClass(), object.getId());
+ } else {
+ try {
+ lock.writeLock().lock();
+ deviceCache.get(new CacheKey(object.getClass(), object.getId())).setValue(object);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
+ public <T extends BaseModel> void invalidate(Class<T> clazz, long id) throws StorageException {
+ invalidate(new CacheKey(clazz, id));
+ }
+ @Override
+ public void invalidatePermission(
+ boolean local,
+ Class<? extends BaseModel> clazz1, long id1,
+ Class<? extends BaseModel> clazz2, long id2) {
+ if (local) {
+ broadcastService.invalidatePermission(true, clazz1, id1, clazz2, id2);
+ }
+ try {
+ invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2));
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ private void invalidateServer() throws StorageException {
+ server = storage.getObject(Server.class, new Request(new Columns.All()));
+ }
+ private void invalidateUsers() throws StorageException {
+ notificationUsers.clear();
+ Map<Long, User> users = new HashMap<>();
+ storage.getObjects(User.class, new Request(new Columns.All()))
+ .forEach(user -> users.put(user.getId(), user));
+ storage.getPermissions(User.class, Notification.class).forEach(permission -> {
+ long notificationId = permission.getPropertyId();
+ var user = users.get(permission.getOwnerId());
+ notificationUsers.computeIfAbsent(notificationId, k -> new LinkedList<>()).add(user);
+ });
+ }
+ private void addObject(long deviceId, BaseModel object) {
+ deviceCache.computeIfAbsent(new CacheKey(object), k -> new CacheValue(object)).retain(deviceId);
+ }
+ private void unsafeAddDevice(long deviceId) throws StorageException {
+ Map<Class<? extends BaseModel>, Set<Long>> links = new HashMap<>();
+ Device device = storage.getObject(Device.class, new Request(
+ new Columns.All(), new Condition.Equals("id", deviceId)));
+ if (device != null) {
+ addObject(deviceId, device);
+ if (device.getCalendarId() > 0) {
+ var calendar = storage.getObject(Calendar.class, new Request(
+ new Columns.All(), new Condition.Equals("id", device.getCalendarId())));
+ links.computeIfAbsent(Calendar.class, k -> new LinkedHashSet<>()).add(calendar.getId());
+ addObject(deviceId, calendar);
+ }
+ int groupDepth = 0;
+ long groupId = device.getGroupId();
+ while (groupDepth < GROUP_DEPTH_LIMIT && groupId > 0) {
+ Group group = storage.getObject(Group.class, new Request(
+ new Columns.All(), new Condition.Equals("id", groupId)));
+ links.computeIfAbsent(Group.class, k -> new LinkedHashSet<>()).add(group.getId());
+ addObject(deviceId, group);
+ groupId = group.getGroupId();
+ groupDepth += 1;
+ }
+ for (Class<? extends BaseModel> clazz : CLASSES) {
+ var objects = storage.getObjects(clazz, new Request(
+ new Columns.All(), new Condition.Permission(Device.class, deviceId, clazz)));
+ links.put(clazz, objects.stream().map(BaseModel::getId).collect(Collectors.toSet()));
+ for (var object : objects) {
+ addObject(deviceId, object);
+ if (object instanceof Schedulable) {
+ var scheduled = (Schedulable) object;
+ if (scheduled.getCalendarId() > 0) {
+ var calendar = storage.getObject(Calendar.class, new Request(
+ new Columns.All(), new Condition.Equals("id", scheduled.getCalendarId())));
+ links.computeIfAbsent(Calendar.class, k -> new LinkedHashSet<>()).add(calendar.getId());
+ addObject(deviceId, calendar);
+ }
+ }
+ }
+ }
+ var users = storage.getObjects(User.class, new Request(
+ new Columns.All(), new Condition.Permission(User.class, Device.class, deviceId)));
+ links.put(User.class, users.stream().map(BaseModel::getId).collect(Collectors.toSet()));
+ for (var user : users) {
+ addObject(deviceId, user);
+ var notifications = storage.getObjects(Notification.class, new Request(
+ new Columns.All(),
+ new Condition.Permission(User.class, user.getId(), Notification.class))).stream()
+ .filter(Notification::getAlways)
+ .collect(Collectors.toList());
+ for (var notification : notifications) {
+ links.computeIfAbsent(Notification.class, k -> new LinkedHashSet<>()).add(notification.getId());
+ addObject(deviceId, notification);
+ if (notification.getCalendarId() > 0) {
+ var calendar = storage.getObject(Calendar.class, new Request(
+ new Columns.All(), new Condition.Equals("id", notification.getCalendarId())));
+ links.computeIfAbsent(Calendar.class, k -> new LinkedHashSet<>()).add(calendar.getId());
+ addObject(deviceId, calendar);
+ }
+ }
+ }
+ deviceLinks.put(deviceId, links);
+ if (device.getPositionId() > 0) {
+ devicePositions.put(deviceId, storage.getObject(Position.class, new Request(
+ new Columns.All(), new Condition.Equals("id", device.getPositionId()))));
+ }
+ }
+ }
+ private void unsafeRemoveDevice(long deviceId) {
+ deviceCache.remove(new CacheKey(Device.class, deviceId));
+ deviceLinks.remove(deviceId).forEach((clazz, ids) -> ids.forEach(id -> {
+ var key = new CacheKey(clazz, id);
+ deviceCache.computeIfPresent(key, (k, value) -> {
+ value.release(deviceId);
+ return value.getReferences().size() > 0 ? value : null;
+ });
+ }));
+ devicePositions.remove(deviceId);
+ }
+ private void invalidate(CacheKey... keys) throws StorageException {
+ try {
+ lock.writeLock().lock();
+ unsafeInvalidate(keys);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ private void unsafeInvalidate(CacheKey[] keys) throws StorageException {
+ boolean invalidateServer = false;
+ boolean invalidateUsers = false;
+ Set<Long> linkedDevices = new HashSet<>();
+ for (var key : keys) {
+ if (key.classIs(Server.class)) {
+ invalidateServer = true;
+ } else {
+ if (key.classIs(User.class) || key.classIs(Notification.class)) {
+ invalidateUsers = true;
+ }
+ deviceCache.computeIfPresent(key, (k, value) -> {
+ linkedDevices.addAll(value.getReferences());
+ return value;
+ });
+ }
+ }
+ for (long deviceId : linkedDevices) {
+ unsafeRemoveDevice(deviceId);
+ unsafeAddDevice(deviceId);
+ }
+ if (invalidateServer) {
+ invalidateServer();
+ }
+ if (invalidateUsers) {
+ invalidateUsers();
+ }
+ }
diff --git a/src/main/java/org/traccar/session/cache/CacheValue.java b/src/main/java/org/traccar/session/cache/CacheValue.java
new file mode 100644
index 000000000..1f0383ce5
--- /dev/null
+++ b/src/main/java/org/traccar/session/cache/CacheValue.java
@@ -0,0 +1,53 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.cache;
+import org.traccar.model.BaseModel;
+import java.util.HashSet;
+import java.util.Set;
+class CacheValue {
+ private BaseModel value;
+ private final Set<Long> references = new HashSet<>();
+ CacheValue(BaseModel value) {
+ this.value = value;
+ }
+ public void retain(long deviceId) {
+ references.add(deviceId);
+ }
+ public void release(long deviceId) {
+ references.remove(deviceId);
+ }
+ @SuppressWarnings("unchecked")
+ public <T extends BaseModel> T getValue() {
+ return (T) value;
+ }
+ public void setValue(BaseModel value) {
+ this.value = value;
+ }
+ public Set<Long> getReferences() {
+ return references;
+ }
diff --git a/src/main/java/org/traccar/session/state/MotionProcessor.java b/src/main/java/org/traccar/session/state/MotionProcessor.java
new file mode 100644
index 000000000..a1737a739
--- /dev/null
+++ b/src/main/java/org/traccar/session/state/MotionProcessor.java
@@ -0,0 +1,81 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.state;
+import org.traccar.model.Event;
+import org.traccar.model.Position;
+import org.traccar.reports.common.TripsConfig;
+public final class MotionProcessor {
+ private MotionProcessor() {
+ }
+ public static void updateState(
+ MotionState state, Position position, boolean newState, TripsConfig tripsConfig) {
+ state.setEvent(null);
+ boolean oldState = state.getMotionState();
+ if (oldState == newState) {
+ if (state.getMotionTime() != null) {
+ long oldTime = state.getMotionTime().getTime();
+ long newTime = position.getFixTime().getTime();
+ double distance = position.getDouble(Position.KEY_TOTAL_DISTANCE) - state.getMotionDistance();
+ Boolean ignition = null;
+ if (tripsConfig.getUseIgnition() && position.hasAttribute(Position.KEY_IGNITION)) {
+ ignition = position.getBoolean(Position.KEY_IGNITION);
+ }
+ boolean generateEvent = false;
+ if (newState) {
+ if (newTime - oldTime >= tripsConfig.getMinimalTripDuration()
+ || distance >= tripsConfig.getMinimalTripDistance()) {
+ generateEvent = true;
+ }
+ } else {
+ if (newTime - oldTime >= tripsConfig.getMinimalParkingDuration()
+ || ignition != null && !ignition) {
+ generateEvent = true;
+ }
+ }
+ if (generateEvent) {
+ String eventType = newState ? Event.TYPE_DEVICE_MOVING : Event.TYPE_DEVICE_STOPPED;
+ Event event = new Event(eventType, position);
+ state.setMotionStreak(newState);
+ state.setMotionTime(null);
+ state.setMotionDistance(0);
+ state.setEvent(event);
+ }
+ }
+ } else {
+ state.setMotionState(newState);
+ if (state.getMotionStreak() == newState) {
+ state.setMotionTime(null);
+ state.setMotionDistance(0);
+ } else {
+ state.setMotionTime(position.getFixTime());
+ state.setMotionDistance(position.getDouble(Position.KEY_TOTAL_DISTANCE));
+ }
+ }
+ }
diff --git a/src/main/java/org/traccar/session/state/MotionState.java b/src/main/java/org/traccar/session/state/MotionState.java
new file mode 100644
index 000000000..6c917ad16
--- /dev/null
+++ b/src/main/java/org/traccar/session/state/MotionState.java
@@ -0,0 +1,101 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.state;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
+import java.util.Date;
+public class MotionState {
+ public static MotionState fromDevice(Device device) {
+ MotionState state = new MotionState();
+ state.motionStreak = device.getMotionStreak();
+ state.motionState = device.getMotionState();
+ state.motionTime = device.getMotionTime();
+ state.motionDistance = device.getMotionDistance();
+ return state;
+ }
+ public void toDevice(Device device) {
+ device.setMotionStreak(motionStreak);
+ device.setMotionState(motionState);
+ device.setMotionTime(motionTime);
+ device.setMotionDistance(motionDistance);
+ }
+ private boolean changed;
+ public boolean isChanged() {
+ return changed;
+ }
+ private boolean motionStreak;
+ public boolean getMotionStreak() {
+ return motionStreak;
+ }
+ public void setMotionStreak(boolean motionStreak) {
+ this.motionStreak = motionStreak;
+ changed = true;
+ }
+ private boolean motionState;
+ public boolean getMotionState() {
+ return motionState;
+ }
+ public void setMotionState(boolean motionState) {
+ this.motionState = motionState;
+ changed = true;
+ }
+ private Date motionTime;
+ public Date getMotionTime() {
+ return motionTime;
+ }
+ public void setMotionTime(Date motionTime) {
+ this.motionTime = motionTime;
+ changed = true;
+ }
+ private double motionDistance;
+ public double getMotionDistance() {
+ return motionDistance;
+ }
+ public void setMotionDistance(double motionDistance) {
+ this.motionDistance = motionDistance;
+ changed = true;
+ }
+ private Event event;
+ public Event getEvent() {
+ return event;
+ }
+ public void setEvent(Event event) {
+ this.event = event;
+ }
diff --git a/src/main/java/org/traccar/session/state/OverspeedProcessor.java b/src/main/java/org/traccar/session/state/OverspeedProcessor.java
new file mode 100644
index 000000000..221b51ff5
--- /dev/null
+++ b/src/main/java/org/traccar/session/state/OverspeedProcessor.java
@@ -0,0 +1,71 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.state;
+import org.traccar.model.Event;
+import org.traccar.model.Position;
+public final class OverspeedProcessor {
+ public static final String ATTRIBUTE_SPEED = "speed";
+ private OverspeedProcessor() {
+ }
+ public static void updateState(
+ OverspeedState state, Position position,
+ double speedLimit, double multiplier, long minimalDuration, long geofenceId) {
+ state.setEvent(null);
+ boolean oldState = state.getOverspeedState();
+ if (oldState) {
+ boolean newState = position.getSpeed() > speedLimit * multiplier;
+ if (newState) {
+ checkEvent(state, position, speedLimit, minimalDuration);
+ } else {
+ state.setOverspeedState(false);
+ state.setOverspeedTime(null);
+ state.setOverspeedGeofenceId(0);
+ }
+ } else if (position != null && position.getSpeed() > speedLimit * multiplier) {
+ state.setOverspeedState(true);
+ state.setOverspeedTime(position.getFixTime());
+ state.setOverspeedGeofenceId(geofenceId);
+ checkEvent(state, position, speedLimit, minimalDuration);
+ }
+ }
+ private static void checkEvent(OverspeedState state, Position position, double speedLimit, long minimalDuration) {
+ if (state.getOverspeedTime() != null) {
+ long oldTime = state.getOverspeedTime().getTime();
+ long newTime = position.getFixTime().getTime();
+ if (newTime - oldTime >= minimalDuration) {
+ Event event = new Event(Event.TYPE_DEVICE_OVERSPEED, position);
+ event.set(ATTRIBUTE_SPEED, position.getSpeed());
+ event.set(Position.KEY_SPEED_LIMIT, speedLimit);
+ event.setGeofenceId(state.getOverspeedGeofenceId());
+ state.setOverspeedTime(null);
+ state.setOverspeedGeofenceId(0);
+ state.setEvent(event);
+ }
+ }
+ }
diff --git a/src/main/java/org/traccar/session/state/OverspeedState.java b/src/main/java/org/traccar/session/state/OverspeedState.java
new file mode 100644
index 000000000..340ede6d7
--- /dev/null
+++ b/src/main/java/org/traccar/session/state/OverspeedState.java
@@ -0,0 +1,88 @@
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.session.state;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
+import java.util.Date;
+public class OverspeedState {
+ public static OverspeedState fromDevice(Device device) {
+ OverspeedState state = new OverspeedState();
+ state.overspeedState = device.getOverspeedState();
+ state.overspeedTime = device.getOverspeedTime();
+ state.overspeedGeofenceId = device.getOverspeedGeofenceId();
+ return state;
+ }
+ public void toDevice(Device device) {
+ device.setOverspeedState(overspeedState);
+ device.setOverspeedTime(overspeedTime);
+ device.setOverspeedGeofenceId(overspeedGeofenceId);
+ }
+ private boolean changed;
+ public boolean isChanged() {
+ return changed;
+ }
+ private boolean overspeedState;
+ public boolean getOverspeedState() {
+ return overspeedState;
+ }
+ public void setOverspeedState(boolean overspeedState) {
+ this.overspeedState = overspeedState;
+ changed = true;
+ }
+ private Date overspeedTime;
+ public Date getOverspeedTime() {
+ return overspeedTime;
+ }
+ public void setOverspeedTime(Date overspeedTime) {
+ this.overspeedTime = overspeedTime;
+ changed = true;
+ }
+ private long overspeedGeofenceId;
+ public long getOverspeedGeofenceId() {
+ return overspeedGeofenceId;
+ }
+ public void setOverspeedGeofenceId(long overspeedGeofenceId) {
+ this.overspeedGeofenceId = overspeedGeofenceId;
+ changed = true;
+ }
+ private Event event;
+ public Event getEvent() {
+ return event;
+ }
+ public void setEvent(Event event) {
+ this.event = event;
+ }