From dd8c46eec3282e344eeea5e27d084448c85e1518 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Mon, 10 Apr 2023 19:40:22 -0700 Subject: Add raw data forwarding --- src/main/java/org/traccar/BasePipelineFactory.java | 13 ++++- src/main/java/org/traccar/config/Keys.java | 7 +++ .../java/org/traccar/forward/NetworkForwarder.java | 66 ++++++++++++++++++++++ .../traccar/handler/NetworkForwarderHandler.java | 64 +++++++++++++++++++++ 4 files changed, 147 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/traccar/forward/NetworkForwarder.java create mode 100644 src/main/java/org/traccar/handler/NetworkForwarderHandler.java (limited to 'src/main/java') diff --git a/src/main/java/org/traccar/BasePipelineFactory.java b/src/main/java/org/traccar/BasePipelineFactory.java index 70f999c72..38b077980 100644 --- a/src/main/java/org/traccar/BasePipelineFactory.java +++ b/src/main/java/org/traccar/BasePipelineFactory.java @@ -36,6 +36,7 @@ import org.traccar.handler.GeocoderHandler; import org.traccar.handler.GeolocationHandler; import org.traccar.handler.HemisphereHandler; import org.traccar.handler.MotionHandler; +import org.traccar.handler.NetworkForwarderHandler; import org.traccar.handler.NetworkMessageHandler; import org.traccar.handler.OpenChannelHandler; import org.traccar.handler.RemoteAddressHandler; @@ -60,15 +61,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer { private final Injector injector; private final TrackerConnector connector; + private final Config config; private final String protocol; - private final boolean instantAcknowledgement; private final int timeout; public BasePipelineFactory(TrackerConnector connector, Config config, String protocol) { this.injector = Main.getInjector(); this.connector = connector; + this.config = config; this.protocol = protocol; - instantAcknowledgement = config.getBoolean(Keys.SERVER_INSTANT_ACKNOWLEDGEMENT); int timeout = config.getInteger(Keys.PROTOCOL_TIMEOUT.withPrefix(protocol)); if (timeout == 0) { this.timeout = config.getInteger(Keys.SERVER_TIMEOUT); @@ -115,9 +116,15 @@ public abstract class BasePipelineFactory extends ChannelInitializer { pipeline.addLast(new IdleStateHandler(timeout, 0, 0)); } pipeline.addLast(new OpenChannelHandler(connector)); + if (config.hasKey(Keys.SERVER_FORWARD)) { + int port = config.getInteger(Keys.PROTOCOL_PORT.withPrefix(protocol)); + var handler = new NetworkForwarderHandler(port); + injector.injectMembers(handler); + pipeline.addLast(handler); + } pipeline.addLast(new NetworkMessageHandler()); pipeline.addLast(new StandardLoggingHandler(protocol)); - if (!instantAcknowledgement) { + if (!config.getBoolean(Keys.SERVER_INSTANT_ACKNOWLEDGEMENT)) { pipeline.addLast(new AcknowledgementHandler()); } diff --git a/src/main/java/org/traccar/config/Keys.java b/src/main/java/org/traccar/config/Keys.java index b97acfd66..8ced32153 100644 --- a/src/main/java/org/traccar/config/Keys.java +++ b/src/main/java/org/traccar/config/Keys.java @@ -805,6 +805,13 @@ public final class Keys { List.of(KeyType.CONFIG), "max-age=3600,public"); + /** + * Host for raw data forwarding. + */ + public static final ConfigKey SERVER_FORWARD = new StringConfigKey( + "server.forward", + List.of(KeyType.CONFIG)); + /** * Position forwarding format. Available options are "url", "json" and "kafka". Default is "url". */ diff --git a/src/main/java/org/traccar/forward/NetworkForwarder.java b/src/main/java/org/traccar/forward/NetworkForwarder.java new file mode 100644 index 000000000..6abf2b7ba --- /dev/null +++ b/src/main/java/org/traccar/forward/NetworkForwarder.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.forward; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.traccar.config.Config; +import org.traccar.config.Keys; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; + +@Singleton +public class NetworkForwarder { + + private static final Logger LOGGER = LoggerFactory.getLogger(NetworkForwarder.class); + + private final InetAddress destination; + private final DatagramSocket connectionUdp; + private final Map connectionsTcp = new HashMap<>(); + + @Inject + public NetworkForwarder(Config config) throws IOException { + destination = InetAddress.getByName(config.getString(Keys.SERVER_FORWARD)); + connectionUdp = new DatagramSocket(); + } + + public void forward(InetSocketAddress source, int port, boolean datagram, byte[] data) { + try { + if (datagram) { + connectionUdp.send(new DatagramPacket(data, data.length, destination, port)); + } else { + Socket connectionTcp = connectionsTcp.get(source); + if (connectionTcp == null || connectionTcp.isClosed()) { + connectionTcp = new Socket(destination, port); + connectionsTcp.put(source, connectionTcp); + } + connectionTcp.getOutputStream().write(data); + } + } catch (IOException e) { + LOGGER.warn("Network forwarding error", e); + } + } + +} diff --git a/src/main/java/org/traccar/handler/NetworkForwarderHandler.java b/src/main/java/org/traccar/handler/NetworkForwarderHandler.java new file mode 100644 index 000000000..a3792a3e5 --- /dev/null +++ b/src/main/java/org/traccar/handler/NetworkForwarderHandler.java @@ -0,0 +1,64 @@ +/* + * Copyright 2023 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import org.traccar.forward.NetworkForwarder; + +import javax.inject.Inject; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class NetworkForwarderHandler extends ChannelInboundHandlerAdapter { + + private final int port; + + private NetworkForwarder networkForwarder; + + public NetworkForwarderHandler(int port) { + this.port = port; + } + + @Inject + public void setNetworkForwarder(NetworkForwarder networkForwarder) { + this.networkForwarder = networkForwarder; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + boolean datagram = ctx.channel() instanceof DatagramChannel; + SocketAddress remoteAddress; + ByteBuf buffer; + if (datagram) { + DatagramPacket message = (DatagramPacket) msg; + remoteAddress = message.recipient(); + buffer = message.content(); + } else { + remoteAddress = ctx.channel().remoteAddress(); + buffer = (ByteBuf) msg; + } + + byte[] data = new byte[buffer.readableBytes()]; + buffer.getBytes(buffer.readerIndex(), data); + networkForwarder.forward((InetSocketAddress) remoteAddress, port, datagram, data); + super.channelRead(ctx, msg); + } + +} -- cgit v1.2.3