aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/traccar/broadcast/RedisBroadcastService.java')
-rw-r--r--src/main/java/org/traccar/broadcast/RedisBroadcastService.java176
1 files changed, 50 insertions, 126 deletions
diff --git a/src/main/java/org/traccar/broadcast/RedisBroadcastService.java b/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
index a6968c894..e619fef60 100644
--- a/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
+++ b/src/main/java/org/traccar/broadcast/RedisBroadcastService.java
@@ -20,23 +20,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.config.Config;
import org.traccar.config.Keys;
-import org.traccar.model.BaseModel;
-import org.traccar.model.Device;
-import org.traccar.model.Event;
-import org.traccar.model.Permission;
-import org.traccar.model.Position;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
-public class RedisBroadcastService implements BroadcastService {
+public class RedisBroadcastService extends BaseBroadcastService {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisBroadcastService.class);
@@ -44,25 +38,25 @@ public class RedisBroadcastService implements BroadcastService {
private final ExecutorService service = Executors.newSingleThreadExecutor();
- private final Set<BroadcastInterface> listeners = new HashSet<>();
-
private final String url;
- private final String pubsubChannel = "traccar:cast";
+ private final String channel = "traccar";
- private final Jedis subscriberJedis;
- private final Jedis publisherJedis;
+ private Jedis subscriber;
+ private Jedis publisher;
- private final String id;
+ private final String id = UUID.randomUUID().toString();
public RedisBroadcastService(Config config, ObjectMapper objectMapper) throws IOException {
this.objectMapper = objectMapper;
url = config.getString(Keys.BROADCAST_ADDRESS);
- subscriberJedis = new Jedis(url);
- publisherJedis = new Jedis(url);
-
- // id that will be used to identify this instance of the server
- id = String.valueOf(System.currentTimeMillis());
+ try {
+ subscriber = new Jedis(url);
+ publisher = new Jedis(url);
+ subscriber.connect();
+ } catch (JedisConnectionException e) {
+ throw new IOException(e);
+ }
}
@Override
@@ -71,90 +65,14 @@ public class RedisBroadcastService implements BroadcastService {
}
@Override
- public void registerListener(BroadcastInterface listener) {
- listeners.add(listener);
- }
-
- @Override
- public void updateDevice(boolean local, Device device) {
- BroadcastMessage message = new BroadcastMessage();
- message.setDevice(device);
- sendMessage(message);
- }
-
- @Override
- public void updatePosition(boolean local, Position position) {
- BroadcastMessage message = new BroadcastMessage();
- message.setPosition(position);
- sendMessage(message);
- }
-
- @Override
- public void updateEvent(boolean local, long userId, Event event) {
- BroadcastMessage message = new BroadcastMessage();
- message.setUserId(userId);
- message.setEvent(event);
- sendMessage(message);
- }
-
- @Override
- public void updateCommand(boolean local, long deviceId) {
- BroadcastMessage message = new BroadcastMessage();
- message.setCommandDeviceId(deviceId);
- sendMessage(message);
- }
-
- @Override
- public void invalidateObject(boolean local, Class<? extends BaseModel> clazz, long id) {
- BroadcastMessage message = new BroadcastMessage();
- message.setChanges(Map.of(Permission.getKey(clazz), id));
- sendMessage(message);
- }
-
- @Override
- public void invalidatePermission(
- boolean local,
- Class<? extends BaseModel> clazz1, long id1,
- Class<? extends BaseModel> clazz2, long id2) {
- BroadcastMessage message = new BroadcastMessage();
- message.setChanges(Map.of(Permission.getKey(clazz1), id1, Permission.getKey(clazz2), id2));
- sendMessage(message);
- }
-
- private void sendMessage(BroadcastMessage message) {
+ protected void sendMessage(BroadcastMessage message) {
try {
String payload = id + ":" + objectMapper.writeValueAsString(message);
- publisherJedis.publish(pubsubChannel, payload);
+ publisher.publish(channel, payload);
} catch (IOException e) {
LOGGER.warn("Broadcast failed", e);
- }
- }
-
- private void handleMessage(BroadcastMessage message) {
- if (message.getDevice() != null) {
- listeners.forEach(listener -> listener.updateDevice(false, message.getDevice()));
- } else if (message.getPosition() != null) {
- listeners.forEach(listener -> listener.updatePosition(false, message.getPosition()));
- } else if (message.getUserId() != null && message.getEvent() != null) {
- listeners.forEach(listener -> listener.updateEvent(false, message.getUserId(), message.getEvent()));
- } else if (message.getCommandDeviceId() != null) {
- listeners.forEach(listener -> listener.updateCommand(false, message.getCommandDeviceId()));
- } else if (message.getChanges() != null) {
- var iterator = message.getChanges().entrySet().iterator();
- if (iterator.hasNext()) {
- var first = iterator.next();
- if (iterator.hasNext()) {
- var second = iterator.next();
- listeners.forEach(listener -> listener.invalidatePermission(
- false,
- Permission.getKeyClass(first.getKey()), first.getValue(),
- Permission.getKeyClass(second.getKey()), second.getValue()));
- } else {
- listeners.forEach(listener -> listener.invalidateObject(
- false,
- Permission.getKeyClass(first.getKey()), first.getValue()));
- }
- }
+ } catch (JedisConnectionException e) {
+ LOGGER.warn("Broadcast failed", e);
}
}
@@ -165,39 +83,45 @@ public class RedisBroadcastService implements BroadcastService {
@Override
public void stop() {
+ try {
+ if (subscriber != null) {
+ subscriber.close();
+ subscriber = null;
+ }
+ } catch (JedisException e) {
+ LOGGER.warn("Subscriber close failed", e);
+ }
+ try {
+ if (publisher != null) {
+ publisher.close();
+ publisher = null;
+ }
+ } catch (JedisException e) {
+ LOGGER.warn("Publisher close failed", e);
+ }
service.shutdown();
}
private final Runnable receiver = new Runnable() {
@Override
public void run() {
- subscriberJedis.subscribe(new JedisPubSub() {
- @Override
- public void onMessage(String channel, String message) {
- try {
- String[] parts = message.split(":", 2);
- if (channel == pubsubChannel && parts.length == 2 && !id.equals(parts[0])) {
- handleMessage(objectMapper.readValue(parts[1], BroadcastMessage.class));
+ try {
+ subscriber.subscribe(new JedisPubSub() {
+ @Override
+ public void onMessage(String messageChannel, String message) {
+ try {
+ String[] parts = message.split(":", 2);
+ if (messageChannel == channel && parts.length == 2 && !id.equals(parts[0])) {
+ handleMessage(objectMapper.readValue(parts[1], BroadcastMessage.class));
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Broadcast handleMessage failed", e);
}
- } catch (IOException e) {
- LOGGER.warn("Broadcast handleMessage failed", e);
}
- }
- }, pubsubChannel);
-
- while (!service.isShutdown()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
- }
-
- try {
- subscriberJedis.close();
- publisherJedis.close();
- } catch (Exception e) {
- LOGGER.warn("Failed to close pubsub", e);
+ }, channel);
+ } catch (JedisConnectionException e) {
+ throw new RuntimeException(e);
+ } catch (JedisException e) {
throw new RuntimeException(e);
}
}