From f37734db8425e5a1c660d554a6f3bf56af1c2bca Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Wed, 27 Dec 2023 18:02:18 -0800 Subject: Option to observe data logs --- src/main/java/org/traccar/BasePipelineFactory.java | 6 +- src/main/java/org/traccar/api/AsyncSocket.java | 40 ++++++++++---- .../traccar/handler/StandardLoggingHandler.java | 64 ++++++++++++---------- src/main/java/org/traccar/model/LogRecord.java | 63 +++++++++++++++++++++ .../org/traccar/session/ConnectionManager.java | 34 ++++++++---- src/main/java/org/traccar/session/Endpoint.java | 58 -------------------- 6 files changed, 152 insertions(+), 113 deletions(-) create mode 100644 src/main/java/org/traccar/model/LogRecord.java delete mode 100644 src/main/java/org/traccar/session/Endpoint.java (limited to 'src') diff --git a/src/main/java/org/traccar/BasePipelineFactory.java b/src/main/java/org/traccar/BasePipelineFactory.java index 5b48f3d15..ca4a4ae63 100644 --- a/src/main/java/org/traccar/BasePipelineFactory.java +++ b/src/main/java/org/traccar/BasePipelineFactory.java @@ -124,7 +124,11 @@ public abstract class BasePipelineFactory extends ChannelInitializer { pipeline.addLast(handler); } pipeline.addLast(new NetworkMessageHandler()); - pipeline.addLast(new StandardLoggingHandler(protocol)); + + var loggingHandler = new StandardLoggingHandler(protocol); + injector.injectMembers(loggingHandler); + pipeline.addLast(loggingHandler); + if (!connector.isDatagram() && !config.getBoolean(Keys.SERVER_INSTANT_ACKNOWLEDGEMENT)) { pipeline.addLast(new AcknowledgementHandler()); } diff --git a/src/main/java/org/traccar/api/AsyncSocket.java b/src/main/java/org/traccar/api/AsyncSocket.java index 5fc4b4412..f5fbcbf62 100644 --- a/src/main/java/org/traccar/api/AsyncSocket.java +++ b/src/main/java/org/traccar/api/AsyncSocket.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2015 - 2023 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,16 +22,17 @@ import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.helper.model.PositionUtil; -import org.traccar.session.ConnectionManager; import org.traccar.model.Device; import org.traccar.model.Event; +import org.traccar.model.LogRecord; import org.traccar.model.Position; +import org.traccar.session.ConnectionManager; import org.traccar.storage.Storage; import org.traccar.storage.StorageException; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.UpdateListener { @@ -41,12 +42,15 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U private static final String KEY_DEVICES = "devices"; private static final String KEY_POSITIONS = "positions"; private static final String KEY_EVENTS = "events"; + private static final String KEY_LOGS = "logs"; private final ObjectMapper objectMapper; private final ConnectionManager connectionManager; private final Storage storage; private final long userId; + private boolean includeLogs; + public AsyncSocket(ObjectMapper objectMapper, ConnectionManager connectionManager, Storage storage, long userId) { this.objectMapper = objectMapper; this.connectionManager = connectionManager; @@ -75,6 +79,17 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U connectionManager.removeListener(userId, this); } + @Override + public void onWebSocketText(String message) { + super.onWebSocketText(message); + + try { + includeLogs = objectMapper.readTree(message).get("logs").asBoolean(); + } catch (JsonProcessingException e) { + LOGGER.warn("Socket JSON parsing error", e); + } + } + @Override public void onKeepalive() { sendData(new HashMap<>()); @@ -82,23 +97,24 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U @Override public void onUpdateDevice(Device device) { - Map> data = new HashMap<>(); - data.put(KEY_DEVICES, Collections.singletonList(device)); - sendData(data); + sendData(Map.of(KEY_DEVICES, List.of(device))); } @Override public void onUpdatePosition(Position position) { - Map> data = new HashMap<>(); - data.put(KEY_POSITIONS, Collections.singletonList(position)); - sendData(data); + sendData(Map.of(KEY_POSITIONS, List.of(position))); } @Override public void onUpdateEvent(Event event) { - Map> data = new HashMap<>(); - data.put(KEY_EVENTS, Collections.singletonList(event)); - sendData(data); + sendData(Map.of(KEY_EVENTS, List.of(event))); + } + + @Override + public void onUpdateLog(LogRecord record) { + if (includeLogs) { + sendData(Map.of(KEY_LOGS, List.of(record))); + } } private void sendData(Map> data) { diff --git a/src/main/java/org/traccar/handler/StandardLoggingHandler.java b/src/main/java/org/traccar/handler/StandardLoggingHandler.java index 84492e2a5..b495747d3 100644 --- a/src/main/java/org/traccar/handler/StandardLoggingHandler.java +++ b/src/main/java/org/traccar/handler/StandardLoggingHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 - 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2019 - 2023 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,68 +20,72 @@ import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import jakarta.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.traccar.NetworkMessage; import org.traccar.helper.NetworkUtil; +import org.traccar.model.LogRecord; +import org.traccar.session.ConnectionManager; import java.net.InetSocketAddress; -import java.net.SocketAddress; public class StandardLoggingHandler extends ChannelDuplexHandler { private static final Logger LOGGER = LoggerFactory.getLogger(StandardLoggingHandler.class); private final String protocol; + private ConnectionManager connectionManager; public StandardLoggingHandler(String protocol) { this.protocol = protocol; } + @Inject + public void setConnectionManager(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - log(ctx, false, msg); + LogRecord record = createLogRecord(msg); + log(ctx, false, record); super.channelRead(ctx, msg); + if (record != null) { + connectionManager.updateLog(record); + } } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - log(ctx, true, msg); + log(ctx, true, createLogRecord(msg)); super.write(ctx, msg, promise); } - public void log(ChannelHandlerContext ctx, boolean downstream, Object o) { - if (o instanceof NetworkMessage) { - NetworkMessage networkMessage = (NetworkMessage) o; + private LogRecord createLogRecord(Object msg) { + if (msg instanceof NetworkMessage) { + NetworkMessage networkMessage = (NetworkMessage) msg; if (networkMessage.getMessage() instanceof ByteBuf) { - log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage()); + LogRecord record = new LogRecord(); + record.setAddress((InetSocketAddress) networkMessage.getRemoteAddress()); + record.setData(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage())); + return record; } - } else if (o instanceof ByteBuf) { - log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o); } + return null; } - public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) { - StringBuilder message = new StringBuilder(); - - message.append("[").append(NetworkUtil.session(ctx.channel())).append(": "); - message.append(protocol); - if (downstream) { - message.append(" > "); - } else { - message.append(" < "); + private void log(ChannelHandlerContext ctx, boolean downstream, LogRecord record) { + if (record != null) { + StringBuilder message = new StringBuilder(); + message.append("[").append(NetworkUtil.session(ctx.channel())).append(": "); + message.append(protocol); + message.append(downstream ? " > " : " < "); + message.append(record.getAddress().getHostString()); + message.append("] "); + message.append(record.getData()); + LOGGER.info(message.toString()); } - - if (remoteAddress instanceof InetSocketAddress) { - message.append(((InetSocketAddress) remoteAddress).getHostString()); - } else { - message.append("unknown"); - } - message.append("] "); - - message.append(ByteBufUtil.hexDump(buf)); - - LOGGER.info(message.toString()); } } diff --git a/src/main/java/org/traccar/model/LogRecord.java b/src/main/java/org/traccar/model/LogRecord.java new file mode 100644 index 000000000..3feaadec2 --- /dev/null +++ b/src/main/java/org/traccar/model/LogRecord.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.model; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.net.InetSocketAddress; + +public class LogRecord { + + private InetSocketAddress address; + + public void setAddress(InetSocketAddress address) { + this.address = address; + } + + @JsonIgnore + public InetSocketAddress getAddress() { + return address; + } + + public int getPort() { + return address.getPort(); + } + + public String getHost() { + return address.getHostString(); + } + + private String uniqueId; + + public String getUniqueId() { + return uniqueId; + } + + public void setUniqueId(String uniqueId) { + this.uniqueId = uniqueId; + } + + private String data; + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + +} diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java index 3716fdf9a..7541cedfb 100644 --- a/src/main/java/org/traccar/session/ConnectionManager.java +++ b/src/main/java/org/traccar/session/ConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org) + * Copyright 2015 - 2023 Anton Tananaev (anton@traccar.org) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.traccar.database.NotificationManager; import org.traccar.model.BaseModel; import org.traccar.model.Device; import org.traccar.model.Event; +import org.traccar.model.LogRecord; import org.traccar.model.Position; import org.traccar.model.User; import org.traccar.session.cache.CacheManager; @@ -64,7 +65,7 @@ public class ConnectionManager implements BroadcastInterface { private final long deviceTimeout; private final Map sessionsByDeviceId = new ConcurrentHashMap<>(); - private final Map> sessionsByEndpoint = new ConcurrentHashMap<>(); + private final Map> sessionsByEndpoint = new ConcurrentHashMap<>(); private final Config config; private final CacheManager cacheManager; @@ -104,9 +105,8 @@ public class ConnectionManager implements BroadcastInterface { Protocol protocol, Channel channel, SocketAddress remoteAddress, String... uniqueIds) throws Exception { - Endpoint endpoint = new Endpoint(channel, remoteAddress); Map endpointSessions = sessionsByEndpoint.getOrDefault( - endpoint, new ConcurrentHashMap<>()); + remoteAddress, new ConcurrentHashMap<>()); uniqueIds = Arrays.stream(uniqueIds).filter(Objects::nonNull).toArray(String[]::new); if (uniqueIds.length > 0) { @@ -133,19 +133,18 @@ public class ConnectionManager implements BroadcastInterface { DeviceSession oldSession = sessionsByDeviceId.remove(device.getId()); if (oldSession != null) { - Endpoint oldEndpoint = new Endpoint(oldSession.getChannel(), oldSession.getRemoteAddress()); - Map oldEndpointSessions = sessionsByEndpoint.get(oldEndpoint); + Map oldEndpointSessions = sessionsByEndpoint.get(oldSession.getRemoteAddress()); if (oldEndpointSessions != null && oldEndpointSessions.size() > 1) { oldEndpointSessions.remove(device.getUniqueId()); } else { - sessionsByEndpoint.remove(oldEndpoint); + sessionsByEndpoint.remove(oldSession.getRemoteAddress()); } } DeviceSession deviceSession = new DeviceSession( device.getId(), device.getUniqueId(), protocol, channel, remoteAddress); endpointSessions.put(device.getUniqueId(), deviceSession); - sessionsByEndpoint.put(endpoint, endpointSessions); + sessionsByEndpoint.put(remoteAddress, endpointSessions); sessionsByDeviceId.put(device.getId(), deviceSession); if (oldSession == null) { @@ -182,8 +181,7 @@ public class ConnectionManager implements BroadcastInterface { } public void deviceDisconnected(Channel channel, boolean supportsOffline) { - Endpoint endpoint = new Endpoint(channel, channel.remoteAddress()); - Map endpointSessions = sessionsByEndpoint.remove(endpoint); + Map endpointSessions = sessionsByEndpoint.remove(channel.remoteAddress()); if (endpointSessions != null) { for (DeviceSession deviceSession : endpointSessions.values()) { if (supportsOffline) { @@ -204,8 +202,7 @@ public class ConnectionManager implements BroadcastInterface { DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId); if (deviceSession != null) { cacheManager.removeDevice(deviceId); - Endpoint endpoint = new Endpoint(deviceSession.getChannel(), deviceSession.getRemoteAddress()); - sessionsByEndpoint.computeIfPresent(endpoint, (e, sessions) -> { + sessionsByEndpoint.computeIfPresent(deviceSession.getRemoteAddress(), (e, sessions) -> { sessions.remove(deviceSession.getUniqueId()); return sessions.isEmpty() ? null : sessions; }); @@ -337,11 +334,24 @@ public class ConnectionManager implements BroadcastInterface { } } + public synchronized void updateLog(LogRecord record) { + var sessions = sessionsByEndpoint.getOrDefault(record.getAddress(), Map.of()); + for (var session : sessions.entrySet()) { + record.setUniqueId(session.getKey()); + for (long userId : deviceUsers.getOrDefault(session.getValue().getDeviceId(), Set.of())) { + for (UpdateListener listener : listeners.getOrDefault(userId, Set.of())) { + listener.onUpdateLog(record); + } + } + } + } + public interface UpdateListener { void onKeepalive(); void onUpdateDevice(Device device); void onUpdatePosition(Position position); void onUpdateEvent(Event event); + void onUpdateLog(LogRecord record); } public synchronized void addListener(long userId, UpdateListener listener) throws StorageException { diff --git a/src/main/java/org/traccar/session/Endpoint.java b/src/main/java/org/traccar/session/Endpoint.java deleted file mode 100644 index 76aac3444..000000000 --- a/src/main/java/org/traccar/session/Endpoint.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2022 Anton Tananaev (anton@traccar.org) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.traccar.session; - -import io.netty.channel.Channel; - -import java.net.SocketAddress; -import java.util.Objects; - -public class Endpoint { - - private final Channel channel; - private final SocketAddress remoteAddress; - - public Endpoint(Channel channel, SocketAddress remoteAddress) { - this.channel = channel; - this.remoteAddress = remoteAddress; - } - - public Channel getChannel() { - return channel; - } - - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Endpoint endpoint = (Endpoint) o; - return channel.equals(endpoint.channel) && remoteAddress.equals(endpoint.remoteAddress); - } - - @Override - public int hashCode() { - return Objects.hash(channel, remoteAddress); - } - -} -- cgit v1.2.3