aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/traccar/protocol
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2023-08-27 14:24:36 -0700
committerAnton Tananaev <anton@traccar.org>2023-08-27 14:24:36 -0700
commitf7ad63b6f898a38d97fe2e200c688d04c2b051ff (patch)
treee2aa2d19c488e4b4cfb3f13dc84b315677eab0a8 /src/main/java/org/traccar/protocol
parentffd116961741a81748f21d90d34fe63120928575 (diff)
downloadtrackermap-server-f7ad63b6f898a38d97fe2e200c688d04c2b051ff.tar.gz
trackermap-server-f7ad63b6f898a38d97fe2e200c688d04c2b051ff.tar.bz2
trackermap-server-f7ad63b6f898a38d97fe2e200c688d04c2b051ff.zip
Extract base MQTT decoder
Diffstat (limited to 'src/main/java/org/traccar/protocol')
-rw-r--r--src/main/java/org/traccar/protocol/IotmProtocolDecoder.java147
1 files changed, 45 insertions, 102 deletions
diff --git a/src/main/java/org/traccar/protocol/IotmProtocolDecoder.java b/src/main/java/org/traccar/protocol/IotmProtocolDecoder.java
index 7bbe6c8de..d9e6670c6 100644
--- a/src/main/java/org/traccar/protocol/IotmProtocolDecoder.java
+++ b/src/main/java/org/traccar/protocol/IotmProtocolDecoder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 - 2022 Anton Tananaev (anton@traccar.org)
+ * Copyright 2020 - 2023 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,27 +17,19 @@ package org.traccar.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import org.traccar.BaseProtocolDecoder;
-import org.traccar.session.DeviceSession;
-import org.traccar.NetworkMessage;
+import org.traccar.BaseMqttProtocolDecoder;
import org.traccar.Protocol;
import org.traccar.helper.UnitsConverter;
import org.traccar.model.Position;
+import org.traccar.session.DeviceSession;
-import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
-public class IotmProtocolDecoder extends BaseProtocolDecoder {
+public class IotmProtocolDecoder extends BaseMqttProtocolDecoder {
public IotmProtocolDecoder(Protocol protocol) {
super(protocol);
@@ -236,121 +228,72 @@ public class IotmProtocolDecoder extends BaseProtocolDecoder {
@Override
protected Object decode(
- Channel channel, SocketAddress remoteAddress, Object msg) throws Exception {
-
- if (msg instanceof MqttConnectMessage) {
-
- MqttConnectMessage message = (MqttConnectMessage) msg;
+ DeviceSession deviceSession, MqttPublishMessage message) throws Exception {
- DeviceSession deviceSession = getDeviceSession(
- channel, remoteAddress, message.payload().clientIdentifier());
-
- MqttConnectReturnCode returnCode = deviceSession != null
- ? MqttConnectReturnCode.CONNECTION_ACCEPTED
- : MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
-
- MqttMessage response = MqttMessageBuilders.connAck().returnCode(returnCode).build();
-
- if (channel != null) {
- channel.writeAndFlush(new NetworkMessage(response, remoteAddress));
- }
+ List<Position> positions = new LinkedList<>();
- } else if (msg instanceof MqttSubscribeMessage) {
+ ByteBuf buf = message.payload();
- MqttSubscribeMessage message = (MqttSubscribeMessage) msg;
-
- MqttMessage response = MqttMessageBuilders.subAck()
- .packetId(message.variableHeader().messageId())
- .build();
-
- if (channel != null) {
- channel.writeAndFlush(new NetworkMessage(response, remoteAddress));
- }
-
- } else if (msg instanceof MqttPublishMessage) {
-
- DeviceSession deviceSession = getDeviceSession(channel, remoteAddress);
- if (deviceSession == null) {
- return null;
- }
+ buf.readUnsignedByte(); // structure version
- List<Position> positions = new LinkedList<>();
+ while (buf.readableBytes() > 1) {
+ int type = buf.readUnsignedByte();
+ int length = buf.readUnsignedShortLE();
+ ByteBuf record = buf.readSlice(length);
+ if (type == 1) {
- MqttPublishMessage message = (MqttPublishMessage) msg;
- ByteBuf buf = message.payload();
+ Position position = new Position(getProtocolName());
+ position.setDeviceId(deviceSession.getDeviceId());
+ position.setTime(new Date(record.readUnsignedIntLE() * 1000));
- buf.readUnsignedByte(); // structure version
+ while (record.readableBytes() > 0) {
+ int sensorType = record.readUnsignedByte();
+ int sensorId = record.readUnsignedShortLE();
+ if (sensorType == 14) {
- while (buf.readableBytes() > 1) {
- int type = buf.readUnsignedByte();
- int length = buf.readUnsignedShortLE();
- ByteBuf record = buf.readSlice(length);
- if (type == 1) {
+ position.setValid(true);
+ position.setLatitude(record.readFloatLE());
+ position.setLongitude(record.readFloatLE());
+ position.setSpeed(UnitsConverter.knotsFromKph(record.readUnsignedShortLE()));
- Position position = new Position(getProtocolName());
- position.setDeviceId(deviceSession.getDeviceId());
- position.setTime(new Date(record.readUnsignedIntLE() * 1000));
+ position.set(Position.KEY_HDOP, record.readUnsignedByte());
+ position.set(Position.KEY_SATELLITES, record.readUnsignedByte());
- while (record.readableBytes() > 0) {
- int sensorType = record.readUnsignedByte();
- int sensorId = record.readUnsignedShortLE();
- if (sensorType == 14) {
+ position.setCourse(record.readUnsignedShortLE());
+ position.setAltitude(record.readShortLE());
- position.setValid(true);
- position.setLatitude(record.readFloatLE());
- position.setLongitude(record.readFloatLE());
- position.setSpeed(UnitsConverter.knotsFromKph(record.readUnsignedShortLE()));
-
- position.set(Position.KEY_HDOP, record.readUnsignedByte());
- position.set(Position.KEY_SATELLITES, record.readUnsignedByte());
-
- position.setCourse(record.readUnsignedShortLE());
- position.setAltitude(record.readShortLE());
-
- } else {
-
- if (sensorType == 3) {
- continue;
- }
-
- decodeSensor(position, record, sensorType, sensorId);
+ } else {
+ if (sensorType == 3) {
+ continue;
}
- }
-
- positions.add(position);
- } else if (type == 3) {
+ decodeSensor(position, record, sensorType, sensorId);
- Position position = new Position(getProtocolName());
- position.setDeviceId(deviceSession.getDeviceId());
+ }
+ }
- getLastLocation(position, new Date(record.readUnsignedIntLE() * 1000));
+ positions.add(position);
- record.readUnsignedByte(); // function identifier
+ } else if (type == 3) {
- position.set(Position.KEY_EVENT, record.readUnsignedByte());
+ Position position = new Position(getProtocolName());
+ position.setDeviceId(deviceSession.getDeviceId());
- positions.add(position);
+ getLastLocation(position, new Date(record.readUnsignedIntLE() * 1000));
- }
- }
+ record.readUnsignedByte(); // function identifier
- buf.readUnsignedByte(); // checksum
+ position.set(Position.KEY_EVENT, record.readUnsignedByte());
- MqttMessage response = MqttMessageBuilders.pubAck()
- .packetId(message.variableHeader().packetId())
- .build();
+ positions.add(position);
- if (channel != null) {
- channel.writeAndFlush(new NetworkMessage(response, remoteAddress));
}
-
- return positions.isEmpty() ? null : positions;
-
}
- return null;
+ buf.readUnsignedByte(); // checksum
+
+ return positions.isEmpty() ? null : positions;
}
}