aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRodolfo Silva <contato@rodolfosilva.com>2023-07-13 14:26:34 -0300
committerRodolfo Silva <contato@rodolfosilva.com>2023-07-13 15:05:37 -0300
commitbd64dc0bc759e8325f09686909c25f3a08fbf686 (patch)
tree47c13829b922c022a1ab89dc78449ec22c91b2db
parent2be2a4558ace9825a69aecf6329305490eb5fe5e (diff)
downloadtrackermap-server-bd64dc0bc759e8325f09686909c25f3a08fbf686.tar.gz
trackermap-server-bd64dc0bc759e8325f09686909c25f3a08fbf686.tar.bz2
trackermap-server-bd64dc0bc759e8325f09686909c25f3a08fbf686.zip
AMQP Forwarder
Add support to forward events and positions to AMQP (RabbitMQ).
-rw-r--r--build.gradle1
-rw-r--r--src/main/java/org/traccar/MainModule.java6
-rw-r--r--src/main/java/org/traccar/config/Keys.java20
-rw-r--r--src/main/java/org/traccar/forward/EventForwarderAmqp.java81
-rw-r--r--src/main/java/org/traccar/forward/PositionForwarderAmqp.java80
5 files changed, 186 insertions, 2 deletions
diff --git a/build.gradle b/build.gradle
index b4e11ed98..d08dfebc5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -89,6 +89,7 @@ dependencies {
implementation "redis.clients:jedis:4.4.1"
implementation "com.google.firebase:firebase-admin:9.1.1"
implementation "com.nimbusds:oauth2-oidc-sdk:10.9.1"
+ implementation "com.rabbitmq:amqp-client:5.18.0"
testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
testImplementation "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testImplementation "org.mockito:mockito-core:5.3.1"
diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java
index b7bdbc6bf..f5db75846 100644
--- a/src/main/java/org/traccar/MainModule.java
+++ b/src/main/java/org/traccar/MainModule.java
@@ -37,10 +37,12 @@ import org.traccar.database.OpenIdProvider;
import org.traccar.database.StatisticsManager;
import org.traccar.forward.EventForwarder;
import org.traccar.forward.EventForwarderJson;
+import org.traccar.forward.EventForwarderAmqp;
import org.traccar.forward.EventForwarderKafka;
import org.traccar.forward.EventForwarderMqtt;
import org.traccar.forward.PositionForwarder;
import org.traccar.forward.PositionForwarderJson;
+import org.traccar.forward.PositionForwarderAmqp;
import org.traccar.forward.PositionForwarderKafka;
import org.traccar.forward.PositionForwarderRedis;
import org.traccar.forward.PositionForwarderUrl;
@@ -360,6 +362,8 @@ public class MainModule extends AbstractModule {
if (config.hasKey(Keys.EVENT_FORWARD_URL)) {
String forwardType = config.getString(Keys.EVENT_FORWARD_TYPE);
switch (forwardType) {
+ case "amqp":
+ return new EventForwarderAmqp(config, objectMapper);
case "kafka":
return new EventForwarderKafka(config, objectMapper);
case "mqtt":
@@ -379,6 +383,8 @@ public class MainModule extends AbstractModule {
switch (config.getString(Keys.FORWARD_TYPE)) {
case "json":
return new PositionForwarderJson(config, client, objectMapper);
+ case "amqp":
+ return new PositionForwarderAmqp(config, objectMapper);
case "kafka":
return new PositionForwarderKafka(config, objectMapper);
case "redis":
diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java
index 6f5360c22..314ac0df2 100644
--- a/src/main/java/org/traccar/config/Keys.java
+++ b/src/main/java/org/traccar/config/Keys.java
@@ -837,7 +837,15 @@ public final class Keys {
"url");
/**
- * Position forwarding Kafka topic.
+ * Position forwarding AMQP exchange.
+ */
+ public static final ConfigKey<String> FORWARD_EXCHANGE = new StringConfigKey(
+ "forward.exchange",
+ List.of(KeyType.CONFIG),
+ "traccar");
+
+ /**
+ * Position forwarding Kafka topic or AQMP Routing Key.
*/
public static final ConfigKey<String> FORWARD_TOPIC = new StringConfigKey(
"forward.topic",
@@ -906,7 +914,15 @@ public final class Keys {
"json");
/**
- * Events forwarding Kafka topic.
+ * Events forwarding AMQP exchange.
+ */
+ public static final ConfigKey<String> EVENT_FORWARD_EXCHANGE = new StringConfigKey(
+ "event.forward.exchange",
+ List.of(KeyType.CONFIG),
+ "traccar");
+
+ /**
+ * Events forwarding Kafka topic or AQMP Routing Key.
*/
public static final ConfigKey<String> EVENT_FORWARD_TOPIC = new StringConfigKey(
"event.forward.topic",
diff --git a/src/main/java/org/traccar/forward/EventForwarderAmqp.java b/src/main/java/org/traccar/forward/EventForwarderAmqp.java
new file mode 100644
index 000000000..64edbe8a5
--- /dev/null
+++ b/src/main/java/org/traccar/forward/EventForwarderAmqp.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.forward;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+public class EventForwarderAmqp implements EventForwarder {
+
+ private final Channel channel;
+ private final ObjectMapper objectMapper;
+
+ private final String exchange;
+ private final String topic;
+
+ public EventForwarderAmqp(Config config, ObjectMapper objectMapper) {
+ ConnectionFactory factory = new ConnectionFactory();
+ try {
+ factory.setUri(config.getString(Keys.EVENT_FORWARD_URL));
+ } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Connection connection = factory.newConnection();
+ exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE);
+ topic = config.getString(Keys.EVENT_FORWARD_TOPIC);
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchange, "topic", true);
+ this.objectMapper = objectMapper;
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void forward(EventData eventData, ResultHandler resultHandler) {
+ try {
+ String value = objectMapper.writeValueAsString(eventData);
+
+ BasicProperties properties = new BasicProperties.Builder()
+ .contentType("application/json")
+ .contentEncoding("string")
+ .deliveryMode(2)
+ .priority(10)
+ .build();
+
+ channel.basicPublish(exchange, topic, properties, value.getBytes());
+ resultHandler.onResult(true, null);
+ } catch (IOException e) {
+ resultHandler.onResult(false, e);
+ }
+ }
+}
diff --git a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
new file mode 100644
index 000000000..c55ca3d73
--- /dev/null
+++ b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2022 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.forward;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.traccar.config.Config;
+import org.traccar.config.Keys;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+public class PositionForwarderAmqp implements PositionForwarder {
+
+ private final Channel channel;
+ private final ObjectMapper objectMapper;
+
+ private final String exchange;
+ private final String topic;
+
+ public PositionForwarderAmqp(Config config, ObjectMapper objectMapper) {
+ ConnectionFactory factory = new ConnectionFactory();
+ try {
+ factory.setUri(config.getString(Keys.FORWARD_URL));
+ } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Connection connection = factory.newConnection();
+ topic = config.getString(Keys.FORWARD_TOPIC);
+ exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE);
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchange, "topic", true);
+ this.objectMapper = objectMapper;
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void forward(PositionData positionData, ResultHandler resultHandler) {
+ try {
+ String value = objectMapper.writeValueAsString(positionData);
+
+ BasicProperties properties = new BasicProperties.Builder()
+ .contentType("application/json")
+ .contentEncoding("string")
+ .deliveryMode(2)
+ .priority(10)
+ .build();
+
+ channel.basicPublish(exchange, topic, properties, value.getBytes());
+ resultHandler.onResult(true, null);
+ } catch (IOException e) {
+ resultHandler.onResult(false, e);
+ }
+ }
+}