From bd64dc0bc759e8325f09686909c25f3a08fbf686 Mon Sep 17 00:00:00 2001 From: Rodolfo Silva Date: Thu, 13 Jul 2023 14:26:34 -0300 Subject: AMQP Forwarder Add support to forward events and positions to AMQP (RabbitMQ). --- src/main/java/org/traccar/MainModule.java | 6 ++ src/main/java/org/traccar/config/Keys.java | 20 +++++- .../org/traccar/forward/EventForwarderAmqp.java | 81 ++++++++++++++++++++++ .../org/traccar/forward/PositionForwarderAmqp.java | 80 +++++++++++++++++++++ 4 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/traccar/forward/EventForwarderAmqp.java create mode 100644 src/main/java/org/traccar/forward/PositionForwarderAmqp.java (limited to 'src') diff --git a/src/main/java/org/traccar/MainModule.java b/src/main/java/org/traccar/MainModule.java index b7bdbc6bf..f5db75846 100644 --- a/src/main/java/org/traccar/MainModule.java +++ b/src/main/java/org/traccar/MainModule.java @@ -37,10 +37,12 @@ import org.traccar.database.OpenIdProvider; import org.traccar.database.StatisticsManager; import org.traccar.forward.EventForwarder; import org.traccar.forward.EventForwarderJson; +import org.traccar.forward.EventForwarderAmqp; import org.traccar.forward.EventForwarderKafka; import org.traccar.forward.EventForwarderMqtt; import org.traccar.forward.PositionForwarder; import org.traccar.forward.PositionForwarderJson; +import org.traccar.forward.PositionForwarderAmqp; import org.traccar.forward.PositionForwarderKafka; import org.traccar.forward.PositionForwarderRedis; import org.traccar.forward.PositionForwarderUrl; @@ -360,6 +362,8 @@ public class MainModule extends AbstractModule { if (config.hasKey(Keys.EVENT_FORWARD_URL)) { String forwardType = config.getString(Keys.EVENT_FORWARD_TYPE); switch (forwardType) { + case "amqp": + return new EventForwarderAmqp(config, objectMapper); case "kafka": return new EventForwarderKafka(config, objectMapper); case "mqtt": @@ -379,6 +383,8 @@ public class MainModule extends AbstractModule { switch (config.getString(Keys.FORWARD_TYPE)) { case "json": return new PositionForwarderJson(config, client, objectMapper); + case "amqp": + return new PositionForwarderAmqp(config, objectMapper); case "kafka": return new PositionForwarderKafka(config, objectMapper); case "redis": diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index 6f5360c22..314ac0df2 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -837,7 +837,15 @@ public final class Keys { "url"); /** - * Position forwarding Kafka topic. + * Position forwarding AMQP exchange. + */ + public static final ConfigKey FORWARD_EXCHANGE = new StringConfigKey( + "forward.exchange", + List.of(KeyType.CONFIG), + "traccar"); + + /** + * Position forwarding Kafka topic or AQMP Routing Key. */ public static final ConfigKey FORWARD_TOPIC = new StringConfigKey( "forward.topic", @@ -906,7 +914,15 @@ public final class Keys { "json"); /** - * Events forwarding Kafka topic. + * Events forwarding AMQP exchange. + */ + public static final ConfigKey EVENT_FORWARD_EXCHANGE = new StringConfigKey( + "event.forward.exchange", + List.of(KeyType.CONFIG), + "traccar"); + + /** + * Events forwarding Kafka topic or AQMP Routing Key. */ public static final ConfigKey EVENT_FORWARD_TOPIC = new StringConfigKey( "event.forward.topic", diff --git a/src/main/java/org/traccar/forward/EventForwarderAmqp.java b/src/main/java/org/traccar/forward/EventForwarderAmqp.java new file mode 100644 index 000000000..64edbe8a5 --- /dev/null +++ b/src/main/java/org/traccar/forward/EventForwarderAmqp.java @@ -0,0 +1,81 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.forward; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.traccar.config.Config; +import org.traccar.config.Keys; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +public class EventForwarderAmqp implements EventForwarder { + + private final Channel channel; + private final ObjectMapper objectMapper; + + private final String exchange; + private final String topic; + + public EventForwarderAmqp(Config config, ObjectMapper objectMapper) { + ConnectionFactory factory = new ConnectionFactory(); + try { + factory.setUri(config.getString(Keys.EVENT_FORWARD_URL)); + } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) { + throw new RuntimeException(e); + } + + try { + Connection connection = factory.newConnection(); + exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE); + topic = config.getString(Keys.EVENT_FORWARD_TOPIC); + channel = connection.createChannel(); + channel.exchangeDeclare(exchange, "topic", true); + this.objectMapper = objectMapper; + } catch (IOException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + @Override + public void forward(EventData eventData, ResultHandler resultHandler) { + try { + String value = objectMapper.writeValueAsString(eventData); + + BasicProperties properties = new BasicProperties.Builder() + .contentType("application/json") + .contentEncoding("string") + .deliveryMode(2) + .priority(10) + .build(); + + channel.basicPublish(exchange, topic, properties, value.getBytes()); + resultHandler.onResult(true, null); + } catch (IOException e) { + resultHandler.onResult(false, e); + } + } +} diff --git a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java new file mode 100644 index 000000000..c55ca3d73 --- /dev/null +++ b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java @@ -0,0 +1,80 @@ +/* + * Copyright 2022 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.forward; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.traccar.config.Config; +import org.traccar.config.Keys; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.Properties; +import java.util.concurrent.TimeoutException; + +public class PositionForwarderAmqp implements PositionForwarder { + + private final Channel channel; + private final ObjectMapper objectMapper; + + private final String exchange; + private final String topic; + + public PositionForwarderAmqp(Config config, ObjectMapper objectMapper) { + ConnectionFactory factory = new ConnectionFactory(); + try { + factory.setUri(config.getString(Keys.FORWARD_URL)); + } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) { + throw new RuntimeException(e); + } + + try { + Connection connection = factory.newConnection(); + topic = config.getString(Keys.FORWARD_TOPIC); + exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE); + channel = connection.createChannel(); + channel.exchangeDeclare(exchange, "topic", true); + this.objectMapper = objectMapper; + } catch (IOException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + @Override + public void forward(PositionData positionData, ResultHandler resultHandler) { + try { + String value = objectMapper.writeValueAsString(positionData); + + BasicProperties properties = new BasicProperties.Builder() + .contentType("application/json") + .contentEncoding("string") + .deliveryMode(2) + .priority(10) + .build(); + + channel.basicPublish(exchange, topic, properties, value.getBytes()); + resultHandler.onResult(true, null); + } catch (IOException e) { + resultHandler.onResult(false, e); + } + } +} -- cgit v1.2.3 From 8970e5ad49cbe9ad2ec106b2140cfc9f276ef7de Mon Sep 17 00:00:00 2001 From: Rodolfo Silva Date: Wed, 19 Jul 2023 21:38:33 -0300 Subject: change the year from the copyright text --- src/main/java/org/traccar/forward/EventForwarderAmqp.java | 2 +- src/main/java/org/traccar/forward/PositionForwarderAmqp.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/main/java/org/traccar/forward/EventForwarderAmqp.java b/src/main/java/org/traccar/forward/EventForwarderAmqp.java index 64edbe8a5..97f14d4ea 100644 --- a/src/main/java/org/traccar/forward/EventForwarderAmqp.java +++ b/src/main/java/org/traccar/forward/EventForwarderAmqp.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2023 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java index c55ca3d73..ea9e5c370 100644 --- a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java +++ b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2023 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. -- cgit v1.2.3 From dacaefcd4e416eeeedf1074729a8670aec1e53fb Mon Sep 17 00:00:00 2001 From: Rodolfo Silva Date: Wed, 19 Jul 2023 22:42:16 -0300 Subject: refactor the amqp client logic to have a separated class --- src/main/java/org/traccar/forward/AmqpClient.java | 57 ++++++++++++++++++++++ .../org/traccar/forward/EventForwarderAmqp.java | 49 +++---------------- .../org/traccar/forward/PositionForwarderAmqp.java | 48 +++--------------- 3 files changed, 73 insertions(+), 81 deletions(-) create mode 100644 src/main/java/org/traccar/forward/AmqpClient.java (limited to 'src') diff --git a/src/main/java/org/traccar/forward/AmqpClient.java b/src/main/java/org/traccar/forward/AmqpClient.java new file mode 100644 index 000000000..302aa664c --- /dev/null +++ b/src/main/java/org/traccar/forward/AmqpClient.java @@ -0,0 +1,57 @@ +/* + * Copyright 2023 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.forward; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.TimeoutException; + +public class AmqpClient { + private final Channel channel; + private final String exchange; + private final String topic; + + AmqpClient(String connectionUrl, String exchange, String topic) { + this.exchange = exchange; + this.topic = topic; + + ConnectionFactory factory = new ConnectionFactory(); + try { + factory.setUri(connectionUrl); + } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) { + throw new RuntimeException("Error while setting URI for RabbitMQ connection factory", e); + } + + try (Connection connection = factory.newConnection()) { + channel = connection.createChannel(); + channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true); + } catch (IOException | TimeoutException e) { + throw new RuntimeException("Error while creating and configuring RabbitMQ channel", e); + } + } + + public void publishMessage(String message) throws IOException { + channel.basicPublish(exchange, topic, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); + } +} diff --git a/src/main/java/org/traccar/forward/EventForwarderAmqp.java b/src/main/java/org/traccar/forward/EventForwarderAmqp.java index 97f14d4ea..5c38a4459 100644 --- a/src/main/java/org/traccar/forward/EventForwarderAmqp.java +++ b/src/main/java/org/traccar/forward/EventForwarderAmqp.java @@ -15,64 +15,31 @@ */ package org.traccar.forward; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; + import org.traccar.config.Config; import org.traccar.config.Keys; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.Properties; -import java.util.concurrent.TimeoutException; public class EventForwarderAmqp implements EventForwarder { - private final Channel channel; + private final AmqpClient amqpClient; private final ObjectMapper objectMapper; - private final String exchange; - private final String topic; - public EventForwarderAmqp(Config config, ObjectMapper objectMapper) { - ConnectionFactory factory = new ConnectionFactory(); - try { - factory.setUri(config.getString(Keys.EVENT_FORWARD_URL)); - } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) { - throw new RuntimeException(e); - } - - try { - Connection connection = factory.newConnection(); - exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE); - topic = config.getString(Keys.EVENT_FORWARD_TOPIC); - channel = connection.createChannel(); - channel.exchangeDeclare(exchange, "topic", true); - this.objectMapper = objectMapper; - } catch (IOException | TimeoutException e) { - throw new RuntimeException(e); - } + String connectionUrl = config.getString(Keys.EVENT_FORWARD_URL); + String exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE); + String topic = config.getString(Keys.EVENT_FORWARD_TOPIC); + this.objectMapper = objectMapper; + amqpClient = new AmqpClient(connectionUrl, exchange, topic); } @Override public void forward(EventData eventData, ResultHandler resultHandler) { try { String value = objectMapper.writeValueAsString(eventData); - - BasicProperties properties = new BasicProperties.Builder() - .contentType("application/json") - .contentEncoding("string") - .deliveryMode(2) - .priority(10) - .build(); - - channel.basicPublish(exchange, topic, properties, value.getBytes()); + amqpClient.publishMessage(value); resultHandler.onResult(true, null); } catch (IOException e) { resultHandler.onResult(false, e); diff --git a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java index ea9e5c370..3996bda15 100644 --- a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java +++ b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java @@ -15,63 +15,31 @@ */ package org.traccar.forward; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; + import org.traccar.config.Config; import org.traccar.config.Keys; import java.io.IOException; -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.Properties; -import java.util.concurrent.TimeoutException; public class PositionForwarderAmqp implements PositionForwarder { - private final Channel channel; + private final AmqpClient amqpClient; private final ObjectMapper objectMapper; - private final String exchange; - private final String topic; - public PositionForwarderAmqp(Config config, ObjectMapper objectMapper) { - ConnectionFactory factory = new ConnectionFactory(); - try { - factory.setUri(config.getString(Keys.FORWARD_URL)); - } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) { - throw new RuntimeException(e); - } - - try { - Connection connection = factory.newConnection(); - topic = config.getString(Keys.FORWARD_TOPIC); - exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE); - channel = connection.createChannel(); - channel.exchangeDeclare(exchange, "topic", true); - this.objectMapper = objectMapper; - } catch (IOException | TimeoutException e) { - throw new RuntimeException(e); - } + String connectionUrl = config.getString(Keys.FORWARD_URL); + String exchange = config.getString(Keys.FORWARD_EXCHANGE); + String topic = config.getString(Keys.FORWARD_TOPIC); + amqpClient = new AmqpClient(connectionUrl, exchange, topic); + this.objectMapper = objectMapper; } @Override public void forward(PositionData positionData, ResultHandler resultHandler) { try { String value = objectMapper.writeValueAsString(positionData); - - BasicProperties properties = new BasicProperties.Builder() - .contentType("application/json") - .contentEncoding("string") - .deliveryMode(2) - .priority(10) - .build(); - - channel.basicPublish(exchange, topic, properties, value.getBytes()); + amqpClient.publishMessage(value); resultHandler.onResult(true, null); } catch (IOException e) { resultHandler.onResult(false, e); -- cgit v1.2.3