aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorAnton Tananaev <anton.tananaev@gmail.com>2023-07-19 22:13:19 -0700
committerGitHub <noreply@github.com>2023-07-19 22:13:19 -0700
commitf1470e56708b66c805e63619209930e83116f4ea (patch)
treef01e6b4777cbaad4b3464e89076c7e8aaf4ab6db /src/main
parent840d5600c69839e9be7a241d69005f322dfef124 (diff)
parentdacaefcd4e416eeeedf1074729a8670aec1e53fb (diff)
downloadtrackermap-server-f1470e56708b66c805e63619209930e83116f4ea.tar.gz
trackermap-server-f1470e56708b66c805e63619209930e83116f4ea.tar.bz2
trackermap-server-f1470e56708b66c805e63619209930e83116f4ea.zip
Merge pull request #5136 from RodolfoSilva/feature/amqp-forwarder
Add AMQP(RabbitMQ) Forwarder
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/traccar/MainModule.java6
-rw-r--r--src/main/java/org/traccar/config/Keys.java20
-rw-r--r--src/main/java/org/traccar/forward/AmqpClient.java57
-rw-r--r--src/main/java/org/traccar/forward/EventForwarderAmqp.java48
-rw-r--r--src/main/java/org/traccar/forward/PositionForwarderAmqp.java48
5 files changed, 177 insertions, 2 deletions
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java
index b7bdbc6bf..f5db75846 100644
--- a/src/main/java/org/traccar/MainModule.java
+++ b/src/main/java/org/traccar/MainModule.java
@@ -37,10 +37,12 @@ import org.traccar.database.OpenIdProvider;
import org.traccar.database.StatisticsManager;
import org.traccar.forward.EventForwarder;
import org.traccar.forward.EventForwarderJson;
+import org.traccar.forward.EventForwarderAmqp;
import org.traccar.forward.EventForwarderKafka;
import org.traccar.forward.EventForwarderMqtt;
import org.traccar.forward.PositionForwarder;
import org.traccar.forward.PositionForwarderJson;
+import org.traccar.forward.PositionForwarderAmqp;
import org.traccar.forward.PositionForwarderKafka;
import org.traccar.forward.PositionForwarderRedis;
import org.traccar.forward.PositionForwarderUrl;
@@ -360,6 +362,8 @@ public class MainModule extends AbstractModule {
if (config.hasKey(Keys.EVENT_FORWARD_URL)) {
String forwardType = config.getString(Keys.EVENT_FORWARD_TYPE);
switch (forwardType) {
+ case "amqp":
+ return new EventForwarderAmqp(config, objectMapper);
case "kafka":
return new EventForwarderKafka(config, objectMapper);
case "mqtt":
@@ -379,6 +383,8 @@ public class MainModule extends AbstractModule {
switch (config.getString(Keys.FORWARD_TYPE)) {
case "json":
return new PositionForwarderJson(config, client, objectMapper);
+ case "amqp":
+ return new PositionForwarderAmqp(config, objectMapper);
case "kafka":
return new PositionForwarderKafka(config, objectMapper);
case "redis":
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 7ad981799..f95794e03 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -844,7 +844,15 @@ public final class Keys {
"url");
/**
- * Position forwarding Kafka topic.
+ * Position forwarding AMQP exchange.
+ */
+ public static final ConfigKey<String> FORWARD_EXCHANGE = new StringConfigKey(
+ "forward.exchange",
+ List.of(KeyType.CONFIG),
+ "traccar");
+
+ /**
+ * Position forwarding Kafka topic or AQMP Routing Key.
*/
public static final ConfigKey<String> FORWARD_TOPIC = new StringConfigKey(
"forward.topic",
@@ -913,7 +921,15 @@ public final class Keys {
"json");
/**
- * Events forwarding Kafka topic.
+ * Events forwarding AMQP exchange.
+ */
+ public static final ConfigKey<String> EVENT_FORWARD_EXCHANGE = new StringConfigKey(
+ "event.forward.exchange",
+ List.of(KeyType.CONFIG),
+ "traccar");
+
+ /**
+ * Events forwarding Kafka topic or AQMP Routing Key.
*/
public static final ConfigKey<String> EVENT_FORWARD_TOPIC = new StringConfigKey(
"event.forward.topic",
diff --git a/src/main/java/org/traccar/forward/AmqpClient.java b/src/main/java/org/traccar/forward/AmqpClient.java
new file mode 100644
index 000000000..302aa664c
--- /dev/null
+++ b/src/main/java/org/traccar/forward/AmqpClient.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.forward;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.TimeoutException;
+
+public class AmqpClient {
+ private final Channel channel;
+ private final String exchange;
+ private final String topic;
+
+ AmqpClient(String connectionUrl, String exchange, String topic) {
+ this.exchange = exchange;
+ this.topic = topic;
+
+ ConnectionFactory factory = new ConnectionFactory();
+ try {
+ factory.setUri(connectionUrl);
+ } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) {
+ throw new RuntimeException("Error while setting URI for RabbitMQ connection factory", e);
+ }
+
+ try (Connection connection = factory.newConnection()) {
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException("Error while creating and configuring RabbitMQ channel", e);
+ }
+ }
+
+ public void publishMessage(String message) throws IOException {
+ channel.basicPublish(exchange, topic, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
+ }
+}
diff --git a/src/main/java/org/traccar/forward/EventForwarderAmqp.java b/src/main/java/org/traccar/forward/EventForwarderAmqp.java
new file mode 100644
index 000000000..5c38a4459
--- /dev/null
+++ b/src/main/java/org/traccar/forward/EventForwarderAmqp.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.forward;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+
+import java.io.IOException;
+
+public class EventForwarderAmqp implements EventForwarder {
+
+ private final AmqpClient amqpClient;
+ private final ObjectMapper objectMapper;
+
+ public EventForwarderAmqp(Config config, ObjectMapper objectMapper) {
+ String connectionUrl = config.getString(Keys.EVENT_FORWARD_URL);
+ String exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE);
+ String topic = config.getString(Keys.EVENT_FORWARD_TOPIC);
+ this.objectMapper = objectMapper;
+ amqpClient = new AmqpClient(connectionUrl, exchange, topic);
+ }
+
+ @Override
+ public void forward(EventData eventData, ResultHandler resultHandler) {
+ try {
+ String value = objectMapper.writeValueAsString(eventData);
+ amqpClient.publishMessage(value);
+ resultHandler.onResult(true, null);
+ } catch (IOException e) {
+ resultHandler.onResult(false, e);
+ }
+ }
+}
diff --git a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
new file mode 100644
index 000000000..3996bda15
--- /dev/null
+++ b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.forward;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+
+import java.io.IOException;
+
+public class PositionForwarderAmqp implements PositionForwarder {
+
+ private final AmqpClient amqpClient;
+ private final ObjectMapper objectMapper;
+
+ public PositionForwarderAmqp(Config config, ObjectMapper objectMapper) {
+ String connectionUrl = config.getString(Keys.FORWARD_URL);
+ String exchange = config.getString(Keys.FORWARD_EXCHANGE);
+ String topic = config.getString(Keys.FORWARD_TOPIC);
+ amqpClient = new AmqpClient(connectionUrl, exchange, topic);
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ public void forward(PositionData positionData, ResultHandler resultHandler) {
+ try {
+ String value = objectMapper.writeValueAsString(positionData);
+ amqpClient.publishMessage(value);
+ resultHandler.onResult(true, null);
+ } catch (IOException e) {
+ resultHandler.onResult(false, e);
+ }
+ }
+}