aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/traccar/MainModule.java2
-rw-r--r--src/main/java/org/traccar/broadcast/BaseBroadcastService.java118
-rw-r--r--src/main/java/org/traccar/broadcast/MulticastBroadcastService.java92
-rw-r--r--src/main/java/org/traccar/broadcast/RedisBroadcastService.java176
-rw-r--r--src/main/java/org/traccar/config/Keys.java2
5 files changed, 172 insertions, 218 deletions
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java
index b1a8006ee..29d846154 100644
--- a/src/main/java/org/traccar/MainModule.java
+++ b/src/main/java/org/traccar/MainModule.java
@@ -349,7 +349,7 @@ public class MainModule extends AbstractModule {
case "redis":
return new RedisBroadcastService(config, objectMapper);
default:
- return new NullBroadcastService();
+ break;
}
}
return new NullBroadcastService();
diff --git a/src/main/java/org/traccar/broadcast/BaseBroadcastService.java b/src/main/java/org/traccar/broadcast/BaseBroadcastService.java
new file mode 100644
index 000000000..1ed639dfd
--- /dev/null
+++ b/src/main/java/org/traccar/broadcast/BaseBroadcastService.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.broadcast;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.traccar.model.BaseModel;
+import org.traccar.model.Device;
+import org.traccar.model.Event;
+import org.traccar.model.Permission;
+import org.traccar.model.Position;
+
+public abstract class BaseBroadcastService implements BroadcastService {
+
+ private final Set<BroadcastInterface> listeners = new HashSet<>();
+
+ @Override
+ public boolean singleInstance() {
+ return true;
+ }
+
+ @Override
+ public void registerListener(BroadcastInterface listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void updateDevice(boolean local, Device device) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setDevice(device);
+ sendMessage(message);
+ }
+
+ @Override
+ public void updatePosition(boolean local, Position position) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setPosition(position);
+ sendMessage(message);
+ }
+
+ @Override
+ public void updateEvent(boolean local, long userId, Event event) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setUserId(userId);
+ message.setEvent(event);
+ sendMessage(message);
+ }
+
+ @Override
+ public void updateCommand(boolean local, long deviceId) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setCommandDeviceId(deviceId);
+ sendMessage(message);
+ }
+
+ @Override
+ public void invalidateObject(boolean local, Class<? extends BaseModel> clazz, long id) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setChanges(Map.of(Permission.getKey(clazz), id));
+ sendMessage(message);
+ }
+
+ @Override
+ public void invalidatePermission(
+ boolean local,
+ Class<? extends BaseModel> clazz1, long id1,
+ Class<? extends BaseModel> clazz2, long id2) {
+ BroadcastMessage message = new BroadcastMessage();
+ message.setChanges(Map.of(Permission.getKey(clazz1), id1, Permission.getKey(clazz2), id2));
+ sendMessage(message);
+ }
+
+ protected abstract void sendMessage(BroadcastMessage message);
+
+ protected void handleMessage(BroadcastMessage message) {
+ if (message.getDevice() != null) {
+ listeners.forEach(listener -> listener.updateDevice(false, message.getDevice()));
+ } else if (message.getPosition() != null) {
+ listeners.forEach(listener -> listener.updatePosition(false, message.getPosition()));
+ } else if (message.getUserId() != null && message.getEvent() != null) {
+ listeners.forEach(listener -> listener.updateEvent(false, message.getUserId(), message.getEvent()));
+ } else if (message.getCommandDeviceId() != null) {
+ listeners.forEach(listener -> listener.updateCommand(false, message.getCommandDeviceId()));
+ } else if (message.getChanges() != null) {
+ var iterator = message.getChanges().entrySet().iterator();
+ if (iterator.hasNext()) {
+ var first = iterator.next();
+ if (iterator.hasNext()) {
+ var second = iterator.next();
+ listeners.forEach(listener -> listener.invalidatePermission(
+ false,
+ Permission.getKeyClass(first.getKey()), first.getValue(),
+ Permission.getKeyClass(second.getKey()), second.getValue()));
+ } else {
+ listeners.forEach(listener -> listener.invalidateObject(
+ false,
+ Permission.getKeyClass(first.getKey()), first.getValue()));
+ }
+ }
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
index b1b66f1e3..1c02b319b 100644
--- a/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
+++ b/src/main/java/org/traccar/broadcast/MulticastBroadcastService.java
@@ -20,11 +20,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.config.Config;
import org.traccar.config.Keys;
-import org.traccar.model.BaseModel;
-import org.traccar.model.Device;
-import org.traccar.model.Event;
-import org.traccar.model.Permission;
-import org.traccar.model.Position;
import java.io.IOException;
import java.net.DatagramPacket;
@@ -34,13 +29,10 @@ import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-public class MulticastBroadcastService implements BroadcastService {
+public class MulticastBroadcastService extends BaseBroadcastService {
private static final Logger LOGGER = LoggerFactory.getLogger(MulticastBroadcastService.class);
@@ -55,8 +47,6 @@ public class MulticastBroadcastService implements BroadcastService {
private final ExecutorService service = Executors.newSingleThreadExecutor();
private final byte[] receiverBuffer = new byte[4096];
- private final Set<BroadcastInterface> listeners = new HashSet<>();
-
public MulticastBroadcastService(Config config, ObjectMapper objectMapper) throws IOException {
this.objectMapper = objectMapper;
port = config.getInteger(Keys.BROADCAST_PORT);
@@ -76,57 +66,7 @@ public class MulticastBroadcastService implements BroadcastService {
}
@Override
- public void registerListener(BroadcastInterface listener) {
- listeners.add(listener);
- }
-
- @Override
- public void updateDevice(boolean local, Device device) {
- BroadcastMessage message = new BroadcastMessage();
- message.setDevice(device);
- sendMessage(message);
- }
-
- @Override
- public void updatePosition(boolean local, Position position) {
- BroadcastMessage message = new BroadcastMessage();
- message.setPosition(position);
- sendMessage(message);
- }
-
- @Override
- public void updateEvent(boolean local, long userId, Event event) {
- BroadcastMessage message = new BroadcastMessage();
- message.setUserId(userId);
- message.setEvent(event);
- sendMessage(message);
- }
-
- @Override
- public void updateCommand(boolean local, long deviceId) {
- BroadcastMessage message = new BroadcastMessage();
- message.setCommandDeviceId(deviceId);
- sendMessage(message);
- }
-
- @Override
- public void invalidateObject(boolean local, Class<? extends BaseModel> clazz, long id) {
- BroadcastMessage message = new BroadcastMessage();
- message.setChanges(Map.of(Permission.getKey(clazz), id));
- sendMessage(message);
- }
-
- @Override
- public void invalidatePermission(
- boolean local,
- Class<? extends BaseModel> clazz1, long id1,
- Class<? extends BaseModel> clazz2, long id2) {
- BroadcastMessage message = new BroadcastMessage();
- message.setChanges(Map.of(Permission.getKey(clazz1), id1, Permission.getKey(clazz2), id2));
- sendMessage(message);
- }
-
- private void sendMessage(BroadcastMessage message) {
+ protected void sendMessage(BroadcastMessage message) {
try {
byte[] buffer = objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
DatagramPacket packet = new DatagramPacket(buffer, buffer.length, group);
@@ -136,34 +76,6 @@ public class MulticastBroadcastService implements BroadcastService {
}
}
- private void handleMessage(BroadcastMessage message) {
- if (message.getDevice() != null) {
- listeners.forEach(listener -> listener.updateDevice(false, message.getDevice()));
- } else if (message.getPosition() != null) {
- listeners.forEach(listener -> listener.updatePosition(false, message.getPosition()));
- } else if (message.getUserId() != null && message.getEvent() != null) {
- listeners.forEach(listener -> listener.updateEvent(false, message.getUserId(), message.getEvent()));
- } else if (message.getCommandDeviceId() != null) {
- listeners.forEach(listener -> listener.updateCommand(false, message.getCommandDeviceId()));
- } else if (message.getChanges() != null) {
- var iterator = message.getChanges().entrySet().iterator();
- if (iterator.hasNext()) {
- var first = iterator.next();
- if (iterator.hasNext()) {
- var second = iterator.next();
- listeners.forEach(listener -> listener.invalidatePermission(
- false,
- Permission.getKeyClass(first.getKey()), first.getValue(),
- Permission.getKeyClass(second.getKey()), second.getValue()));
- } else {
- listeners.forEach(listener -> listener.invalidateObject(
- false,
- Permission.getKeyClass(first.getKey()), first.getValue()));
- }
- }
- }
- }
-
@Override
public void start() throws IOException {
service.submit(receiver);
diff --git a/src/main/java/org/traccar/broadcast/RedisBroadcastService.java b/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
index a6968c894..e619fef60 100644
--- a/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
+++ b/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
@@ -20,23 +20,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.config.Config;
import org.traccar.config.Keys;
-import org.traccar.model.BaseModel;
-import org.traccar.model.Device;
-import org.traccar.model.Event;
-import org.traccar.model.Permission;
-import org.traccar.model.Position;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
-public class RedisBroadcastService implements BroadcastService {
+public class RedisBroadcastService extends BaseBroadcastService {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisBroadcastService.class);
@@ -44,25 +38,25 @@ public class RedisBroadcastService implements BroadcastService {
private final ExecutorService service = Executors.newSingleThreadExecutor();
- private final Set<BroadcastInterface> listeners = new HashSet<>();
-
private final String url;
- private final String pubsubChannel = "traccar:cast";
+ private final String channel = "traccar";
- private final Jedis subscriberJedis;
- private final Jedis publisherJedis;
+ private Jedis subscriber;
+ private Jedis publisher;
- private final String id;
+ private final String id = UUID.randomUUID().toString();
public RedisBroadcastService(Config config, ObjectMapper objectMapper) throws IOException {
this.objectMapper = objectMapper;
url = config.getString(Keys.BROADCAST_ADDRESS);
- subscriberJedis = new Jedis(url);
- publisherJedis = new Jedis(url);
-
- // id that will be used to identify this instance of the server
- id = String.valueOf(System.currentTimeMillis());
+ try {
+ subscriber = new Jedis(url);
+ publisher = new Jedis(url);
+ subscriber.connect();
+ } catch (JedisConnectionException e) {
+ throw new IOException(e);
+ }
}
@Override
@@ -71,90 +65,14 @@ public class RedisBroadcastService implements BroadcastService {
}
@Override
- public void registerListener(BroadcastInterface listener) {
- listeners.add(listener);
- }
-
- @Override
- public void updateDevice(boolean local, Device device) {
- BroadcastMessage message = new BroadcastMessage();
- message.setDevice(device);
- sendMessage(message);
- }
-
- @Override
- public void updatePosition(boolean local, Position position) {
- BroadcastMessage message = new BroadcastMessage();
- message.setPosition(position);
- sendMessage(message);
- }
-
- @Override
- public void updateEvent(boolean local, long userId, Event event) {
- BroadcastMessage message = new BroadcastMessage();
- message.setUserId(userId);
- message.setEvent(event);
- sendMessage(message);
- }
-
- @Override
- public void updateCommand(boolean local, long deviceId) {
- BroadcastMessage message = new BroadcastMessage();
- message.setCommandDeviceId(deviceId);
- sendMessage(message);
- }
-
- @Override
- public void invalidateObject(boolean local, Class<? extends BaseModel> clazz, long id) {
- BroadcastMessage message = new BroadcastMessage();
- message.setChanges(Map.of(Permission.getKey(clazz), id));
- sendMessage(message);
- }
-
- @Override
- public void invalidatePermission(
- boolean local,
- Class<? extends BaseModel> clazz1, long id1,
- Class<? extends BaseModel> clazz2, long id2) {
- BroadcastMessage message = new BroadcastMessage();
- message.setChanges(Map.of(Permission.getKey(clazz1), id1, Permission.getKey(clazz2), id2));
- sendMessage(message);
- }
-
- private void sendMessage(BroadcastMessage message) {
+ protected void sendMessage(BroadcastMessage message) {
try {
String payload = id + ":" + objectMapper.writeValueAsString(message);
- publisherJedis.publish(pubsubChannel, payload);
+ publisher.publish(channel, payload);
} catch (IOException e) {
LOGGER.warn("Broadcast failed", e);
- }
- }
-
- private void handleMessage(BroadcastMessage message) {
- if (message.getDevice() != null) {
- listeners.forEach(listener -> listener.updateDevice(false, message.getDevice()));
- } else if (message.getPosition() != null) {
- listeners.forEach(listener -> listener.updatePosition(false, message.getPosition()));
- } else if (message.getUserId() != null && message.getEvent() != null) {
- listeners.forEach(listener -> listener.updateEvent(false, message.getUserId(), message.getEvent()));
- } else if (message.getCommandDeviceId() != null) {
- listeners.forEach(listener -> listener.updateCommand(false, message.getCommandDeviceId()));
- } else if (message.getChanges() != null) {
- var iterator = message.getChanges().entrySet().iterator();
- if (iterator.hasNext()) {
- var first = iterator.next();
- if (iterator.hasNext()) {
- var second = iterator.next();
- listeners.forEach(listener -> listener.invalidatePermission(
- false,
- Permission.getKeyClass(first.getKey()), first.getValue(),
- Permission.getKeyClass(second.getKey()), second.getValue()));
- } else {
- listeners.forEach(listener -> listener.invalidateObject(
- false,
- Permission.getKeyClass(first.getKey()), first.getValue()));
- }
- }
+ } catch (JedisConnectionException e) {
+ LOGGER.warn("Broadcast failed", e);
}
}
@@ -165,39 +83,45 @@ public class RedisBroadcastService implements BroadcastService {
@Override
public void stop() {
+ try {
+ if (subscriber != null) {
+ subscriber.close();
+ subscriber = null;
+ }
+ } catch (JedisException e) {
+ LOGGER.warn("Subscriber close failed", e);
+ }
+ try {
+ if (publisher != null) {
+ publisher.close();
+ publisher = null;
+ }
+ } catch (JedisException e) {
+ LOGGER.warn("Publisher close failed", e);
+ }
service.shutdown();
}
private final Runnable receiver = new Runnable() {
@Override
public void run() {
- subscriberJedis.subscribe(new JedisPubSub() {
- @Override
- public void onMessage(String channel, String message) {
- try {
- String[] parts = message.split(":", 2);
- if (channel == pubsubChannel && parts.length == 2 && !id.equals(parts[0])) {
- handleMessage(objectMapper.readValue(parts[1], BroadcastMessage.class));
+ try {
+ subscriber.subscribe(new JedisPubSub() {
+ @Override
+ public void onMessage(String messageChannel, String message) {
+ try {
+ String[] parts = message.split(":", 2);
+ if (messageChannel == channel && parts.length == 2 && !id.equals(parts[0])) {
+ handleMessage(objectMapper.readValue(parts[1], BroadcastMessage.class));
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Broadcast handleMessage failed", e);
}
- } catch (IOException e) {
- LOGGER.warn("Broadcast handleMessage failed", e);
}
- }
- }, pubsubChannel);
-
- while (!service.isShutdown()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
- }
-
- try {
- subscriberJedis.close();
- publisherJedis.close();
- } catch (Exception e) {
- LOGGER.warn("Failed to close pubsub", e);
+ }, channel);
+ } catch (JedisConnectionException e) {
+ throw new RuntimeException(e);
+ } catch (JedisException e) {
throw new RuntimeException(e);
}
}
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 6f42e8937..381e3a108 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -1771,7 +1771,7 @@ public final class Keys {
List.of(KeyType.CONFIG));
/**
- * Multicast address for broadcasting synchronization events.
+ * Multicast address or Redis URL for broadcasting synchronization events.
*/
public static final ConfigKey<String> BROADCAST_ADDRESS = new StringConfigKey(
"broadcast.address",