diff options
author | Anton Tananaev <anton.tananaev@gmail.com> | 2015-04-25 20:51:38 +1200 |
---|---|---|
committer | Anton Tananaev <anton.tananaev@gmail.com> | 2015-04-25 20:51:38 +1200 |
commit | b10cebd692b610a13ea27b6150a79726de59f40e (patch) | |
tree | 31919802ddae807fa4887446addc9eb98d5fbe50 /src | |
parent | 67dc1c43932bffaf9df2c9ec201cff879d0226d9 (diff) | |
download | traccar-server-b10cebd692b610a13ea27b6150a79726de59f40e.tar.gz traccar-server-b10cebd692b610a13ea27b6150a79726de59f40e.tar.bz2 traccar-server-b10cebd692b610a13ea27b6150a79726de59f40e.zip |
Implement async position update
Diffstat (limited to 'src')
-rw-r--r-- | src/org/traccar/database/DataCache.java | 21 | ||||
-rw-r--r-- | src/org/traccar/database/DataManager.java | 25 | ||||
-rw-r--r-- | src/org/traccar/database/ObjectConverter.java | 54 | ||||
-rw-r--r-- | src/org/traccar/http/MainServlet.java | 177 |
4 files changed, 260 insertions, 17 deletions
diff --git a/src/org/traccar/database/DataCache.java b/src/org/traccar/database/DataCache.java index f5aa3b1e9..f28f1b838 100644 --- a/src/org/traccar/database/DataCache.java +++ b/src/org/traccar/database/DataCache.java @@ -18,6 +18,8 @@ package org.traccar.database; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import org.traccar.model.Position; @@ -31,7 +33,7 @@ public class DataCache { // TODO: load latest data from datavase } - public void update(Position position) { + public synchronized void update(Position position) { long device = position.getDeviceId(); positions.put(device, position); if (listeners.containsKey(device)) { @@ -41,6 +43,19 @@ public class DataCache { } } + public synchronized Collection<Position> getInitialState(Collection<Long> devices) { + + List<Position> result = new LinkedList<Position>(); + + for (long device : devices) { + if (positions.containsKey(device)) { + result.add(positions.get(device)); + } + } + + return result; + } + public static interface DataCacheListener { public void onUpdate(Position position); } @@ -51,7 +66,7 @@ public class DataCache { } } - public void addListener(long device, DataCacheListener listener) { + public synchronized void addListener(long device, DataCacheListener listener) { if (!listeners.containsKey(device)) { listeners.put(device, new HashSet<DataCacheListener>()); } @@ -64,7 +79,7 @@ public class DataCache { } } - public void removeListener(long device, DataCacheListener listener) { + public synchronized void removeListener(long device, DataCacheListener listener) { if (!listeners.containsKey(device)) { listeners.put(device, new HashSet<DataCacheListener>()); } diff --git a/src/org/traccar/database/DataManager.java b/src/org/traccar/database/DataManager.java index 2f029e74c..faded815b 100644 --- a/src/org/traccar/database/DataManager.java +++ b/src/org/traccar/database/DataManager.java @@ -362,6 +362,31 @@ public class DataManager { } } + public List<Long> getDeviceList(long userId) throws SQLException { + + Connection connection = dataSource.getConnection(); + try { + PreparedStatement statement = connection.prepareStatement( + "SELECT id FROM device WHERE id IN (" + + "SELECT device_id FROM user_device WHERE user_id = ?);"); + try { + statement.setLong(1, userId); + + ResultSet resultSet = statement.executeQuery(); + + List<Long> result = new LinkedList<Long>(); + while (resultSet.next()) { + result.add(resultSet.getLong(1)); + } + return result; + } finally { + statement.close(); + } + } finally { + connection.close(); + } + } + public JsonArray getDevices(long userId) throws SQLException { Connection connection = dataSource.getConnection(); diff --git a/src/org/traccar/database/ObjectConverter.java b/src/org/traccar/database/ObjectConverter.java new file mode 100644 index 000000000..072ab7a7d --- /dev/null +++ b/src/org/traccar/database/ObjectConverter.java @@ -0,0 +1,54 @@ +/* + * Copyright 2015 Anton Tananaev (anton.tananaev@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.database; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Collection; +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonObjectBuilder; +import org.traccar.model.Position; + +public class ObjectConverter { + + private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ"); + + public static JsonArray convert(Collection<Position> positions) { + + JsonArrayBuilder array = Json.createArrayBuilder(); + + for (Position position : positions) { + JsonObjectBuilder object = Json.createObjectBuilder(); + + //object.add("id", position.getId()); + object.add("time", dateFormat.format(position.getTime())); + object.add("valid", position.getValid()); + object.add("latitude", position.getLatitude()); + object.add("longitude", position.getLongitude()); + object.add("altitude", position.getAltitude()); + object.add("speed", position.getSpeed()); + object.add("course", position.getCourse()); + //object.add("other", position.getExtendedInfo()); + + array.add(object.build()); + } + + return array.build(); + } + +} diff --git a/src/org/traccar/http/MainServlet.java b/src/org/traccar/http/MainServlet.java index 7483b22b4..655b47312 100644 --- a/src/org/traccar/http/MainServlet.java +++ b/src/org/traccar/http/MainServlet.java @@ -17,6 +17,11 @@ package org.traccar.http; import java.io.IOException; import java.sql.SQLException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.json.Json; import javax.json.JsonObjectBuilder; import javax.servlet.AsyncContext; @@ -25,13 +30,21 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; import org.traccar.Context; +import org.traccar.GlobalTimer; +import org.traccar.database.DataCache; +import org.traccar.database.ObjectConverter; import org.traccar.helper.Log; +import org.traccar.model.Position; public class MainServlet extends HttpServlet { - private static final String USER_ID = "userId"; + private static final long ASYNC_TIMEOUT = 120000; + private static final String USER_ID = "userId"; + @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { @@ -50,25 +63,161 @@ public class MainServlet extends HttpServlet { } } - private void async(final AsyncContext context) { - context.start(new Runnable() { + public class AsyncSession { + + private static final boolean DEBUG_ASYNC = true; + + private static final long SESSION_TIMEOUT = 30; + private static final long REQUEST_TIMEOUT = 30; + + private boolean destroyed; + private final long userId; + private final List<Long> devices; + private Timeout sessionTimeout; + private Timeout requestTimeout; + private final Map<Long, Position> positions = new HashMap<Long, Position>(); + private AsyncContext activeContext; + + private void logEvent(String message) { + if (DEBUG_ASYNC) { + Log.debug("AsyncSession: " + this.hashCode() + " destroyed: " + destroyed + " " + message); + } + } + + public AsyncSession(long userId, List<Long> devices) { + logEvent("create userId: " + userId + " devices: " + devices.size()); + this.userId = userId; + this.devices = devices; + } + + @Override + protected void finalize() throws Throwable { + logEvent("finalize"); + } + + private final DataCache.DataCacheListener dataListener = new DataCache.DataCacheListener() { + @Override + public void onUpdate(Position position) { + synchronized (AsyncSession.this) { + logEvent("onUpdate deviceId: " + position.getDeviceId()); + if (!destroyed) { + if (requestTimeout != null) { + requestTimeout.cancel(); + requestTimeout = null; + } + positions.put(position.getDeviceId(), position); + if (activeContext != null) { + response(); + } + } + } + } + }; + + private final TimerTask sessionTimer = new TimerTask() { + @Override + public void run(Timeout tmt) throws Exception { + synchronized (AsyncSession.this) { + logEvent("sessionTimeout"); + Context.getDataCache().removeListener(devices, dataListener); + synchronized (asyncSessions) { + asyncSessions.remove(userId); + } + destroyed = true; + } + } + }; + + private final TimerTask requestTimer = new TimerTask() { @Override - public void run() { + public void run(Timeout tmt) throws Exception { + synchronized (AsyncSession.this) { + logEvent("requestTimeout"); + if (!destroyed) { + if (activeContext != null) { + response(); + } + } + } + } + }; + + public synchronized void init() { + logEvent("init"); + Collection<Position> initialPositions = Context.getDataCache().getInitialState(devices); + for (Position position : initialPositions) { + positions.put(position.getDeviceId(), position); + } + + Context.getDataCache().addListener(devices, dataListener); + } + + public synchronized void request(AsyncContext context) { + logEvent("request context: " + context.hashCode()); + if (!destroyed) { + activeContext = context; + if (sessionTimeout != null) { + sessionTimeout.cancel(); + sessionTimeout = null; + } + + if (!positions.isEmpty()) { + response(); + } else { + requestTimeout = GlobalTimer.getTimer().newTimeout( + requestTimer, REQUEST_TIMEOUT, TimeUnit.SECONDS); + } + } + } + + private synchronized void response() { + logEvent("response context: " + activeContext.hashCode()); + if (!destroyed) { + ServletResponse response = activeContext.getResponse(); + + JsonObjectBuilder result = Json.createObjectBuilder(); + result.add("success", true); + result.add("data", ObjectConverter.convert(positions.values())); + positions.clear(); + try { - Thread.sleep(5000); // DELME + response.getWriter().println(result.build().toString()); + } catch (IOException error) { + Log.warning(error); + } - ServletResponse response = context.getResponse(); - response.getWriter().println("{ success: true, data: ["+ - "{ id: 1, device_id: 1, time: \"2012-04-23T18:25:43.511Z\", latitude: 60, longitude: 30, speed: 0, course: 0 }," + - "{ id: 2, device_id: 2, time: \"2012-04-23T19:25:43.511Z\", latitude: 61, longitude: 31, speed: 0, course: 0 }" + - "] }"); - context.complete(); + activeContext.complete(); + activeContext = null; - } catch (Exception ex) { - Log.warning(ex); + sessionTimeout = GlobalTimer.getTimer().newTimeout( + sessionTimer, SESSION_TIMEOUT, TimeUnit.SECONDS); + } + } + + } + + private final Map<Long, AsyncSession> asyncSessions = new HashMap<Long, AsyncSession>(); + + private void async(final AsyncContext context) { + + context.setTimeout(60000); + HttpServletRequest req = (HttpServletRequest) context.getRequest(); + long userId = (Long) req.getSession().getAttribute(USER_ID); + + synchronized (asyncSessions) { + + if (!asyncSessions.containsKey(userId)) { + try { + List<Long> devices = Context.getDataManager().getDeviceList(userId); + asyncSessions.put(userId, new AsyncSession(userId, devices)); + } catch (SQLException error) { + Log.warning(error); } + asyncSessions.get(userId).init(); } - }); + + asyncSessions.get(userId).request(context); + } } private void device(HttpServletRequest req, HttpServletResponse resp) throws IOException { |