aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/traccar/forward/EventForwarderMqtt.java10
-rw-r--r--src/main/java/org/traccar/forward/MqttClient.java (renamed from src/main/java/org/traccar/helper/MqttUtil.java)27
-rw-r--r--src/main/java/org/traccar/forward/PositionForwarderMqtt.java9
3 files changed, 17 insertions, 29 deletions
diff --git a/src/main/java/org/traccar/forward/EventForwarderMqtt.java b/src/main/java/org/traccar/forward/EventForwarderMqtt.java
index 83df795b1..7d1c7dd3c 100644
--- a/src/main/java/org/traccar/forward/EventForwarderMqtt.java
+++ b/src/main/java/org/traccar/forward/EventForwarderMqtt.java
@@ -17,22 +17,20 @@ package org.traccar.forward;
import org.traccar.config.Config;
import org.traccar.config.Keys;
-import org.traccar.helper.MqttUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
public class EventForwarderMqtt implements EventForwarder {
- private final Mqtt5AsyncClient client;
+ private final MqttClient mqttClient;
private final ObjectMapper objectMapper;
private final String topic;
public EventForwarderMqtt(Config config, ObjectMapper objectMapper) {
this.topic = config.getString(Keys.EVENT_FORWARD_TOPIC);
- client = MqttUtil.createClient(config.getString(Keys.EVENT_FORWARD_URL));
+ mqttClient = new MqttClient(config.getString(Keys.FORWARD_URL));
this.objectMapper = objectMapper;
}
@@ -41,12 +39,10 @@ public class EventForwarderMqtt implements EventForwarder {
String payload;
try {
payload = objectMapper.writeValueAsString(eventData);
+ mqttClient.publish(topic, payload, (message, e) -> resultHandler.onResult(e == null, e));
} catch (JsonProcessingException e) {
resultHandler.onResult(false, e);
- return;
}
-
- MqttUtil.publish(client, topic, payload, (message, e) -> resultHandler.onResult(e == null, e));
}
}
diff --git a/src/main/java/org/traccar/helper/MqttUtil.java b/src/main/java/org/traccar/forward/MqttClient.java
index 9fc16012f..9059fc876 100644
--- a/src/main/java/org/traccar/helper/MqttUtil.java
+++ b/src/main/java/org/traccar/forward/MqttClient.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.traccar.helper;
+package org.traccar.forward;
import java.net.URI;
import java.net.URISyntaxException;
@@ -27,13 +27,9 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
-public final class MqttUtil {
-
- private MqttUtil() {
-
- }
-
- public static Mqtt5AsyncClient createClient(final String url) {
+public class MqttClient {
+ private final Mqtt5AsyncClient client;
+ MqttClient(String url) {
URI uri;
try {
uri = new URI(url);
@@ -41,22 +37,21 @@ public final class MqttUtil {
throw new RuntimeException(e);
}
- Mqtt5SimpleAuth simpleAuth = getSimpleAuth(uri);
+ Mqtt5SimpleAuth simpleAuth = this.getSimpleAuth(uri);
String host = uri.getHost();
int port = uri.getPort();
Mqtt5ClientBuilder builder = Mqtt5Client.builder().identifier("traccar-" + UUID.randomUUID())
.serverHost(host).serverPort(port).simpleAuth(simpleAuth).automaticReconnectWithDefaultConfig();
- Mqtt5AsyncClient client;
+
client = builder.buildAsync();
client.connectWith().send().whenComplete((message, e) -> {
throw new RuntimeException(e);
});
-
- return client;
}
- private static Mqtt5SimpleAuth getSimpleAuth(final URI uri) {
+
+ private Mqtt5SimpleAuth getSimpleAuth(final URI uri) {
String userInfo = uri.getUserInfo();
Mqtt5SimpleAuth simpleAuth = null;
if (userInfo != null) {
@@ -71,9 +66,9 @@ public final class MqttUtil {
return simpleAuth;
}
- public static void publish(final Mqtt5AsyncClient client, final String pubTopic, final String payload,
- final BiConsumer<? super Mqtt5PublishResult,
- ? super Throwable> whenComplete) {
+ public void publish(final String pubTopic, final String payload,
+ final BiConsumer<? super Mqtt5PublishResult,
+ ? super Throwable> whenComplete) {
client.publishWith().topic(pubTopic).qos(MqttQos.AT_LEAST_ONCE).payload(payload.getBytes()).send()
.whenComplete(whenComplete);
}
diff --git a/src/main/java/org/traccar/forward/PositionForwarderMqtt.java b/src/main/java/org/traccar/forward/PositionForwarderMqtt.java
index a6d0965b8..a22c3bee6 100644
--- a/src/main/java/org/traccar/forward/PositionForwarderMqtt.java
+++ b/src/main/java/org/traccar/forward/PositionForwarderMqtt.java
@@ -17,14 +17,12 @@ package org.traccar.forward;
import org.traccar.config.Config;
import org.traccar.config.Keys;
-import org.traccar.helper.MqttUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
public class PositionForwarderMqtt implements PositionForwarder {
- private final Mqtt5AsyncClient client;
+ private final MqttClient mqttClient;
@Override
public void forward(final PositionData positionData, final ResultHandler resultHandler) {
@@ -37,7 +35,7 @@ public class PositionForwarderMqtt implements PositionForwarder {
public PositionForwarderMqtt(final Config config, final ObjectMapper objectMapper) {
this.topic = config.getString(Keys.FORWARD_TOPIC);
- this.client = MqttUtil.createClient(config.getString(Keys.FORWARD_URL));
+ mqttClient = new MqttClient(config.getString(Keys.FORWARD_URL));
this.objectMapper = objectMapper;
}
@@ -45,11 +43,10 @@ public class PositionForwarderMqtt implements PositionForwarder {
final String payload;
try {
payload = objectMapper.writeValueAsString(object);
+ mqttClient.publish(pubTopic, payload, (message, e) -> resultHandler.onResult(e == null, e));
} catch (JsonProcessingException e) {
resultHandler.onResult(false, e);
- return;
}
- MqttUtil.publish(client, pubTopic, payload, (message, e) -> resultHandler.onResult(e == null, e));
}
}