From fb82a8167b99f706cb25d828bf1d72424ba98a75 Mon Sep 17 00:00:00 2001 From: namo Date: Thu, 19 Oct 2017 18:00:30 +0300 Subject: flespi integration: listening messages --- src/org/traccar/BaseProtocolDecoder.java | 9 ++ src/org/traccar/ExtendedObjectDecoder.java | 13 +++ src/org/traccar/protocol/FlespiProtocol.java | 49 +++++++++ .../traccar/protocol/FlespiProtocolDecoder.java | 121 +++++++++++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 src/org/traccar/protocol/FlespiProtocol.java create mode 100644 src/org/traccar/protocol/FlespiProtocolDecoder.java (limited to 'src') diff --git a/src/org/traccar/BaseProtocolDecoder.java b/src/org/traccar/BaseProtocolDecoder.java index 2d6286bf8..b0fb72225 100644 --- a/src/org/traccar/BaseProtocolDecoder.java +++ b/src/org/traccar/BaseProtocolDecoder.java @@ -29,6 +29,8 @@ import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.Iterator; import java.sql.SQLException; public abstract class BaseProtocolDecoder extends ExtendedObjectDecoder { @@ -205,6 +207,13 @@ public abstract class BaseProtocolDecoder extends ExtendedObjectDecoder { if (!positions.isEmpty()) { position = (Position) positions.iterator().next(); } + } else if (decodedMessage instanceof Map) { + Set deviceIds = ((HashMap) decodedMessage).keySet(); + Iterator deviceIdsIterator = deviceIds.iterator(); + while (deviceIdsIterator.hasNext()) { + Context.getConnectionManager().updateDevice( + deviceIdsIterator.next(), Device.STATUS_ONLINE, new Date()); + } } } if (position != null) { diff --git a/src/org/traccar/ExtendedObjectDecoder.java b/src/org/traccar/ExtendedObjectDecoder.java index 268e6f688..a9b3a1053 100644 --- a/src/org/traccar/ExtendedObjectDecoder.java +++ b/src/org/traccar/ExtendedObjectDecoder.java @@ -29,6 +29,9 @@ import javax.xml.bind.DatatypeConverter; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler { @@ -69,6 +72,16 @@ public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler { saveOriginal(o, originalMessage); Channels.fireMessageReceived(ctx, o, e.getRemoteAddress()); } + } else if (decodedMessage instanceof Map) { + Iterator it = ((Map) decodedMessage).entrySet().iterator(); + while (it.hasNext()) { + Map.Entry pair = (Map.Entry) it.next(); + List positions = (List) pair.getValue(); + for (Position position : positions) { + saveOriginal(position, originalMessage); + Channels.fireMessageReceived(ctx, position, e.getRemoteAddress()); + } + } } else { saveOriginal(decodedMessage, originalMessage); Channels.fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); diff --git a/src/org/traccar/protocol/FlespiProtocol.java b/src/org/traccar/protocol/FlespiProtocol.java new file mode 100644 index 000000000..1164dac2d --- /dev/null +++ b/src/org/traccar/protocol/FlespiProtocol.java @@ -0,0 +1,49 @@ +/* + * Copyright 2015 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.protocol; + +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; +import org.traccar.BaseProtocol; +import org.traccar.TrackerServer; +import org.traccar.model.Command; + +import java.util.List; + +public class FlespiProtocol extends BaseProtocol { + + public FlespiProtocol() { + super("flespi"); + setSupportedDataCommands( + Command.TYPE_CUSTOM); + } + + @Override + public void initTrackerServers(List serverList) { + serverList.add(new TrackerServer(new ServerBootstrap(), getName()) { + @Override + protected void addSpecificHandlers(ChannelPipeline pipeline) { + pipeline.addLast("httpEncoder", new HttpResponseEncoder()); + pipeline.addLast("httpDecoder", new HttpRequestDecoder()); + pipeline.addLast("httpAggregator", new HttpChunkAggregator(Integer.MAX_VALUE)); + pipeline.addLast("objectDecoder", new FlespiProtocolDecoder(FlespiProtocol.this)); + } + }); + } +} diff --git a/src/org/traccar/protocol/FlespiProtocolDecoder.java b/src/org/traccar/protocol/FlespiProtocolDecoder.java new file mode 100644 index 000000000..6d5d68382 --- /dev/null +++ b/src/org/traccar/protocol/FlespiProtocolDecoder.java @@ -0,0 +1,121 @@ +/* + * Copyright 2013 - 2017 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.protocol; + +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.util.CharsetUtil; +import org.traccar.BaseProtocolDecoder; +import org.traccar.DeviceSession; +import org.traccar.helper.Log; +import org.traccar.model.Position; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonNumber; +import javax.json.JsonObject; +import java.io.StringReader; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.Date; + +public class FlespiProtocolDecoder extends BaseProtocolDecoder { + + public FlespiProtocolDecoder(FlespiProtocol protocol) { + super(protocol); + } + + @Override + protected Object decode( + Channel channel, SocketAddress remoteAddress, Object msg) throws Exception { + + HttpRequest request = (HttpRequest) msg; + JsonArray result = Json.createReader(new StringReader(request.getContent().toString(StandardCharsets.UTF_8))) + .readArray(); + Map unitsPositions = new HashMap<>(); + Log.debug(String.format("messages received msgs_count=%d", result.size())); + for (int i = 0; i < result.size(); i++) { + JsonObject message = result.getJsonObject(i); + String ident = message.getString("ident"); + if (ident == null) { + continue; + } + DeviceSession deviceSession = getDeviceSession(channel, remoteAddress, ident); + if (deviceSession == null) { + continue; + } + if (unitsPositions.get(deviceSession.getDeviceId()) == null) { + unitsPositions.put(deviceSession.getDeviceId(), new LinkedList()); + } + Position position = new Position(); + position.setDeviceId(deviceSession.getDeviceId()); + decodePosition(message, position); + unitsPositions.get(deviceSession.getDeviceId()).add(position); + } + + sendResponse(channel, HttpResponseStatus.OK); + return unitsPositions; + } + + private void sendResponse(Channel channel, HttpResponseStatus status) { + if (channel != null) { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); + response.headers().add(HttpHeaders.Names.CONTENT_LENGTH, 10); + response.setContent(ChannelBuffers.copiedBuffer("Hello namo", CharsetUtil.US_ASCII)); + channel.write(response); + } + } + + private void decodePosition(JsonObject msg, Position position) { + position.setProtocol("flespi"); + + position.setTime(new Date((long) msg.getJsonNumber("timestamp").doubleValue() * 1000)); + JsonNumber lat = msg.getJsonNumber("position.latitude"); + JsonNumber lon = msg.getJsonNumber("position.longitude"); + position.setLatitude((lat != null && lon != null) ? lat.doubleValue() : 0); + position.setLongitude((lat != null && lon != null) ? lon.doubleValue() : 0); + + JsonNumber speed = msg.getJsonNumber("position.speed"); + position.setSpeed(speed != null ? speed.doubleValue() : 0); + if (position.getSpeed() == 111) { + position.set(Position.KEY_ALARM, Position.ALARM_GENERAL); + } + + JsonNumber course = msg.getJsonNumber("position.direction"); + position.setCourse(course != null ? course.doubleValue() : 0); + + JsonNumber altitude = msg.getJsonNumber("position.altitude"); + position.setAltitude(altitude != null ? altitude.doubleValue() : 0); + + int satellites = msg.getInt("position.satellites", 0); + position.setValid(position.getLatitude() != 0 && position.getLongitude() != 0 && satellites >= 3); + position.set(Position.KEY_SATELLITES, satellites); + + if (msg.getBoolean("alarm.event.trigger", false)) { + position.set(Position.KEY_ALARM, Position.ALARM_GENERAL); + } + } +} -- cgit v1.2.3