From c8ad5cc5c45ab47abe35e3ad2e4f607d520e627d Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Tue, 6 Oct 2015 11:17:18 +1300 Subject: Implement async reverse geocoding --- src/org/traccar/ExtendedObjectDecoder.java | 6 ++-- src/org/traccar/ReverseGeocoderHandler.java | 36 ++++++++++++++++---- src/org/traccar/geocode/JsonReverseGeocoder.java | 42 +++++++++++++----------- src/org/traccar/geocode/ReverseGeocoder.java | 8 ++++- 4 files changed, 61 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/org/traccar/ExtendedObjectDecoder.java b/src/org/traccar/ExtendedObjectDecoder.java index b00e04c33..382ef869d 100644 --- a/src/org/traccar/ExtendedObjectDecoder.java +++ b/src/org/traccar/ExtendedObjectDecoder.java @@ -21,7 +21,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelUpstreamHandler; -import static org.jboss.netty.channel.Channels.fireMessageReceived; +import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler { @@ -42,10 +42,10 @@ public abstract class ExtendedObjectDecoder implements ChannelUpstreamHandler { } else if (decodedMessage != null) { if (decodedMessage instanceof Collection) { for (Object o : (Collection) decodedMessage) { - fireMessageReceived(ctx, o, e.getRemoteAddress()); + Channels.fireMessageReceived(ctx, o, e.getRemoteAddress()); } } else { - fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); + Channels.fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); } } } diff --git a/src/org/traccar/ReverseGeocoderHandler.java b/src/org/traccar/ReverseGeocoderHandler.java index bb4f5cedd..298f59736 100644 --- a/src/org/traccar/ReverseGeocoderHandler.java +++ b/src/org/traccar/ReverseGeocoderHandler.java @@ -15,29 +15,51 @@ */ package org.traccar; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; import org.traccar.geocode.AddressFormat; import org.traccar.geocode.ReverseGeocoder; import org.traccar.model.Position; -public class ReverseGeocoderHandler extends BaseDataHandler { +public class ReverseGeocoderHandler implements ChannelUpstreamHandler { private final ReverseGeocoder geocoder; private final boolean processInvalidPositions; private final AddressFormat addressFormat; - public ReverseGeocoderHandler(ReverseGeocoder geocoder, boolean processInvalidPositions ) { + public ReverseGeocoderHandler(ReverseGeocoder geocoder, boolean processInvalidPositions) { this.geocoder = geocoder; this.processInvalidPositions = processInvalidPositions; addressFormat = new AddressFormat(); } @Override - protected Position handlePosition(Position position) { - if (geocoder != null && (processInvalidPositions || position.getValid())) { - position.setAddress(geocoder.getAddress( - addressFormat, position.getLatitude(), position.getLongitude())); + public void handleUpstream(final ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { + if (!(evt instanceof MessageEvent)) { + ctx.sendUpstream(evt); + return; + } + + final MessageEvent e = (MessageEvent) evt; + Object message = e.getMessage(); + if (message instanceof Position) { + final Position position = (Position) message; + if (geocoder != null && (processInvalidPositions || position.getValid())) { + geocoder.getAddress(addressFormat, position.getLatitude(), position.getLongitude(), + new ReverseGeocoder.ReverseGeocoderCallback() { + @Override + public void onResult(String address) { + position.setAddress(address); + Channels.fireMessageReceived(ctx, position, e.getRemoteAddress()); + } + }); + } + } else { + Channels.fireMessageReceived(ctx, message, e.getRemoteAddress()); } - return position; } } diff --git a/src/org/traccar/geocode/JsonReverseGeocoder.java b/src/org/traccar/geocode/JsonReverseGeocoder.java index fd86f06d5..25b8e7853 100644 --- a/src/org/traccar/geocode/JsonReverseGeocoder.java +++ b/src/org/traccar/geocode/JsonReverseGeocoder.java @@ -15,6 +15,9 @@ */ package org.traccar.geocode; +import com.ning.http.client.AsyncCompletionHandler; +import com.ning.http.client.Response; +import org.traccar.Context; import org.traccar.helper.Log; import javax.json.Json; @@ -24,6 +27,7 @@ import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.AbstractMap; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -36,50 +40,48 @@ public abstract class JsonReverseGeocoder implements ReverseGeocoder { public JsonReverseGeocoder(String url, final int cacheSize) { this.url = url; if (cacheSize > 0) { - this.cache = new LinkedHashMap, String>() { + this.cache = Collections.synchronizedMap(new LinkedHashMap, String>() { protected boolean removeEldestEntry(Map.Entry eldest) { return size() > cacheSize; } - }; + }); } } @Override - public String getAddress(AddressFormat format, double latitude, double longitude) { + public void getAddress(final AddressFormat format, final double latitude, final double longitude, final ReverseGeocoderCallback callback) { if (cache != null) { String cachedAddress = cache.get(new AbstractMap.SimpleImmutableEntry<>(latitude, longitude)); if (cachedAddress != null) { - return cachedAddress; + callback.onResult(cachedAddress); + return; } } - try { - HttpURLConnection conn = (HttpURLConnection) new URL(String.format(url, latitude, longitude)).openConnection(); - conn.setRequestProperty("Connection", "close"); // don't keep-alive connections - try { - InputStreamReader streamReader = new InputStreamReader(conn.getInputStream()); - try (JsonReader reader = Json.createReader(streamReader)) { + Context.getAsyncHttpClient().prepareGet(String.format(url, latitude, longitude)).execute(new AsyncCompletionHandler() { + @Override + public Object onCompleted(Response response) throws Exception { + try (JsonReader reader = Json.createReader(response.getResponseBodyAsStream())) { Address address = parseAddress(reader.readObject()); - while (streamReader.read() > 0); // make sure we reached the end if (address != null) { String formattedAddress = format.format(address); - if (cache != null) { cache.put(new AbstractMap.SimpleImmutableEntry<>(latitude, longitude), formattedAddress); } - - return formattedAddress; + callback.onResult(formattedAddress); + } else { + callback.onResult(null); } } - } finally { - conn.disconnect(); + return null; } - } catch(Exception error) { - Log.warning(error); - } - return null; + @Override + public void onThrowable(Throwable t) { + callback.onResult(null); + } + }); } protected abstract Address parseAddress(JsonObject json); diff --git a/src/org/traccar/geocode/ReverseGeocoder.java b/src/org/traccar/geocode/ReverseGeocoder.java index 336a26d83..1367e82dc 100644 --- a/src/org/traccar/geocode/ReverseGeocoder.java +++ b/src/org/traccar/geocode/ReverseGeocoder.java @@ -17,6 +17,12 @@ package org.traccar.geocode; public interface ReverseGeocoder { - String getAddress(AddressFormat format, double latitude, double longitude); + public interface ReverseGeocoderCallback { + + void onResult(String address); + + } + + void getAddress(AddressFormat format, double latitude, double longitude, ReverseGeocoderCallback callback); } -- cgit v1.2.3