From 77c1e617587947c431b77773f53da5e222f4dd61 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Mon, 14 Nov 2022 08:19:22 -0800 Subject: Customizable Kafka topic --- src/main/java/org/traccar/forward/EventForwarderKafka.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/main/java/org/traccar/forward/EventForwarderKafka.java') diff --git a/src/main/java/org/traccar/forward/EventForwarderKafka.java b/src/main/java/org/traccar/forward/EventForwarderKafka.java index 71e06ddd1..db97d22de 100644 --- a/src/main/java/org/traccar/forward/EventForwarderKafka.java +++ b/src/main/java/org/traccar/forward/EventForwarderKafka.java @@ -30,6 +30,8 @@ public class EventForwarderKafka implements EventForwarder { private final Producer producer; private final ObjectMapper objectMapper; + private final String topic; + public EventForwarderKafka(Config config, ObjectMapper objectMapper) { this.objectMapper = objectMapper; Properties properties = new Properties(); @@ -38,6 +40,7 @@ public class EventForwarderKafka implements EventForwarder { properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); + topic = config.getString(Keys.EVENT_FORWARD_TOPIC); } @SuppressWarnings("deprecation") @@ -51,7 +54,7 @@ public class EventForwarderKafka implements EventForwarder { try { String key = Long.toString(eventData.getDevice().getId()); String value = objectMapper.writeValueAsString(eventData); - producer.send(new ProducerRecord<>("events", key, value)); + producer.send(new ProducerRecord<>(topic, key, value)); resultHandler.onResult(true, null); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); -- cgit v1.2.3