From 5193bd866734b6fea1bf96b5d975c52f09cbec2e Mon Sep 17 00:00:00 2001 From: namo Date: Fri, 13 Oct 2017 16:03:25 +0300 Subject: initial flespi integtation: pulling messages from flespi channels and updating devices' position --- src/org/traccar/Context.java | 21 +++++ src/org/traccar/Main.java | 6 ++ src/org/traccar/flespi/ChannelPullTask.java | 15 ++++ src/org/traccar/flespi/FlespiClient.java | 128 ++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+) create mode 100644 src/org/traccar/flespi/ChannelPullTask.java create mode 100644 src/org/traccar/flespi/FlespiClient.java (limited to 'src') diff --git a/src/org/traccar/Context.java b/src/org/traccar/Context.java index 3b24c6460..e582c5256 100644 --- a/src/org/traccar/Context.java +++ b/src/org/traccar/Context.java @@ -21,6 +21,9 @@ import com.ning.http.client.AsyncHttpClient; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Properties; import org.apache.velocity.app.VelocityEngine; @@ -43,6 +46,7 @@ import org.traccar.database.StatisticsManager; import org.traccar.database.UsersManager; import org.traccar.events.MotionEventHandler; import org.traccar.events.OverspeedEventHandler; +import org.traccar.flespi.FlespiClient; import org.traccar.geocoder.BingMapsGeocoder; import org.traccar.geocoder.FactualGeocoder; import org.traccar.geocoder.GeocodeFarmGeocoder; @@ -263,6 +267,12 @@ public final class Context { config.getDouble("event.motion.speedThreshold", 0.01)); } + private final static List flespiClients = new ArrayList<>(); + + public static List getFlespiClients() { + return flespiClients; + } + public static void init(String[] arguments) throws Exception { config = new Config(); @@ -411,6 +421,17 @@ public final class Context { smppClient = new SmppClient(); } + if (config.getBoolean("flespi.enable")) { + String uri = config.getString("flespi.url"); + String token = config.getString("flespi.token"); + String channelIds = config.getString("flespi.channel.ids"); + List ids = Arrays.asList(channelIds.split(",")); + if (uri != null && token != null) { + for (String channelId : ids) { + flespiClients.add(new FlespiClient(uri, token, channelId)); + } + } + } } public static void init(IdentityManager testIdentityManager) { diff --git a/src/org/traccar/Main.java b/src/org/traccar/Main.java index 1e2db2693..0d7c2ecf0 100644 --- a/src/org/traccar/Main.java +++ b/src/org/traccar/Main.java @@ -15,9 +15,11 @@ */ package org.traccar; +import org.traccar.flespi.FlespiClient; import org.traccar.helper.Log; import java.sql.SQLException; +import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.Locale; @@ -56,6 +58,10 @@ public final class Main { public void run() { Log.info("Shutting down server..."); + List flespiClients = Context.getFlespiClients(); + for (FlespiClient flespiClient : flespiClients) { + flespiClient.stopPullTask(); + } if (Context.getWebServer() != null) { Context.getWebServer().stop(); } diff --git a/src/org/traccar/flespi/ChannelPullTask.java b/src/org/traccar/flespi/ChannelPullTask.java new file mode 100644 index 000000000..5e5525ec9 --- /dev/null +++ b/src/org/traccar/flespi/ChannelPullTask.java @@ -0,0 +1,15 @@ +package org.traccar.flespi; + +public class ChannelPullTask implements Runnable { + + private final FlespiClient flespiClient; + + protected ChannelPullTask(FlespiClient flespiClient) { + this.flespiClient = flespiClient; + } + + @Override + public void run() { + flespiClient.channelPull(); + } +} diff --git a/src/org/traccar/flespi/FlespiClient.java b/src/org/traccar/flespi/FlespiClient.java new file mode 100644 index 000000000..2c98f36b3 --- /dev/null +++ b/src/org/traccar/flespi/FlespiClient.java @@ -0,0 +1,128 @@ +package org.traccar.flespi; + + +import com.ning.http.client.AsyncCompletionHandler; +import com.ning.http.client.Response; +import org.traccar.Context; +import org.traccar.helper.Log; +import org.traccar.model.Device; +import org.traccar.model.Position; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonNumber; +import javax.json.JsonReader; +import javax.json.JsonArray; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class FlespiClient { + private final String url; + private final String token; + private final String channel_id; + private final ScheduledExecutorService pullChannelExecutor = Executors.newScheduledThreadPool(5); + private ScheduledFuture pullTask; + private Integer pullDelay = 5; + private int nextKey; + + public FlespiClient(String url, String token, String channelId) { + this.channel_id = channelId; + this.url = url + "/gw/channels/" + channelId + "/messages?data={\"limit_count\":1000000," + + "\"limit_size\":100000000,\"delete\":true,\"timeout\":25,\"curr_key\":%d}"; + this.token = "FlespiToken " + token; + + schedulePull(); + } + + private void schedulePull() { + pullTask = pullChannelExecutor.scheduleAtFixedRate(new ChannelPullTask(this), 1, pullDelay, TimeUnit.SECONDS); + } + + public void stopPullTask() { + if (pullTask != null) { + pullTask.cancel(false); + } + } + + protected synchronized void channelPull() { + Context.getAsyncHttpClient().prepareGet(String.format(this.url, nextKey)) + .addHeader("Authorization", this.token) + .execute(new AsyncCompletionHandler() { + @Override + public Object onCompleted(Response response) throws Exception { + try (JsonReader reader = Json.createReader(response.getResponseBodyAsStream())) { + JsonObject object = reader.readObject(); + JsonArray result = object.getJsonArray("result"); + nextKey = object.getInt("next_key", nextKey); + Log.debug(String.format("channelPull next_key=%d msgs_count=%d", nextKey, result.size())); + for (int i = 0; i < result.size(); i++) { + Position position = decodePosition(result.getJsonObject(i)); + if (position != null && position.getLatitude() != 0 && position.getLongitude() != 0) { + Context.getConnectionManager().updateDevice(position.getDeviceId(), + Device.STATUS_ONLINE, new Date()); + Context.getDeviceManager().updateLatestPosition(position); + } + } + JsonArray errors = object.getJsonArray("errors"); + if (errors != null) { + for (int i = 0; i < errors.size(); i++) { + JsonObject error = errors.getJsonObject(i); + Log.warning("Error in flespi channel: " + error.toString()); + } + if (result == null || result.size() == 0) { + stopPullTask(); + } + } + } + return null; + } + + @Override + public void onThrowable(Throwable t) { + t.printStackTrace(); + } + }); + } + + private Position decodePosition(JsonObject msg) { + Device device = null; + try { + device = Context.getIdentityManager().getByUniqueId(msg.getString("ident")); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + if (device == null) { + return null; + } + Position position = new Position(); + position.setDeviceId(device.getId()); + + + position.setProtocol("flespi"); + + position.setTime(new Date((long) msg.getJsonNumber("timestamp").doubleValue() * 1000)); + JsonNumber lat = msg.getJsonNumber("position.latitude"); + JsonNumber lon = msg.getJsonNumber("position.longitude"); + position.setLatitude((lat != null && lon != null) ? lat.doubleValue() : 0); + position.setLongitude((lat != null && lon != null) ? lon.doubleValue() : 0); + + JsonNumber speed = msg.getJsonNumber("position.speed"); + position.setSpeed(speed != null ? speed.doubleValue() : 0); + + JsonNumber course = msg.getJsonNumber("position.direction"); + position.setCourse(course != null ? course.doubleValue() : 0); + + JsonNumber altitude = msg.getJsonNumber("position.altitude"); + position.setAltitude(altitude != null ? altitude.doubleValue() : 0); + + int satellites = msg.getInt("position.satellites", 0); + position.setValid(lat != null && lon != null && satellites >= 3); + position.set(Position.KEY_SATELLITES, satellites); + + return position; + } +} -- cgit v1.2.3