aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/traccar/forward/AmqpClient.java57
-rw-r--r--src/main/java/org/traccar/forward/EventForwarderAmqp.java49
-rw-r--r--src/main/java/org/traccar/forward/PositionForwarderAmqp.java48
3 files changed, 73 insertions, 81 deletions
diff --git a/src/main/java/org/traccar/forward/AmqpClient.java b/src/main/java/org/traccar/forward/AmqpClient.java
new file mode 100644
index 000000000..302aa664c
--- /dev/null
+++ b/src/main/java/org/traccar/forward/AmqpClient.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.forward;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.TimeoutException;
+
+public class AmqpClient {
+ private final Channel channel;
+ private final String exchange;
+ private final String topic;
+
+ AmqpClient(String connectionUrl, String exchange, String topic) {
+ this.exchange = exchange;
+ this.topic = topic;
+
+ ConnectionFactory factory = new ConnectionFactory();
+ try {
+ factory.setUri(connectionUrl);
+ } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) {
+ throw new RuntimeException("Error while setting URI for RabbitMQ connection factory", e);
+ }
+
+ try (Connection connection = factory.newConnection()) {
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException("Error while creating and configuring RabbitMQ channel", e);
+ }
+ }
+
+ public void publishMessage(String message) throws IOException {
+ channel.basicPublish(exchange, topic, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
+ }
+}
diff --git a/src/main/java/org/traccar/forward/EventForwarderAmqp.java b/src/main/java/org/traccar/forward/EventForwarderAmqp.java
index 97f14d4ea..5c38a4459 100644
--- a/src/main/java/org/traccar/forward/EventForwarderAmqp.java
+++ b/src/main/java/org/traccar/forward/EventForwarderAmqp.java
@@ -15,64 +15,31 @@
*/
package org.traccar.forward;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.rabbitmq.client.AMQP.BasicProperties;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
+
import org.traccar.config.Config;
import org.traccar.config.Keys;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
public class EventForwarderAmqp implements EventForwarder {
- private final Channel channel;
+ private final AmqpClient amqpClient;
private final ObjectMapper objectMapper;
- private final String exchange;
- private final String topic;
-
public EventForwarderAmqp(Config config, ObjectMapper objectMapper) {
- ConnectionFactory factory = new ConnectionFactory();
- try {
- factory.setUri(config.getString(Keys.EVENT_FORWARD_URL));
- } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) {
- throw new RuntimeException(e);
- }
-
- try {
- Connection connection = factory.newConnection();
- exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE);
- topic = config.getString(Keys.EVENT_FORWARD_TOPIC);
- channel = connection.createChannel();
- channel.exchangeDeclare(exchange, "topic", true);
- this.objectMapper = objectMapper;
- } catch (IOException | TimeoutException e) {
- throw new RuntimeException(e);
- }
+ String connectionUrl = config.getString(Keys.EVENT_FORWARD_URL);
+ String exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE);
+ String topic = config.getString(Keys.EVENT_FORWARD_TOPIC);
+ this.objectMapper = objectMapper;
+ amqpClient = new AmqpClient(connectionUrl, exchange, topic);
}
@Override
public void forward(EventData eventData, ResultHandler resultHandler) {
try {
String value = objectMapper.writeValueAsString(eventData);
-
- BasicProperties properties = new BasicProperties.Builder()
- .contentType("application/json")
- .contentEncoding("string")
- .deliveryMode(2)
- .priority(10)
- .build();
-
- channel.basicPublish(exchange, topic, properties, value.getBytes());
+ amqpClient.publishMessage(value);
resultHandler.onResult(true, null);
} catch (IOException e) {
resultHandler.onResult(false, e);
diff --git a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
index ea9e5c370..3996bda15 100644
--- a/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
+++ b/src/main/java/org/traccar/forward/PositionForwarderAmqp.java
@@ -15,63 +15,31 @@
*/
package org.traccar.forward;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.rabbitmq.client.AMQP.BasicProperties;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
+
import org.traccar.config.Config;
import org.traccar.config.Keys;
import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
public class PositionForwarderAmqp implements PositionForwarder {
- private final Channel channel;
+ private final AmqpClient amqpClient;
private final ObjectMapper objectMapper;
- private final String exchange;
- private final String topic;
-
public PositionForwarderAmqp(Config config, ObjectMapper objectMapper) {
- ConnectionFactory factory = new ConnectionFactory();
- try {
- factory.setUri(config.getString(Keys.FORWARD_URL));
- } catch (NoSuchAlgorithmException | URISyntaxException | KeyManagementException e) {
- throw new RuntimeException(e);
- }
-
- try {
- Connection connection = factory.newConnection();
- topic = config.getString(Keys.FORWARD_TOPIC);
- exchange = config.getString(Keys.EVENT_FORWARD_EXCHANGE);
- channel = connection.createChannel();
- channel.exchangeDeclare(exchange, "topic", true);
- this.objectMapper = objectMapper;
- } catch (IOException | TimeoutException e) {
- throw new RuntimeException(e);
- }
+ String connectionUrl = config.getString(Keys.FORWARD_URL);
+ String exchange = config.getString(Keys.FORWARD_EXCHANGE);
+ String topic = config.getString(Keys.FORWARD_TOPIC);
+ amqpClient = new AmqpClient(connectionUrl, exchange, topic);
+ this.objectMapper = objectMapper;
}
@Override
public void forward(PositionData positionData, ResultHandler resultHandler) {
try {
String value = objectMapper.writeValueAsString(positionData);
-
- BasicProperties properties = new BasicProperties.Builder()
- .contentType("application/json")
- .contentEncoding("string")
- .deliveryMode(2)
- .priority(10)
- .build();
-
- channel.basicPublish(exchange, topic, properties, value.getBytes());
+ amqpClient.publishMessage(value);
resultHandler.onResult(true, null);
} catch (IOException e) {
resultHandler.onResult(false, e);