From 77c1e617587947c431b77773f53da5e222f4dd61 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Mon, 14 Nov 2022 08:19:22 -0800 Subject: Customizable Kafka topic --- src/main/java/org/traccar/config/Keys.java | 16 ++++++++++++++++ .../java/org/traccar/forward/EventForwarderKafka.java | 5 ++++- .../java/org/traccar/forward/PositionForwarderKafka.java | 7 +++++-- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 2224192d9..9ae71921e 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -706,6 +706,14 @@ public final class Keys { List.of(KeyType.CONFIG), "url"); + /** + * Position forwarding Kafka topic. + */ + public static final ConfigKey FORWARD_TOPIC = new StringConfigKey( + "forward.topic", + List.of(KeyType.CONFIG), + "positions"); + /** * URL to forward positions. Data is passed through URL parameters. For example, {uniqueId} for device identifier, * {latitude} and {longitude} for coordinates. @@ -767,6 +775,14 @@ public final class Keys { List.of(KeyType.CONFIG), "json"); + /** + * Events forwarding Kafka topic. + */ + public static final ConfigKey EVENT_FORWARD_TOPIC = new StringConfigKey( + "event.forward.topic", + List.of(KeyType.CONFIG), + "events"); + /** * Events forwarding URL. */ diff --git a/src/main/java/org/traccar/forward/EventForwarderKafka.java b/src/main/java/org/traccar/forward/EventForwarderKafka.java index 71e06ddd1..db97d22de 100644 --- a/src/main/java/org/traccar/forward/EventForwarderKafka.java +++ b/src/main/java/org/traccar/forward/EventForwarderKafka.java @@ -30,6 +30,8 @@ public class EventForwarderKafka implements EventForwarder { private final Producer producer; private final ObjectMapper objectMapper; + private final String topic; + public EventForwarderKafka(Config config, ObjectMapper objectMapper) { this.objectMapper = objectMapper; Properties properties = new Properties(); @@ -38,6 +40,7 @@ public class EventForwarderKafka implements EventForwarder { properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); + topic = config.getString(Keys.EVENT_FORWARD_TOPIC); } @SuppressWarnings("deprecation") @@ -51,7 +54,7 @@ public class EventForwarderKafka implements EventForwarder { try { String key = Long.toString(eventData.getDevice().getId()); String value = objectMapper.writeValueAsString(eventData); - producer.send(new ProducerRecord<>("events", key, value)); + producer.send(new ProducerRecord<>(topic, key, value)); resultHandler.onResult(true, null); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); diff --git a/src/main/java/org/traccar/forward/PositionForwarderKafka.java b/src/main/java/org/traccar/forward/PositionForwarderKafka.java index 3921539ac..7432e9364 100644 --- a/src/main/java/org/traccar/forward/PositionForwarderKafka.java +++ b/src/main/java/org/traccar/forward/PositionForwarderKafka.java @@ -30,14 +30,17 @@ public class PositionForwarderKafka implements PositionForwarder { private final Producer producer; private final ObjectMapper objectMapper; + private final String topic; + public PositionForwarderKafka(Config config, ObjectMapper objectMapper) { this.objectMapper = objectMapper; Properties properties = new Properties(); - properties.put("bootstrap.servers", config.getString(Keys.EVENT_FORWARD_URL)); + properties.put("bootstrap.servers", config.getString(Keys.FORWARD_URL)); properties.put("acks", "all"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); + topic = config.getString(Keys.FORWARD_TOPIC); } @Override @@ -45,7 +48,7 @@ public class PositionForwarderKafka implements PositionForwarder { try { String key = Long.toString(positionData.getDevice().getId()); String value = objectMapper.writeValueAsString(positionData); - producer.send(new ProducerRecord<>("positions", key, value)); + producer.send(new ProducerRecord<>(topic, key, value)); resultHandler.onResult(true, null); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); -- cgit v1.2.3