aboutsummaryrefslogtreecommitdiff
path: root/src/org/traccar/BasePipelineFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/org/traccar/BasePipelineFactory.java')
-rw-r--r--src/org/traccar/BasePipelineFactory.java312
1 files changed, 65 insertions, 247 deletions
diff --git a/src/org/traccar/BasePipelineFactory.java b/src/org/traccar/BasePipelineFactory.java
index b45e3a280..b3d37f689 100644
--- a/src/org/traccar/BasePipelineFactory.java
+++ b/src/org/traccar/BasePipelineFactory.java
@@ -15,36 +15,40 @@
*/
package org.traccar;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.traccar.events.CommandResultEventHandler;
-import org.traccar.events.DriverEventHandler;
-import org.traccar.events.FuelDropEventHandler;
-import org.traccar.events.GeofenceEventHandler;
-import org.traccar.events.IgnitionEventHandler;
-import org.traccar.events.MaintenanceEventHandler;
-import org.traccar.events.MotionEventHandler;
-import org.traccar.events.OverspeedEventHandler;
-import org.traccar.events.AlertEventHandler;
-import org.traccar.processing.ComputedAttributesHandler;
-import org.traccar.processing.CopyAttributesHandler;
+import org.traccar.config.Keys;
+import org.traccar.handler.DefaultDataHandler;
+import org.traccar.handler.events.AlertEventHandler;
+import org.traccar.handler.events.CommandResultEventHandler;
+import org.traccar.handler.events.DriverEventHandler;
+import org.traccar.handler.events.FuelDropEventHandler;
+import org.traccar.handler.events.GeofenceEventHandler;
+import org.traccar.handler.events.IgnitionEventHandler;
+import org.traccar.handler.events.MaintenanceEventHandler;
+import org.traccar.handler.events.MotionEventHandler;
+import org.traccar.handler.events.OverspeedEventHandler;
+import org.traccar.handler.ComputedAttributesHandler;
+import org.traccar.handler.CopyAttributesHandler;
+import org.traccar.handler.DistanceHandler;
+import org.traccar.handler.EngineHoursHandler;
+import org.traccar.handler.FilterHandler;
+import org.traccar.handler.GeocoderHandler;
+import org.traccar.handler.GeolocationHandler;
+import org.traccar.handler.HemisphereHandler;
+import org.traccar.handler.MotionHandler;
+import org.traccar.handler.NetworkMessageHandler;
+import org.traccar.handler.OpenChannelHandler;
+import org.traccar.handler.RemoteAddressHandler;
+import org.traccar.handler.StandardLoggingHandler;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.Map;
public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
@@ -52,207 +56,25 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
private static final Logger LOGGER = LoggerFactory.getLogger(BasePipelineFactory.class);
private final TrackerServer server;
+ private boolean eventsEnabled;
private int timeout;
- private FilterHandler filterHandler;
- private DistanceHandler distanceHandler;
- private EngineHoursHandler engineHoursHandler;
- private RemoteAddressHandler remoteAddressHandler;
- private MotionHandler motionHandler;
- private GeocoderHandler geocoderHandler;
- private GeolocationHandler geolocationHandler;
- private HemisphereHandler hemisphereHandler;
- private CopyAttributesHandler copyAttributesHandler;
- private ComputedAttributesHandler computedAttributesHandler;
-
- private CommandResultEventHandler commandResultEventHandler;
- private OverspeedEventHandler overspeedEventHandler;
- private FuelDropEventHandler fuelDropEventHandler;
- private MotionEventHandler motionEventHandler;
- private GeofenceEventHandler geofenceEventHandler;
- private AlertEventHandler alertEventHandler;
- private IgnitionEventHandler ignitionEventHandler;
- private MaintenanceEventHandler maintenanceEventHandler;
- private DriverEventHandler driverEventHandler;
-
- private static final class OpenChannelHandler extends ChannelDuplexHandler {
-
- private final TrackerServer server;
-
- private OpenChannelHandler(TrackerServer server) {
- this.server = server;
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- server.getChannelGroup().add(ctx.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- super.channelInactive(ctx);
- server.getChannelGroup().remove(ctx.channel());
- }
-
- }
-
- private static class NetworkMessageHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- if (ctx.channel() instanceof DatagramChannel) {
- DatagramPacket packet = (DatagramPacket) msg;
- ctx.fireChannelRead(new NetworkMessage(packet.content(), packet.sender()));
- } else if (msg instanceof ByteBuf) {
- ByteBuf buffer = (ByteBuf) msg;
- ctx.fireChannelRead(new NetworkMessage(buffer, ctx.channel().remoteAddress()));
- }
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- if (msg instanceof NetworkMessage) {
- NetworkMessage message = (NetworkMessage) msg;
- if (ctx.channel() instanceof DatagramChannel) {
- InetSocketAddress recipient = (InetSocketAddress) message.getRemoteAddress();
- InetSocketAddress sender = (InetSocketAddress) ctx.channel().localAddress();
- ctx.write(new DatagramPacket((ByteBuf) message.getMessage(), recipient, sender), promise);
- } else {
- ctx.write(message.getMessage(), promise);
- }
- } else {
- ctx.write(msg, promise);
- }
- }
-
- }
-
- private static class StandardLoggingHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log(ctx, false, msg);
- super.channelRead(ctx, msg);
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- log(ctx, true, msg);
- super.write(ctx, msg, promise);
- }
-
- public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
- if (o instanceof NetworkMessage) {
- NetworkMessage networkMessage = (NetworkMessage) o;
- if (networkMessage.getMessage() instanceof ByteBuf) {
- log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage());
- }
- } else if (o instanceof ByteBuf) {
- log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o);
- }
- }
-
- public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) {
- StringBuilder message = new StringBuilder();
-
- message.append("[").append(ctx.channel().id().asShortText()).append(": ");
- message.append(((InetSocketAddress) ctx.channel().localAddress()).getPort());
- if (downstream) {
- message.append(" > ");
- } else {
- message.append(" < ");
- }
-
- if (remoteAddress instanceof InetSocketAddress) {
- message.append(((InetSocketAddress) remoteAddress).getHostString());
- } else {
- message.append("unknown");
- }
- message.append("]");
-
- message.append(" HEX: ");
- message.append(ByteBufUtil.hexDump(buf));
-
- LOGGER.info(message.toString());
- }
-
- }
-
public BasePipelineFactory(TrackerServer server, String protocol) {
this.server = server;
-
- timeout = Context.getConfig().getInteger(protocol + ".timeout");
+ eventsEnabled = Context.getConfig().getBoolean(Keys.EVENT_ENABLE);
+ timeout = Context.getConfig().getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol));
if (timeout == 0) {
- timeout = Context.getConfig().getInteger(protocol + ".resetDelay"); // temporary
- if (timeout == 0) {
- timeout = Context.getConfig().getInteger("server.timeout");
- }
- }
-
- distanceHandler = new DistanceHandler(
- Context.getConfig().getBoolean("coordinates.filter"),
- Context.getConfig().getInteger("coordinates.minError"),
- Context.getConfig().getInteger("coordinates.maxError"));
-
- if (Context.getConfig().getBoolean("processing.remoteAddress.enable")) {
- remoteAddressHandler = new RemoteAddressHandler();
- }
-
- if (Context.getConfig().getBoolean("filter.enable")) {
- filterHandler = new FilterHandler();
- }
-
- if (Context.getGeocoder() != null && !Context.getConfig().getBoolean("geocoder.ignorePositions")) {
- geocoderHandler = new GeocoderHandler(
- Context.getGeocoder(),
- Context.getConfig().getBoolean("geocoder.processInvalidPositions"));
- }
-
- if (Context.getGeolocationProvider() != null) {
- geolocationHandler = new GeolocationHandler(
- Context.getGeolocationProvider(),
- Context.getConfig().getBoolean("geolocation.processInvalidPositions"));
- }
-
- motionHandler = new MotionHandler(Context.getTripsConfig().getSpeedThreshold());
-
- if (Context.getConfig().getBoolean("processing.engineHours.enable")) {
- engineHoursHandler = new EngineHoursHandler();
- }
-
- if (Context.getConfig().hasKey("location.latitudeHemisphere")
- || Context.getConfig().hasKey("location.longitudeHemisphere")) {
- hemisphereHandler = new HemisphereHandler();
- }
-
- if (Context.getConfig().getBoolean("processing.copyAttributes.enable")) {
- copyAttributesHandler = new CopyAttributesHandler();
- }
-
- if (Context.getConfig().getBoolean("processing.computedAttributes.enable")) {
- computedAttributesHandler = new ComputedAttributesHandler();
- }
-
- if (Context.getConfig().getBoolean("event.enable")) {
- commandResultEventHandler = new CommandResultEventHandler();
- overspeedEventHandler = Context.getOverspeedEventHandler();
- fuelDropEventHandler = new FuelDropEventHandler();
- motionEventHandler = Context.getMotionEventHandler();
- geofenceEventHandler = new GeofenceEventHandler();
- alertEventHandler = new AlertEventHandler();
- ignitionEventHandler = new IgnitionEventHandler();
- maintenanceEventHandler = new MaintenanceEventHandler();
- driverEventHandler = new DriverEventHandler();
+ timeout = Context.getConfig().getInteger(Keys.SERVER_TIMEOUT);
}
}
protected abstract void addProtocolHandlers(PipelineBuilder pipeline);
- private void addHandlers(ChannelPipeline pipeline, ChannelHandler... handlers) {
- for (ChannelHandler handler : handlers) {
- if (handler != null) {
- pipeline.addLast(handler);
+ @SafeVarargs
+ private final void addHandlers(ChannelPipeline pipeline, Class<? extends ChannelHandler>... handlerClasses) {
+ for (Class<? extends ChannelHandler> handlerClass : handlerClasses) {
+ if (handlerClass != null) {
+ pipeline.addLast(Main.getInjector().getInstance(handlerClass));
}
}
}
@@ -273,8 +95,9 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
}
@Override
- protected void initChannel(Channel channel) throws Exception {
+ protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
+
if (timeout > 0 && !server.isDatagram()) {
pipeline.addLast(new IdleStateHandler(timeout, 0, 0));
}
@@ -295,53 +118,48 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
addHandlers(
pipeline,
- geolocationHandler,
- hemisphereHandler,
- distanceHandler,
- remoteAddressHandler);
+ GeolocationHandler.class,
+ HemisphereHandler.class,
+ DistanceHandler.class,
+ RemoteAddressHandler.class);
addDynamicHandlers(pipeline);
addHandlers(
pipeline,
- filterHandler,
- geocoderHandler,
- motionHandler,
- engineHoursHandler,
- copyAttributesHandler,
- computedAttributesHandler);
-
- if (Context.getDataManager() != null) {
- pipeline.addLast(new DefaultDataHandler());
- }
-
- if (Context.getConfig().getBoolean("forward.enable")) {
- pipeline.addLast(Main.getInjector().getInstance(WebDataHandler.Factory.class).create(
- Context.getConfig().getString("forward.url"), Context.getConfig().getBoolean("forward.json")));
+ FilterHandler.class,
+ GeocoderHandler.class,
+ MotionHandler.class,
+ EngineHoursHandler.class,
+ CopyAttributesHandler.class,
+ ComputedAttributesHandler.class,
+ WebDataHandler.class,
+ DefaultDataHandler.class);
+
+ if (eventsEnabled) {
+ addHandlers(
+ pipeline,
+ CommandResultEventHandler.class,
+ OverspeedEventHandler.class,
+ FuelDropEventHandler.class,
+ MotionEventHandler.class,
+ GeofenceEventHandler.class,
+ AlertEventHandler.class,
+ IgnitionEventHandler.class,
+ MaintenanceEventHandler.class,
+ DriverEventHandler.class);
}
- addHandlers(
- pipeline,
- commandResultEventHandler,
- overspeedEventHandler,
- fuelDropEventHandler,
- motionEventHandler,
- geofenceEventHandler,
- alertEventHandler,
- ignitionEventHandler,
- maintenanceEventHandler,
- driverEventHandler);
-
pipeline.addLast(new MainEventHandler());
}
private void addDynamicHandlers(ChannelPipeline pipeline) {
- if (Context.getConfig().hasKey("extra.handlers")) {
- String[] handlers = Context.getConfig().getString("extra.handlers").split(",");
- for (String handler : handlers) {
+ String handlers = Context.getConfig().getString(Keys.EXTRA_HANDLERS);
+ if (handlers != null) {
+ for (String handler : handlers.split(",")) {
try {
- pipeline.addLast((ChannelHandler) Class.forName(handler).newInstance());
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException error) {
+ pipeline.addLast((ChannelHandler) Class.forName(handler).getDeclaredConstructor().newInstance());
+ } catch (ReflectiveOperationException error) {
LOGGER.warn("Dynamic handler error", error);
}
}