diff options
-rw-r--r-- | src/main/java/org/traccar/forward/EventForwarderMqtt.java | 10 | ||||
-rw-r--r-- | src/main/java/org/traccar/forward/MqttClient.java (renamed from src/main/java/org/traccar/helper/MqttUtil.java) | 27 | ||||
-rw-r--r-- | src/main/java/org/traccar/forward/PositionForwarderMqtt.java | 9 |
3 files changed, 17 insertions, 29 deletions
diff --git a/src/main/java/org/traccar/forward/EventForwarderMqtt.java b/src/main/java/org/traccar/forward/EventForwarderMqtt.java index 83df795b1..7d1c7dd3c 100644 --- a/src/main/java/org/traccar/forward/EventForwarderMqtt.java +++ b/src/main/java/org/traccar/forward/EventForwarderMqtt.java @@ -17,22 +17,20 @@ package org.traccar.forward; import org.traccar.config.Config; import org.traccar.config.Keys; -import org.traccar.helper.MqttUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; public class EventForwarderMqtt implements EventForwarder { - private final Mqtt5AsyncClient client; + private final MqttClient mqttClient; private final ObjectMapper objectMapper; private final String topic; public EventForwarderMqtt(Config config, ObjectMapper objectMapper) { this.topic = config.getString(Keys.EVENT_FORWARD_TOPIC); - client = MqttUtil.createClient(config.getString(Keys.EVENT_FORWARD_URL)); + mqttClient = new MqttClient(config.getString(Keys.FORWARD_URL)); this.objectMapper = objectMapper; } @@ -41,12 +39,10 @@ public class EventForwarderMqtt implements EventForwarder { String payload; try { payload = objectMapper.writeValueAsString(eventData); + mqttClient.publish(topic, payload, (message, e) -> resultHandler.onResult(e == null, e)); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); - return; } - - MqttUtil.publish(client, topic, payload, (message, e) -> resultHandler.onResult(e == null, e)); } } diff --git a/src/main/java/org/traccar/helper/MqttUtil.java b/src/main/java/org/traccar/forward/MqttClient.java index 9fc16012f..9059fc876 100644 --- a/src/main/java/org/traccar/helper/MqttUtil.java +++ b/src/main/java/org/traccar/forward/MqttClient.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.traccar.helper; +package org.traccar.forward; import java.net.URI; import java.net.URISyntaxException; @@ -27,13 +27,9 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -public final class MqttUtil { - - private MqttUtil() { - - } - - public static Mqtt5AsyncClient createClient(final String url) { +public class MqttClient { + private final Mqtt5AsyncClient client; + MqttClient(String url) { URI uri; try { uri = new URI(url); @@ -41,22 +37,21 @@ public final class MqttUtil { throw new RuntimeException(e); } - Mqtt5SimpleAuth simpleAuth = getSimpleAuth(uri); + Mqtt5SimpleAuth simpleAuth = this.getSimpleAuth(uri); String host = uri.getHost(); int port = uri.getPort(); Mqtt5ClientBuilder builder = Mqtt5Client.builder().identifier("traccar-" + UUID.randomUUID()) .serverHost(host).serverPort(port).simpleAuth(simpleAuth).automaticReconnectWithDefaultConfig(); - Mqtt5AsyncClient client; + client = builder.buildAsync(); client.connectWith().send().whenComplete((message, e) -> { throw new RuntimeException(e); }); - - return client; } - private static Mqtt5SimpleAuth getSimpleAuth(final URI uri) { + + private Mqtt5SimpleAuth getSimpleAuth(final URI uri) { String userInfo = uri.getUserInfo(); Mqtt5SimpleAuth simpleAuth = null; if (userInfo != null) { @@ -71,9 +66,9 @@ public final class MqttUtil { return simpleAuth; } - public static void publish(final Mqtt5AsyncClient client, final String pubTopic, final String payload, - final BiConsumer<? super Mqtt5PublishResult, - ? super Throwable> whenComplete) { + public void publish(final String pubTopic, final String payload, + final BiConsumer<? super Mqtt5PublishResult, + ? super Throwable> whenComplete) { client.publishWith().topic(pubTopic).qos(MqttQos.AT_LEAST_ONCE).payload(payload.getBytes()).send() .whenComplete(whenComplete); } diff --git a/src/main/java/org/traccar/forward/PositionForwarderMqtt.java b/src/main/java/org/traccar/forward/PositionForwarderMqtt.java index a6d0965b8..a22c3bee6 100644 --- a/src/main/java/org/traccar/forward/PositionForwarderMqtt.java +++ b/src/main/java/org/traccar/forward/PositionForwarderMqtt.java @@ -17,14 +17,12 @@ package org.traccar.forward; import org.traccar.config.Config; import org.traccar.config.Keys; -import org.traccar.helper.MqttUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; public class PositionForwarderMqtt implements PositionForwarder { - private final Mqtt5AsyncClient client; + private final MqttClient mqttClient; @Override public void forward(final PositionData positionData, final ResultHandler resultHandler) { @@ -37,7 +35,7 @@ public class PositionForwarderMqtt implements PositionForwarder { public PositionForwarderMqtt(final Config config, final ObjectMapper objectMapper) { this.topic = config.getString(Keys.FORWARD_TOPIC); - this.client = MqttUtil.createClient(config.getString(Keys.FORWARD_URL)); + mqttClient = new MqttClient(config.getString(Keys.FORWARD_URL)); this.objectMapper = objectMapper; } @@ -45,11 +43,10 @@ public class PositionForwarderMqtt implements PositionForwarder { final String payload; try { payload = objectMapper.writeValueAsString(object); + mqttClient.publish(pubTopic, payload, (message, e) -> resultHandler.onResult(e == null, e)); } catch (JsonProcessingException e) { resultHandler.onResult(false, e); - return; } - MqttUtil.publish(client, pubTopic, payload, (message, e) -> resultHandler.onResult(e == null, e)); } } |