aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Tananaev <anton@traccar.org>2023-12-27 18:02:18 -0800
committerAnton Tananaev <anton@traccar.org>2023-12-27 18:02:18 -0800
commitf37734db8425e5a1c660d554a6f3bf56af1c2bca (patch)
treeccea7c822115c52cab56e39a4f6f8a0a47e5a2cd
parent5cebbfccae29dad81d89591a798d6aa29dd42b75 (diff)
downloadtrackermap-server-f37734db8425e5a1c660d554a6f3bf56af1c2bca.tar.gz
trackermap-server-f37734db8425e5a1c660d554a6f3bf56af1c2bca.tar.bz2
trackermap-server-f37734db8425e5a1c660d554a6f3bf56af1c2bca.zip
Option to observe data logs
-rw-r--r--src/main/java/org/traccar/BasePipelineFactory.java6
-rw-r--r--src/main/java/org/traccar/api/AsyncSocket.java40
-rw-r--r--src/main/java/org/traccar/handler/StandardLoggingHandler.java64
-rw-r--r--src/main/java/org/traccar/model/LogRecord.java63
-rw-r--r--src/main/java/org/traccar/session/ConnectionManager.java34
-rw-r--r--src/main/java/org/traccar/session/Endpoint.java58
6 files changed, 152 insertions, 113 deletions
diff --git a/src/main/java/org/traccar/BasePipelineFactory.java b/src/main/java/org/traccar/BasePipelineFactory.java
index 5b48f3d15..ca4a4ae63 100644
--- a/src/main/java/org/traccar/BasePipelineFactory.java
+++ b/src/main/java/org/traccar/BasePipelineFactory.java
@@ -124,7 +124,11 @@ public abstract class BasePipelineFactory extends ChannelInitializer<Channel> {
pipeline.addLast(handler);
}
pipeline.addLast(new NetworkMessageHandler());
- pipeline.addLast(new StandardLoggingHandler(protocol));
+
+ var loggingHandler = new StandardLoggingHandler(protocol);
+ injector.injectMembers(loggingHandler);
+ pipeline.addLast(loggingHandler);
+
if (!connector.isDatagram() && !config.getBoolean(Keys.SERVER_INSTANT_ACKNOWLEDGEMENT)) {
pipeline.addLast(new AcknowledgementHandler());
}
diff --git a/src/main/java/org/traccar/api/AsyncSocket.java b/src/main/java/org/traccar/api/AsyncSocket.java
index 5fc4b4412..f5fbcbf62 100644
--- a/src/main/java/org/traccar/api/AsyncSocket.java
+++ b/src/main/java/org/traccar/api/AsyncSocket.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org)
+ * Copyright 2015 - 2023 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,16 +22,17 @@ import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.helper.model.PositionUtil;
-import org.traccar.session.ConnectionManager;
import org.traccar.model.Device;
import org.traccar.model.Event;
+import org.traccar.model.LogRecord;
import org.traccar.model.Position;
+import org.traccar.session.ConnectionManager;
import org.traccar.storage.Storage;
import org.traccar.storage.StorageException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.UpdateListener {
@@ -41,12 +42,15 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U
private static final String KEY_DEVICES = "devices";
private static final String KEY_POSITIONS = "positions";
private static final String KEY_EVENTS = "events";
+ private static final String KEY_LOGS = "logs";
private final ObjectMapper objectMapper;
private final ConnectionManager connectionManager;
private final Storage storage;
private final long userId;
+ private boolean includeLogs;
+
public AsyncSocket(ObjectMapper objectMapper, ConnectionManager connectionManager, Storage storage, long userId) {
this.objectMapper = objectMapper;
this.connectionManager = connectionManager;
@@ -76,29 +80,41 @@ public class AsyncSocket extends WebSocketAdapter implements ConnectionManager.U
}
@Override
+ public void onWebSocketText(String message) {
+ super.onWebSocketText(message);
+
+ try {
+ includeLogs = objectMapper.readTree(message).get("logs").asBoolean();
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Socket JSON parsing error", e);
+ }
+ }
+
+ @Override
public void onKeepalive() {
sendData(new HashMap<>());
}
@Override
public void onUpdateDevice(Device device) {
- Map<String, Collection<?>> data = new HashMap<>();
- data.put(KEY_DEVICES, Collections.singletonList(device));
- sendData(data);
+ sendData(Map.of(KEY_DEVICES, List.of(device)));
}
@Override
public void onUpdatePosition(Position position) {
- Map<String, Collection<?>> data = new HashMap<>();
- data.put(KEY_POSITIONS, Collections.singletonList(position));
- sendData(data);
+ sendData(Map.of(KEY_POSITIONS, List.of(position)));
}
@Override
public void onUpdateEvent(Event event) {
- Map<String, Collection<?>> data = new HashMap<>();
- data.put(KEY_EVENTS, Collections.singletonList(event));
- sendData(data);
+ sendData(Map.of(KEY_EVENTS, List.of(event)));
+ }
+
+ @Override
+ public void onUpdateLog(LogRecord record) {
+ if (includeLogs) {
+ sendData(Map.of(KEY_LOGS, List.of(record)));
+ }
}
private void sendData(Map<String, Collection<?>> data) {
diff --git a/src/main/java/org/traccar/handler/StandardLoggingHandler.java b/src/main/java/org/traccar/handler/StandardLoggingHandler.java
index 84492e2a5..b495747d3 100644
--- a/src/main/java/org/traccar/handler/StandardLoggingHandler.java
+++ b/src/main/java/org/traccar/handler/StandardLoggingHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 - 2022 Anton Tananaev (anton@traccar.org)
+ * Copyright 2019 - 2023 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,68 +20,72 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.NetworkMessage;
import org.traccar.helper.NetworkUtil;
+import org.traccar.model.LogRecord;
+import org.traccar.session.ConnectionManager;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
public class StandardLoggingHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(StandardLoggingHandler.class);
private final String protocol;
+ private ConnectionManager connectionManager;
public StandardLoggingHandler(String protocol) {
this.protocol = protocol;
}
+ @Inject
+ public void setConnectionManager(ConnectionManager connectionManager) {
+ this.connectionManager = connectionManager;
+ }
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log(ctx, false, msg);
+ LogRecord record = createLogRecord(msg);
+ log(ctx, false, record);
super.channelRead(ctx, msg);
+ if (record != null) {
+ connectionManager.updateLog(record);
+ }
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- log(ctx, true, msg);
+ log(ctx, true, createLogRecord(msg));
super.write(ctx, msg, promise);
}
- public void log(ChannelHandlerContext ctx, boolean downstream, Object o) {
- if (o instanceof NetworkMessage) {
- NetworkMessage networkMessage = (NetworkMessage) o;
+ private LogRecord createLogRecord(Object msg) {
+ if (msg instanceof NetworkMessage) {
+ NetworkMessage networkMessage = (NetworkMessage) msg;
if (networkMessage.getMessage() instanceof ByteBuf) {
- log(ctx, downstream, networkMessage.getRemoteAddress(), (ByteBuf) networkMessage.getMessage());
+ LogRecord record = new LogRecord();
+ record.setAddress((InetSocketAddress) networkMessage.getRemoteAddress());
+ record.setData(ByteBufUtil.hexDump((ByteBuf) networkMessage.getMessage()));
+ return record;
}
- } else if (o instanceof ByteBuf) {
- log(ctx, downstream, ctx.channel().remoteAddress(), (ByteBuf) o);
}
+ return null;
}
- public void log(ChannelHandlerContext ctx, boolean downstream, SocketAddress remoteAddress, ByteBuf buf) {
- StringBuilder message = new StringBuilder();
-
- message.append("[").append(NetworkUtil.session(ctx.channel())).append(": ");
- message.append(protocol);
- if (downstream) {
- message.append(" > ");
- } else {
- message.append(" < ");
+ private void log(ChannelHandlerContext ctx, boolean downstream, LogRecord record) {
+ if (record != null) {
+ StringBuilder message = new StringBuilder();
+ message.append("[").append(NetworkUtil.session(ctx.channel())).append(": ");
+ message.append(protocol);
+ message.append(downstream ? " > " : " < ");
+ message.append(record.getAddress().getHostString());
+ message.append("] ");
+ message.append(record.getData());
+ LOGGER.info(message.toString());
}
-
- if (remoteAddress instanceof InetSocketAddress) {
- message.append(((InetSocketAddress) remoteAddress).getHostString());
- } else {
- message.append("unknown");
- }
- message.append("] ");
-
- message.append(ByteBufUtil.hexDump(buf));
-
- LOGGER.info(message.toString());
}
}
diff --git a/src/main/java/org/traccar/model/LogRecord.java b/src/main/java/org/traccar/model/LogRecord.java
new file mode 100644
index 000000000..3feaadec2
--- /dev/null
+++ b/src/main/java/org/traccar/model/LogRecord.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2023 Anton Tananaev (anton@traccar.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.traccar.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.net.InetSocketAddress;
+
+public class LogRecord {
+
+ private InetSocketAddress address;
+
+ public void setAddress(InetSocketAddress address) {
+ this.address = address;
+ }
+
+ @JsonIgnore
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public int getPort() {
+ return address.getPort();
+ }
+
+ public String getHost() {
+ return address.getHostString();
+ }
+
+ private String uniqueId;
+
+ public String getUniqueId() {
+ return uniqueId;
+ }
+
+ public void setUniqueId(String uniqueId) {
+ this.uniqueId = uniqueId;
+ }
+
+ private String data;
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+}
diff --git a/src/main/java/org/traccar/session/ConnectionManager.java b/src/main/java/org/traccar/session/ConnectionManager.java
index 3716fdf9a..7541cedfb 100644
--- a/src/main/java/org/traccar/session/ConnectionManager.java
+++ b/src/main/java/org/traccar/session/ConnectionManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015 - 2022 Anton Tananaev (anton@traccar.org)
+ * Copyright 2015 - 2023 Anton Tananaev (anton@traccar.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import org.traccar.database.NotificationManager;
import org.traccar.model.BaseModel;
import org.traccar.model.Device;
import org.traccar.model.Event;
+import org.traccar.model.LogRecord;
import org.traccar.model.Position;
import org.traccar.model.User;
import org.traccar.session.cache.CacheManager;
@@ -64,7 +65,7 @@ public class ConnectionManager implements BroadcastInterface {
private final long deviceTimeout;
private final Map<Long, DeviceSession> sessionsByDeviceId = new ConcurrentHashMap<>();
- private final Map<Endpoint, Map<String, DeviceSession>> sessionsByEndpoint = new ConcurrentHashMap<>();
+ private final Map<SocketAddress, Map<String, DeviceSession>> sessionsByEndpoint = new ConcurrentHashMap<>();
private final Config config;
private final CacheManager cacheManager;
@@ -104,9 +105,8 @@ public class ConnectionManager implements BroadcastInterface {
Protocol protocol, Channel channel, SocketAddress remoteAddress,
String... uniqueIds) throws Exception {
- Endpoint endpoint = new Endpoint(channel, remoteAddress);
Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.getOrDefault(
- endpoint, new ConcurrentHashMap<>());
+ remoteAddress, new ConcurrentHashMap<>());
uniqueIds = Arrays.stream(uniqueIds).filter(Objects::nonNull).toArray(String[]::new);
if (uniqueIds.length > 0) {
@@ -133,19 +133,18 @@ public class ConnectionManager implements BroadcastInterface {
DeviceSession oldSession = sessionsByDeviceId.remove(device.getId());
if (oldSession != null) {
- Endpoint oldEndpoint = new Endpoint(oldSession.getChannel(), oldSession.getRemoteAddress());
- Map<String, DeviceSession> oldEndpointSessions = sessionsByEndpoint.get(oldEndpoint);
+ Map<String, DeviceSession> oldEndpointSessions = sessionsByEndpoint.get(oldSession.getRemoteAddress());
if (oldEndpointSessions != null && oldEndpointSessions.size() > 1) {
oldEndpointSessions.remove(device.getUniqueId());
} else {
- sessionsByEndpoint.remove(oldEndpoint);
+ sessionsByEndpoint.remove(oldSession.getRemoteAddress());
}
}
DeviceSession deviceSession = new DeviceSession(
device.getId(), device.getUniqueId(), protocol, channel, remoteAddress);
endpointSessions.put(device.getUniqueId(), deviceSession);
- sessionsByEndpoint.put(endpoint, endpointSessions);
+ sessionsByEndpoint.put(remoteAddress, endpointSessions);
sessionsByDeviceId.put(device.getId(), deviceSession);
if (oldSession == null) {
@@ -182,8 +181,7 @@ public class ConnectionManager implements BroadcastInterface {
}
public void deviceDisconnected(Channel channel, boolean supportsOffline) {
- Endpoint endpoint = new Endpoint(channel, channel.remoteAddress());
- Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.remove(endpoint);
+ Map<String, DeviceSession> endpointSessions = sessionsByEndpoint.remove(channel.remoteAddress());
if (endpointSessions != null) {
for (DeviceSession deviceSession : endpointSessions.values()) {
if (supportsOffline) {
@@ -204,8 +202,7 @@ public class ConnectionManager implements BroadcastInterface {
DeviceSession deviceSession = sessionsByDeviceId.remove(deviceId);
if (deviceSession != null) {
cacheManager.removeDevice(deviceId);
- Endpoint endpoint = new Endpoint(deviceSession.getChannel(), deviceSession.getRemoteAddress());
- sessionsByEndpoint.computeIfPresent(endpoint, (e, sessions) -> {
+ sessionsByEndpoint.computeIfPresent(deviceSession.getRemoteAddress(), (e, sessions) -> {
sessions.remove(deviceSession.getUniqueId());
return sessions.isEmpty() ? null : sessions;
});
@@ -337,11 +334,24 @@ public class ConnectionManager implements BroadcastInterface {
}
}
+ public synchronized void updateLog(LogRecord record) {
+ var sessions = sessionsByEndpoint.getOrDefault(record.getAddress(), Map.of());
+ for (var session : sessions.entrySet()) {
+ record.setUniqueId(session.getKey());
+ for (long userId : deviceUsers.getOrDefault(session.getValue().getDeviceId(), Set.of())) {
+ for (UpdateListener listener : listeners.getOrDefault(userId, Set.of())) {
+ listener.onUpdateLog(record);
+ }
+ }
+ }
+ }
+
public interface UpdateListener {
void onKeepalive();
void onUpdateDevice(Device device);
void onUpdatePosition(Position position);
void onUpdateEvent(Event event);
+ void onUpdateLog(LogRecord record);
}
public synchronized void addListener(long userId, UpdateListener listener) throws StorageException {
diff --git a/src/main/java/org/traccar/session/Endpoint.java b/src/main/java/org/traccar/session/Endpoint.java
deleted file mode 100644
index 76aac3444..000000000
--- a/src/main/java/org/traccar/session/Endpoint.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2022 Anton Tananaev (anton@traccar.org)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.traccar.session;
-
-import io.netty.channel.Channel;
-
-import java.net.SocketAddress;
-import java.util.Objects;
-
-public class Endpoint {
-
- private final Channel channel;
- private final SocketAddress remoteAddress;
-
- public Endpoint(Channel channel, SocketAddress remoteAddress) {
- this.channel = channel;
- this.remoteAddress = remoteAddress;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public SocketAddress getRemoteAddress() {
- return remoteAddress;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Endpoint endpoint = (Endpoint) o;
- return channel.equals(endpoint.channel) && remoteAddress.equals(endpoint.remoteAddress);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(channel, remoteAddress);
- }
-
-}