diff options
-rw-r--r-- | src/main/java/org/traccar/config/Keys.java | 16 | ||||
-rw-r--r-- | src/main/java/org/traccar/forward/EventForwarderKafka.java | 5 | ||||
-rw-r--r-- | src/main/java/org/traccar/forward/PositionForwarderKafka.java | 7 |
3 files changed, 25 insertions, 3 deletions
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 2224192d9..9ae71921e 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -707,6 +707,14 @@ public final class Keys { "url"); /** + * Position forwarding Kafka topic. + */ + public static final ConfigKey<String> FORWARD_TOPIC = new StringConfigKey( + "forward.topic", + List.of(KeyType.CONFIG), + "positions"); + + /** * URL to forward positions. Data is passed through URL parameters. For example, {uniqueId} for device identifier, * {latitude} and {longitude} for coordinates. */ @@ -768,6 +776,14 @@ public final class Keys { "json"); /** + * Events forwarding Kafka topic. + */ + public static final ConfigKey<String> EVENT_FORWARD_TOPIC = new StringConfigKey( + "event.forward.topic", + List.of(KeyType.CONFIG), + "events"); + + /** * Events forwarding URL. */ public static final ConfigKey<String> EVENT_FORWARD_URL = new StringConfigKey( diff --git a/src/main/java/org/traccar/forward/EventForwarderKafka.java b/src/main/java/org/traccar/forward/EventForwarderKafka.java index 71e06ddd1..db97d22de 100644 --- a/src/main/java/org/traccar/forward/EventForwarderKafka.java +++ b/src/main/java/org/traccar/forward/EventForwarderKafka.java @@ -30,6 +30,8 @@ public class EventForwarderKafka implements EventForwarder { private final Producer<String, String> producer; private final ObjectMapper objectMapper; + private final String topic; + public EventForwarderKafka(Config config, ObjectMapper objectMapper) { this.objectMapper = objectMapper; Properties properties = new Properties(); @@ -38,6 +40,7 @@ public class EventForwarderKafka implements EventForwarder { properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); + topic = config.getString(Keys.EVENT_FORWARD_TOPIC); } @SuppressWarnings("deprecation") @@ -51,7 +54,7 @@ public class EventForwarderKafka implements EventForwarder { try { String key = Long.toString(eventData.getDevice().getId()); String value = objectMapper.writeValueAsString(eventData); - producer.send(new ProducerRecord<>("events", key, value)); + producer.send(new ProducerRecord<>(topic, key, value)); resultHandler.onResult(true, null); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); diff --git a/src/main/java/org/traccar/forward/PositionForwarderKafka.java b/src/main/java/org/traccar/forward/PositionForwarderKafka.java index 3921539ac..7432e9364 100644 --- a/src/main/java/org/traccar/forward/PositionForwarderKafka.java +++ b/src/main/java/org/traccar/forward/PositionForwarderKafka.java @@ -30,14 +30,17 @@ public class PositionForwarderKafka implements PositionForwarder { private final Producer<String, String> producer; private final ObjectMapper objectMapper; + private final String topic; + public PositionForwarderKafka(Config config, ObjectMapper objectMapper) { this.objectMapper = objectMapper; Properties properties = new Properties(); - properties.put("bootstrap.servers", config.getString(Keys.EVENT_FORWARD_URL)); + properties.put("bootstrap.servers", config.getString(Keys.FORWARD_URL)); properties.put("acks", "all"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); + topic = config.getString(Keys.FORWARD_TOPIC); } @Override @@ -45,7 +48,7 @@ public class PositionForwarderKafka implements PositionForwarder { try { String key = Long.toString(positionData.getDevice().getId()); String value = objectMapper.writeValueAsString(positionData); - producer.send(new ProducerRecord<>("positions", key, value)); + producer.send(new ProducerRecord<>(topic, key, value)); resultHandler.onResult(true, null); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); |