aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2022-06-25 13:33:35 -0700
committerAnton Tananaev <anton@traccar.org>2022-06-25 13:33:35 -0700
commitc53d98c668af9c79767e22964f05c7bf7dc866f2 (patch)
tree42d664009487232e3a21514bb5ab52c7571262a6
parent5a732a26c85785a9b801583f2fff0ce47314aa03 (diff)
downloadtrackermap-server-c53d98c668af9c79767e22964f05c7bf7dc866f2.tar.gz
trackermap-server-c53d98c668af9c79767e22964f05c7bf7dc866f2.tar.bz2
trackermap-server-c53d98c668af9c79767e22964f05c7bf7dc866f2.zip
Integrate broadcast service
-rw-r--r--src/main/java/org/traccar/MainEventHandler.java8
-rw-r--r--src/main/java/org/traccar/MainModule.java7
-rw-r--r--src/main/java/org/traccar/api/BaseObjectResource.java8
-rw-r--r--src/main/java/org/traccar/api/resource/DeviceResource.java8
-rw-r--r--src/main/java/org/traccar/api/resource/PermissionsResource.java22
-rw-r--r--src/main/java/org/traccar/broadcast/BroadcastInterface.java (renamed from src/main/java/org/traccar/broadcast/DeviceStatus.java)35
-rw-r--r--src/main/java/org/traccar/broadcast/BroadcastMessage.java43
-rw-r--r--src/main/java/org/traccar/broadcast/BroadcastService.java80
-rw-r--r--src/main/java/org/traccar/broadcast/MulticastBroadcastService.java170
-rw-r--r--src/main/java/org/traccar/broadcast/NullBroadcastService.java31
-rw-r--r--src/main/java/org/traccar/model/Permission.java8
-rw-r--r--src/main/java/org/traccar/notificators/NotificatorWeb.java8
-rw-r--r--src/main/java/org/traccar/session/ConnectionManager.java13
-rw-r--r--src/main/java/org/traccar/session/cache/CacheManager.java37
14 files changed, 345 insertions, 133 deletions
diff --git a/src/main/java/org/traccar/MainEventHandler.java b/src/main/java/org/traccar/MainEventHandler.java
index 06791c540..0a8c69b54 100644
--- a/src/main/java/org/traccar/MainEventHandler.java
+++ b/src/main/java/org/traccar/MainEventHandler.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.StatisticsManager;
@@ -56,15 +57,17 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter {
private final Storage storage;
private final ConnectionManager connectionManager;
private final StatisticsManager statisticsManager;
+ private final BroadcastService broadcastService;
@Inject
public MainEventHandler(
- Config config, CacheManager cacheManager, Storage storage,
- ConnectionManager connectionManager, StatisticsManager statisticsManager) {
+ Config config, CacheManager cacheManager, Storage storage, ConnectionManager connectionManager,
+ StatisticsManager statisticsManager, BroadcastService broadcastService) {
this.cacheManager = cacheManager;
this.storage = storage;
this.connectionManager = connectionManager;
this.statisticsManager = statisticsManager;
+ this.broadcastService = broadcastService;
String connectionlessProtocolList = config.getString(Keys.STATUS_IGNORE_OFFLINE);
if (connectionlessProtocolList != null) {
connectionlessProtocols.addAll(Arrays.asList(connectionlessProtocolList.split("[, ]")));
@@ -90,6 +93,7 @@ public class MainEventHandler extends ChannelInboundHandlerAdapter {
cacheManager.updatePosition(position);
connectionManager.updatePosition(position);
+ broadcastService.updatePosition(position);
}
} catch (StorageException error) {
LOGGER.warn("Failed to update device", error);
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java
index 5a5675859..f60d41d62 100644
--- a/src/main/java/org/traccar/MainModule.java
+++ b/src/main/java/org/traccar/MainModule.java
@@ -29,6 +29,8 @@ import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.NullLogChute;
import org.eclipse.jetty.util.URIUtil;
import org.traccar.broadcast.BroadcastService;
+import org.traccar.broadcast.MulticastBroadcastService;
+import org.traccar.broadcast.NullBroadcastService;
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.LdapProvider;
@@ -278,13 +280,14 @@ public class MainModule extends AbstractModule {
return null;
}
+ @Singleton
@Provides
public static BroadcastService provideBroadcastService(
Config config, ObjectMapper objectMapper) throws IOException {
if (config.hasKey(Keys.BROADCAST_ADDRESS)) {
- return new BroadcastService(config, objectMapper);
+ return new MulticastBroadcastService(config, objectMapper);
}
- return null;
+ return new NullBroadcastService();
}
@Provides
diff --git a/src/main/java/org/traccar/api/BaseObjectResource.java b/src/main/java/org/traccar/api/BaseObjectResource.java
index 35ff04bf3..403021c6c 100644
--- a/src/main/java/org/traccar/api/BaseObjectResource.java
+++ b/src/main/java/org/traccar/api/BaseObjectResource.java
@@ -16,6 +16,7 @@
*/
package org.traccar.api;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.helper.LogAction;
import org.traccar.model.BaseModel;
import org.traccar.model.Group;
@@ -41,6 +42,9 @@ public abstract class BaseObjectResource<T extends BaseModel> extends BaseResour
@Inject
private CacheManager cacheManager;
+ @Inject
+ private BroadcastService broadcastService;
+
protected final Class<T> baseClass;
public BaseObjectResource(Class<T> baseClass) {
@@ -67,7 +71,8 @@ public abstract class BaseObjectResource<T extends BaseModel> extends BaseResour
entity.setId(storage.addObject(entity, new Request(new Columns.Exclude("id"))));
LogAction.create(getUserId(), entity);
storage.addPermission(new Permission(User.class, getUserId(), baseClass, entity.getId()));
- cacheManager.invalidate(User.class, getUserId(), baseClass, entity.getId());
+ cacheManager.invalidatePermission(User.class, getUserId(), baseClass, entity.getId());
+ broadcastService.invalidatePermission(User.class, getUserId(), baseClass, entity.getId());
LogAction.link(getUserId(), User.class, getUserId(), baseClass, entity.getId());
return Response.ok(entity).build();
@@ -94,6 +99,7 @@ public abstract class BaseObjectResource<T extends BaseModel> extends BaseResour
new Columns.Exclude("id"),
new Condition.Equals("id", "id")));
cacheManager.updateOrInvalidate(entity);
+ broadcastService.invalidateObject(entity.getClass(), entity.getId());
LogAction.edit(getUserId(), entity);
return Response.ok(entity).build();
diff --git a/src/main/java/org/traccar/api/resource/DeviceResource.java b/src/main/java/org/traccar/api/resource/DeviceResource.java
index ff682d1d1..2b673a108 100644
--- a/src/main/java/org/traccar/api/resource/DeviceResource.java
+++ b/src/main/java/org/traccar/api/resource/DeviceResource.java
@@ -16,6 +16,7 @@
package org.traccar.api.resource;
import org.traccar.api.BaseObjectResource;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.helper.LogAction;
import org.traccar.model.Device;
import org.traccar.model.DeviceAccumulators;
@@ -37,6 +38,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -52,6 +54,9 @@ public class DeviceResource extends BaseObjectResource<Device> {
@Inject
private ConnectionManager connectionManager;
+ @Inject
+ private BroadcastService broadcastService;
+
public DeviceResource() {
super(Device.class);
}
@@ -105,7 +110,7 @@ public class DeviceResource extends BaseObjectResource<Device> {
@Path("{id}/accumulators")
@PUT
- public Response updateAccumulators(DeviceAccumulators entity) throws StorageException {
+ public Response updateAccumulators(DeviceAccumulators entity) throws StorageException, IOException {
if (permissionsService.notAdmin(getUserId())) {
permissionsService.checkManager(getUserId());
permissionsService.checkPermission(Device.class, getUserId(), entity.getDeviceId());
@@ -133,6 +138,7 @@ public class DeviceResource extends BaseObjectResource<Device> {
cacheManager.addDevice(position.getDeviceId());
cacheManager.updatePosition(position);
connectionManager.updatePosition(position);
+ broadcastService.updatePosition(position);
} finally {
cacheManager.removeDevice(position.getDeviceId());
}
diff --git a/src/main/java/org/traccar/api/resource/PermissionsResource.java b/src/main/java/org/traccar/api/resource/PermissionsResource.java
index 7174a3eff..5ca865c31 100644
--- a/src/main/java/org/traccar/api/resource/PermissionsResource.java
+++ b/src/main/java/org/traccar/api/resource/PermissionsResource.java
@@ -17,6 +17,7 @@
package org.traccar.api.resource;
import org.traccar.api.BaseResource;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.helper.LogAction;
import org.traccar.model.Permission;
import org.traccar.model.UserRestrictions;
@@ -45,6 +46,9 @@ public class PermissionsResource extends BaseResource {
@Inject
private CacheManager cacheManager;
+ @Inject
+ private BroadcastService broadcastService;
+
private void checkPermission(Permission permission) throws StorageException {
if (permissionsService.notAdmin(getUserId())) {
permissionsService.checkPermission(permission.getOwnerClass(), getUserId(), permission.getOwnerId());
@@ -71,9 +75,14 @@ public class PermissionsResource extends BaseResource {
Permission permission = new Permission(entity);
checkPermission(permission);
storage.addPermission(permission);
- cacheManager.invalidate(permission.getOwnerClass(), permission.getOwnerId(),
+ cacheManager.invalidatePermission(
+ permission.getOwnerClass(), permission.getOwnerId(),
permission.getPropertyClass(), permission.getPropertyId());
- LogAction.link(getUserId(), permission.getOwnerClass(), permission.getOwnerId(),
+ broadcastService.invalidatePermission(
+ permission.getOwnerClass(), permission.getOwnerId(),
+ permission.getPropertyClass(), permission.getPropertyId());
+ LogAction.link(getUserId(),
+ permission.getOwnerClass(), permission.getOwnerId(),
permission.getPropertyClass(), permission.getPropertyId());
}
return Response.noContent().build();
@@ -93,9 +102,14 @@ public class PermissionsResource extends BaseResource {
Permission permission = new Permission(entity);
checkPermission(permission);
storage.removePermission(permission);
- cacheManager.invalidate(permission.getOwnerClass(), permission.getOwnerId(),
+ cacheManager.invalidatePermission(
+ permission.getOwnerClass(), permission.getOwnerId(),
+ permission.getPropertyClass(), permission.getPropertyId());
+ broadcastService.invalidatePermission(
+ permission.getOwnerClass(), permission.getOwnerId(),
permission.getPropertyClass(), permission.getPropertyId());
- LogAction.unlink(getUserId(), permission.getOwnerClass(), permission.getOwnerId(),
+ LogAction.unlink(getUserId(),
+ permission.getOwnerClass(), permission.getOwnerId(),
permission.getPropertyClass(), permission.getPropertyId());
}
return Response.noContent().build();
diff --git a/src/main/java/org/traccar/broadcast/DeviceStatus.java b/src/main/java/org/traccar/broadcast/BroadcastInterface.java
index 4f0143319..d5b49f213 100644
--- a/src/main/java/org/traccar/broadcast/DeviceStatus.java
+++ b/src/main/java/org/traccar/broadcast/BroadcastInterface.java
@@ -15,38 +15,27 @@
*/
package org.traccar.broadcast;
-import java.util.Date;
+import org.traccar.model.BaseModel;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
+import org.traccar.model.Position;
-public class DeviceStatus {
+public interface BroadcastInterface {
- private long deviceId;
-
- public long getDeviceId() {
- return deviceId;
- }
-
- public void setDeviceId(long deviceId) {
- this.deviceId = deviceId;
+ default void updateDevice(Device device) {
}
- private String status;
-
- public String getStatus() {
- return status;
+ default void updatePosition(Position position) {
}
- public void setStatus(String status) {
- this.status = status;
+ default void updateEvent(long userId, Event event) {
}
- private Date lastUpdate;
-
- public Date getLastUpdate() {
- return this.lastUpdate;
+ default void invalidateObject(Class<? extends BaseModel> clazz, long id) {
}
- public void setLastUpdate(Date lastUpdate) {
- this.lastUpdate = lastUpdate;
+ default void invalidatePermission(
+ Class<? extends BaseModel> clazz1, long id1,
+ Class<? extends BaseModel> clazz2, long id2) {
}
-
}
diff --git a/src/main/java/org/traccar/broadcast/BroadcastMessage.java b/src/main/java/org/traccar/broadcast/BroadcastMessage.java
index 6b103f373..3e22be7e0 100644
--- a/src/main/java/org/traccar/broadcast/BroadcastMessage.java
+++ b/src/main/java/org/traccar/broadcast/BroadcastMessage.java
@@ -15,18 +15,22 @@
*/
package org.traccar.broadcast;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
import org.traccar.model.Position;
+import java.util.Map;
+
public class BroadcastMessage {
- private DeviceStatus deviceStatus;
+ private Device device;
- public DeviceStatus getDeviceStatus() {
- return deviceStatus;
+ public Device getDevice() {
+ return device;
}
- public void setDeviceStatus(DeviceStatus deviceStatus) {
- this.deviceStatus = deviceStatus;
+ public void setDevice(Device device) {
+ this.device = device;
}
private Position position;
@@ -39,4 +43,33 @@ public class BroadcastMessage {
this.position = position;
}
+ private Long userId;
+
+ public Long getUserId() {
+ return userId;
+ }
+
+ public void setUserId(Long userId) {
+ this.userId = userId;
+ }
+
+ private Event event;
+
+ public Event getEvent() {
+ return event;
+ }
+
+ public void setEvent(Event event) {
+ this.event = event;
+ }
+
+ private Map<String, Long> changes;
+
+ public Map<String, Long> getChanges() {
+ return changes;
+ }
+
+ public void setChanges(Map<String, Long> changes) {
+ this.changes = changes;
+ }
}
diff --git a/src/main/java/org/traccar/broadcast/BroadcastService.java b/src/main/java/org/traccar/broadcast/BroadcastService.java
index 26e38400b..8a2e4bafc 100644
--- a/src/main/java/org/traccar/broadcast/BroadcastService.java
+++ b/src/main/java/org/traccar/broadcast/BroadcastService.java
@@ -15,84 +15,8 @@
*/
package org.traccar.broadcast;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.traccar.LifecycleObject;
-import org.traccar.config.Config;
-import org.traccar.config.Keys;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class BroadcastService implements LifecycleObject {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BroadcastService.class);
-
- private final ObjectMapper objectMapper;
-
- private final InetAddress address;
- private final int port;
-
- private DatagramSocket publisherSocket;
-
- private final ExecutorService service = Executors.newSingleThreadExecutor();
- private final byte[] receiverBuffer = new byte[4096];
-
- public BroadcastService(Config config, ObjectMapper objectMapper) throws IOException {
- this.objectMapper = objectMapper;
- address = InetAddress.getByName(config.getString(Keys.BROADCAST_ADDRESS));
- port = config.getInteger(Keys.BROADCAST_PORT);
- }
-
- public void sendMessage(BroadcastMessage message) throws IOException {
- byte[] buffer = objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length, address, port);
- publisherSocket.send(packet);
- }
-
- private void handleMessage(BroadcastMessage message) {
- if (message.getDeviceStatus() != null) {
- LOGGER.info("Broadcast received device {}", message.getDeviceStatus().getDeviceId());
- } else if (message.getPosition() != null) {
- LOGGER.info("Broadcast received position {}", message.getPosition().getDeviceId());
- }
- }
-
- @Override
- public void start() throws IOException {
- service.submit(receiver);
- publisherSocket = new DatagramSocket();
- }
-
- @Override
- public void stop() {
- publisherSocket.close();
- service.shutdown();
- }
-
- private final Runnable receiver = new Runnable() {
- @Override
- public void run() {
- try (MulticastSocket socket = new MulticastSocket(port)) {
- socket.joinGroup(address);
- while (!service.isShutdown()) {
- DatagramPacket packet = new DatagramPacket(receiverBuffer, receiverBuffer.length);
- socket.receive(packet);
- String data = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
- handleMessage(objectMapper.readValue(data, BroadcastMessage.class));
- }
- socket.leaveGroup(address);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- };
+public interface BroadcastService extends LifecycleObject, BroadcastInterface {
+ void registerListener(BroadcastInterface listener);
}
diff --git a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
new file mode 100644
index 000000000..0525fa742
--- /dev/null
+++ b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.broadcast;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+import org.traccar.model.BaseModel;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
+import org.traccar.model.Permission;
+import org.traccar.model.Position;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class MulticastBroadcastService implements BroadcastService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MulticastBroadcastService.class);
+
+ private final ObjectMapper objectMapper;
+
+ private final InetAddress address;
+ private final int port;
+
+ private DatagramSocket publisherSocket;
+
+ private final ExecutorService service = Executors.newSingleThreadExecutor();
+ private final byte[] receiverBuffer = new byte[4096];
+
+ private final Set<BroadcastInterface> listeners = new HashSet<>();
+
+ public MulticastBroadcastService(Config config, ObjectMapper objectMapper) throws IOException {
+ this.objectMapper = objectMapper;
+ address = InetAddress.getByName(config.getString(Keys.BROADCAST_ADDRESS));
+ port = config.getInteger(Keys.BROADCAST_PORT);
+ }
+
+ @Override
+ public void registerListener(BroadcastInterface listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void updateDevice(Device device) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setDevice(device);
+ sendMessage(message);
+ }
+
+ @Override
+ public void updatePosition(Position position) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setPosition(position);
+ sendMessage(message);
+ }
+
+ @Override
+ public void updateEvent(long userId, Event event) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setUserId(userId);
+ message.setEvent(event);
+ sendMessage(message);
+ }
+
+ @Override
+ public void invalidateObject(Class<? extends BaseModel> clazz, long id) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setChanges(Map.of(Permission.getKey(clazz), id));
+ sendMessage(message);
+ }
+
+ @Override
+ public void invalidatePermission(
+ Class<? extends BaseModel> clazz1, long id1,
+ Class<? extends BaseModel> clazz2, long id2) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setChanges(Map.of(Permission.getKey(clazz1), id1, Permission.getKey(clazz2), id2));
+ sendMessage(message);
+ }
+
+ private void sendMessage(BroadcastMessage message) {
+ try {
+ byte[] buffer = objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length, address, port);
+ publisherSocket.send(packet);
+ } catch (IOException e) {
+ LOGGER.warn("Broadcast failed", e);
+ }
+ }
+
+ private void handleMessage(BroadcastMessage message) {
+ if (message.getDevice() != null) {
+ listeners.forEach(listener -> listener.updateDevice(message.getDevice()));
+ } else if (message.getPosition() != null) {
+ listeners.forEach(listener -> listener.updatePosition(message.getPosition()));
+ } else if (message.getUserId() != null && message.getEvent() != null) {
+ listeners.forEach(listener -> listener.updateEvent(message.getUserId(), message.getEvent()));
+ } else if (message.getChanges() != null) {
+ var iterator = message.getChanges().entrySet().iterator();
+ if (iterator.hasNext()) {
+ var first = iterator.next();
+ if (iterator.hasNext()) {
+ var second = iterator.next();
+ listeners.forEach(listener -> listener.invalidatePermission(
+ Permission.getKeyClass(first.getKey()), first.getValue(),
+ Permission.getKeyClass(second.getKey()), second.getValue()));
+ } else {
+ listeners.forEach(listener -> listener.invalidateObject(
+ Permission.getKeyClass(first.getKey()), first.getValue()));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void start() throws IOException {
+ service.submit(receiver);
+ publisherSocket = new DatagramSocket();
+ }
+
+ @Override
+ public void stop() {
+ publisherSocket.close();
+ service.shutdown();
+ }
+
+ private final Runnable receiver = new Runnable() {
+ @SuppressWarnings("deprecation")
+ @Override
+ public void run() {
+ try (MulticastSocket socket = new MulticastSocket(port)) {
+ socket.joinGroup(address);
+ while (!service.isShutdown()) {
+ DatagramPacket packet = new DatagramPacket(receiverBuffer, receiverBuffer.length);
+ socket.receive(packet);
+ String data = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
+ handleMessage(objectMapper.readValue(data, BroadcastMessage.class));
+ }
+ socket.leaveGroup(address);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+}
diff --git a/src/main/java/org/traccar/broadcast/NullBroadcastService.java b/src/main/java/org/traccar/broadcast/NullBroadcastService.java
new file mode 100644
index 000000000..3f41299db
--- /dev/null
+++ b/src/main/java/org/traccar/broadcast/NullBroadcastService.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.broadcast;
+
+public class NullBroadcastService implements BroadcastService {
+
+ @Override
+ public void registerListener(BroadcastInterface listener) {
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void stop() throws Exception {
+ }
+}
diff --git a/src/main/java/org/traccar/model/Permission.java b/src/main/java/org/traccar/model/Permission.java
index 41dfa43e4..0b2f0584f 100644
--- a/src/main/java/org/traccar/model/Permission.java
+++ b/src/main/java/org/traccar/model/Permission.java
@@ -54,10 +54,10 @@ public class Permission {
this.data = data;
var iterator = data.entrySet().iterator();
var owner = iterator.next();
- ownerClass = CLASSES.get(owner.getKey().substring(0, owner.getKey().length() - 2));
+ ownerClass = getKeyClass(owner.getKey());
ownerId = owner.getValue();
var property = iterator.next();
- propertyClass = CLASSES.get(property.getKey().substring(0, property.getKey().length() - 2));
+ propertyClass = getKeyClass(property.getKey());
propertyId = property.getValue();
}
@@ -73,6 +73,10 @@ public class Permission {
data.put(getKey(propertyClass), propertyId);
}
+ public static Class<? extends BaseModel> getKeyClass(String key) {
+ return CLASSES.get(key.substring(0, key.length() - 2));
+ }
+
public static String getKey(Class<?> clazz) {
return Introspector.decapitalize(clazz.getSimpleName()) + "Id";
}
diff --git a/src/main/java/org/traccar/notificators/NotificatorWeb.java b/src/main/java/org/traccar/notificators/NotificatorWeb.java
index 3d899584d..efbbf24cc 100644
--- a/src/main/java/org/traccar/notificators/NotificatorWeb.java
+++ b/src/main/java/org/traccar/notificators/NotificatorWeb.java
@@ -16,6 +16,7 @@
*/
package org.traccar.notificators;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.model.Event;
import org.traccar.model.Position;
import org.traccar.model.User;
@@ -27,11 +28,15 @@ import javax.inject.Inject;
public final class NotificatorWeb implements Notificator {
private final ConnectionManager connectionManager;
+ private final BroadcastService broadcastService;
private final NotificationFormatter notificationFormatter;
@Inject
- public NotificatorWeb(ConnectionManager connectionManager, NotificationFormatter notificationFormatter) {
+ public NotificatorWeb(
+ ConnectionManager connectionManager, BroadcastService broadcastService,
+ NotificationFormatter notificationFormatter) {
this.connectionManager = connectionManager;
+ this.broadcastService = broadcastService;
this.notificationFormatter = notificationFormatter;
}
@@ -52,6 +57,7 @@ public final class NotificatorWeb implements Notificator {
copy.set("message", message.getBody());
connectionManager.updateEvent(user.getId(), copy);
+ broadcastService.updateEvent(user.getId(), copy);
}
}
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java
index fe6521d18..74427c08b 100644
--- a/src/main/java/org/traccar/session/ConnectionManager.java
+++ b/src/main/java/org/traccar/session/ConnectionManager.java
@@ -22,6 +22,8 @@ import io.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.Protocol;
+import org.traccar.broadcast.BroadcastInterface;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.config.Config;
import org.traccar.config.Keys;
import org.traccar.database.NotificationManager;
@@ -52,7 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Singleton
-public class ConnectionManager {
+public class ConnectionManager implements BroadcastInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
@@ -70,6 +72,7 @@ public class ConnectionManager {
private final Storage storage;
private final NotificationManager notificationManager;
private final Timer timer;
+ private final BroadcastService broadcastService;
private final Map<Long, Set<UpdateListener>> listeners = new ConcurrentHashMap<>();
private final Map<Long, Timeout> timeouts = new ConcurrentHashMap<>();
@@ -77,15 +80,17 @@ public class ConnectionManager {
@Inject
public ConnectionManager(
Injector injector, Config config, CacheManager cacheManager, Storage storage,
- NotificationManager notificationManager, Timer timer) {
+ NotificationManager notificationManager, Timer timer, BroadcastService broadcastService) {
this.injector = injector;
this.config = config;
this.cacheManager = cacheManager;
this.storage = storage;
this.notificationManager = notificationManager;
this.timer = timer;
+ this.broadcastService = broadcastService;
deviceTimeout = config.getLong(Keys.STATUS_TIMEOUT) * 1000;
updateDeviceState = config.getBoolean(Keys.STATUS_UPDATE_DEVICE_STATE);
+ broadcastService.registerListener(this);
}
public DeviceSession getDeviceSession(long deviceId) {
@@ -270,6 +275,7 @@ public class ConnectionManager {
}
updateDevice(device);
+ broadcastService.updateDevice(device);
}
public DeviceState getDeviceState(long deviceId) {
@@ -306,6 +312,7 @@ public class ConnectionManager {
}
}
+ @Override
public synchronized void updateDevice(Device device) {
for (User user : cacheManager.getDeviceObjects(device.getId(), User.class)) {
if (listeners.containsKey(user.getId())) {
@@ -316,6 +323,7 @@ public class ConnectionManager {
}
}
+ @Override
public synchronized void updatePosition(Position position) {
long deviceId = position.getDeviceId();
for (User user : cacheManager.getDeviceObjects(deviceId, User.class)) {
@@ -327,6 +335,7 @@ public class ConnectionManager {
}
}
+ @Override
public synchronized void updateEvent(long userId, Event event) {
if (listeners.containsKey(userId)) {
for (UpdateListener listener : listeners.get(userId)) {
diff --git a/src/main/java/org/traccar/session/cache/CacheManager.java b/src/main/java/org/traccar/session/cache/CacheManager.java
index abc8ca4c9..d2ada7d43 100644
--- a/src/main/java/org/traccar/session/cache/CacheManager.java
+++ b/src/main/java/org/traccar/session/cache/CacheManager.java
@@ -15,6 +15,8 @@
*/
package org.traccar.session.cache;
+import org.traccar.broadcast.BroadcastInterface;
+import org.traccar.broadcast.BroadcastService;
import org.traccar.config.Config;
import org.traccar.helper.model.GeofenceUtil;
import org.traccar.model.Attribute;
@@ -49,7 +51,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@Singleton
-public class CacheManager {
+public class CacheManager implements BroadcastInterface {
private static final Collection<Class<? extends BaseModel>> CLASSES = Arrays.asList(
Attribute.class, Driver.class, Geofence.class, Maintenance.class, Notification.class);
@@ -68,9 +70,10 @@ public class CacheManager {
private final Map<Long, List<User>> notificationUsers = new HashMap<>();
@Inject
- public CacheManager(Config config, Storage storage) throws StorageException {
+ public CacheManager(Config config, Storage storage, BroadcastService broadcastService) throws StorageException {
this.config = config;
this.storage = storage;
+ broadcastService.registerListener(this);
invalidateServer();
invalidateUsers();
}
@@ -179,13 +182,18 @@ public class CacheManager {
}
}
- public <T extends BaseModel> void updateOrInvalidate(Class<T> clazz, long id) throws StorageException {
- var object = storage.getObject(clazz, new Request(
- new Columns.All(), new Condition.Equals("id", "id", id)));
- if (object != null) {
- updateOrInvalidate(object);
- } else {
- invalidate(clazz, id);
+ @Override
+ public void invalidateObject(Class<? extends BaseModel> clazz, long id) {
+ try {
+ var object = storage.getObject(clazz, new Request(
+ new Columns.All(), new Condition.Equals("id", "id", id)));
+ if (object != null) {
+ updateOrInvalidate(object);
+ } else {
+ invalidate(clazz, id);
+ }
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
}
}
@@ -219,10 +227,15 @@ public class CacheManager {
invalidate(new CacheKey(clazz, id));
}
- public void invalidate(
+ @Override
+ public void invalidatePermission(
Class<? extends BaseModel> clazz1, long id1,
- Class<? extends BaseModel> clazz2, long id2) throws StorageException {
- invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2));
+ Class<? extends BaseModel> clazz2, long id2) {
+ try {
+ invalidate(new CacheKey(clazz1, id1), new CacheKey(clazz2, id2));
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
+ }
}
private void invalidateServer() throws StorageException {