From b10cebd692b610a13ea27b6150a79726de59f40e Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Sat, 25 Apr 2015 20:51:38 +1200 Subject: Implement async position update --- src/org/traccar/http/MainServlet.java | 177 +++++++++++++++++++++++++++++++--- 1 file changed, 163 insertions(+), 14 deletions(-) (limited to 'src/org/traccar/http') diff --git a/src/org/traccar/http/MainServlet.java b/src/org/traccar/http/MainServlet.java index 7483b22b4..655b47312 100644 --- a/src/org/traccar/http/MainServlet.java +++ b/src/org/traccar/http/MainServlet.java @@ -17,6 +17,11 @@ package org.traccar.http; import java.io.IOException; import java.sql.SQLException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.json.Json; import javax.json.JsonObjectBuilder; import javax.servlet.AsyncContext; @@ -25,13 +30,21 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; import org.traccar.Context; +import org.traccar.GlobalTimer; +import org.traccar.database.DataCache; +import org.traccar.database.ObjectConverter; import org.traccar.helper.Log; +import org.traccar.model.Position; public class MainServlet extends HttpServlet { - private static final String USER_ID = "userId"; + private static final long ASYNC_TIMEOUT = 120000; + private static final String USER_ID = "userId"; + @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { @@ -50,25 +63,161 @@ public class MainServlet extends HttpServlet { } } - private void async(final AsyncContext context) { - context.start(new Runnable() { + public class AsyncSession { + + private static final boolean DEBUG_ASYNC = true; + + private static final long SESSION_TIMEOUT = 30; + private static final long REQUEST_TIMEOUT = 30; + + private boolean destroyed; + private final long userId; + private final List devices; + private Timeout sessionTimeout; + private Timeout requestTimeout; + private final Map positions = new HashMap(); + private AsyncContext activeContext; + + private void logEvent(String message) { + if (DEBUG_ASYNC) { + Log.debug("AsyncSession: " + this.hashCode() + " destroyed: " + destroyed + " " + message); + } + } + + public AsyncSession(long userId, List devices) { + logEvent("create userId: " + userId + " devices: " + devices.size()); + this.userId = userId; + this.devices = devices; + } + + @Override + protected void finalize() throws Throwable { + logEvent("finalize"); + } + + private final DataCache.DataCacheListener dataListener = new DataCache.DataCacheListener() { + @Override + public void onUpdate(Position position) { + synchronized (AsyncSession.this) { + logEvent("onUpdate deviceId: " + position.getDeviceId()); + if (!destroyed) { + if (requestTimeout != null) { + requestTimeout.cancel(); + requestTimeout = null; + } + positions.put(position.getDeviceId(), position); + if (activeContext != null) { + response(); + } + } + } + } + }; + + private final TimerTask sessionTimer = new TimerTask() { + @Override + public void run(Timeout tmt) throws Exception { + synchronized (AsyncSession.this) { + logEvent("sessionTimeout"); + Context.getDataCache().removeListener(devices, dataListener); + synchronized (asyncSessions) { + asyncSessions.remove(userId); + } + destroyed = true; + } + } + }; + + private final TimerTask requestTimer = new TimerTask() { @Override - public void run() { + public void run(Timeout tmt) throws Exception { + synchronized (AsyncSession.this) { + logEvent("requestTimeout"); + if (!destroyed) { + if (activeContext != null) { + response(); + } + } + } + } + }; + + public synchronized void init() { + logEvent("init"); + Collection initialPositions = Context.getDataCache().getInitialState(devices); + for (Position position : initialPositions) { + positions.put(position.getDeviceId(), position); + } + + Context.getDataCache().addListener(devices, dataListener); + } + + public synchronized void request(AsyncContext context) { + logEvent("request context: " + context.hashCode()); + if (!destroyed) { + activeContext = context; + if (sessionTimeout != null) { + sessionTimeout.cancel(); + sessionTimeout = null; + } + + if (!positions.isEmpty()) { + response(); + } else { + requestTimeout = GlobalTimer.getTimer().newTimeout( + requestTimer, REQUEST_TIMEOUT, TimeUnit.SECONDS); + } + } + } + + private synchronized void response() { + logEvent("response context: " + activeContext.hashCode()); + if (!destroyed) { + ServletResponse response = activeContext.getResponse(); + + JsonObjectBuilder result = Json.createObjectBuilder(); + result.add("success", true); + result.add("data", ObjectConverter.convert(positions.values())); + positions.clear(); + try { - Thread.sleep(5000); // DELME + response.getWriter().println(result.build().toString()); + } catch (IOException error) { + Log.warning(error); + } - ServletResponse response = context.getResponse(); - response.getWriter().println("{ success: true, data: ["+ - "{ id: 1, device_id: 1, time: \"2012-04-23T18:25:43.511Z\", latitude: 60, longitude: 30, speed: 0, course: 0 }," + - "{ id: 2, device_id: 2, time: \"2012-04-23T19:25:43.511Z\", latitude: 61, longitude: 31, speed: 0, course: 0 }" + - "] }"); - context.complete(); + activeContext.complete(); + activeContext = null; - } catch (Exception ex) { - Log.warning(ex); + sessionTimeout = GlobalTimer.getTimer().newTimeout( + sessionTimer, SESSION_TIMEOUT, TimeUnit.SECONDS); + } + } + + } + + private final Map asyncSessions = new HashMap(); + + private void async(final AsyncContext context) { + + context.setTimeout(60000); + HttpServletRequest req = (HttpServletRequest) context.getRequest(); + long userId = (Long) req.getSession().getAttribute(USER_ID); + + synchronized (asyncSessions) { + + if (!asyncSessions.containsKey(userId)) { + try { + List devices = Context.getDataManager().getDeviceList(userId); + asyncSessions.put(userId, new AsyncSession(userId, devices)); + } catch (SQLException error) { + Log.warning(error); } + asyncSessions.get(userId).init(); } - }); + + asyncSessions.get(userId).request(context); + } } private void device(HttpServletRequest req, HttpServletResponse resp) throws IOException { -- cgit v1.2.3