From 2a0b2dee5eecd55f86c51fe2a25f7ac7484d6bde Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Tue, 21 Mar 2023 13:40:50 -0700 Subject: Response after position processed --- .../traccar/handler/AcknowledgementHandler.java | 121 +++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 src/main/java/org/traccar/handler/AcknowledgementHandler.java (limited to 'src/main/java/org/traccar/handler/AcknowledgementHandler.java') diff --git a/src/main/java/org/traccar/handler/AcknowledgementHandler.java b/src/main/java/org/traccar/handler/AcknowledgementHandler.java new file mode 100644 index 000000000..c3f6038de --- /dev/null +++ b/src/main/java/org/traccar/handler/AcknowledgementHandler.java @@ -0,0 +1,121 @@ +/* + * Copyright 2023 Anton Tananaev (anton@traccar.org) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.traccar.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +public class AcknowledgementHandler extends ChannelOutboundHandlerAdapter { + + private static final Logger LOGGER = LoggerFactory.getLogger(AcknowledgementHandler.class); + + public interface Event { + } + + public static class EventReceived implements Event { + } + + public static class EventDecoded implements Event { + private final Collection objects; + + public EventDecoded(Collection objects) { + this.objects = objects; + } + + public Collection getObjects() { + return objects; + } + } + + public static class EventHandled implements Event { + private final Object object; + + public EventHandled(Object object) { + this.object = object; + } + + public Object getObject() { + return object; + } + } + + private static class Entry { + private final Object message; + private final ChannelPromise promise; + + private Entry(Object message, ChannelPromise promise) { + this.message = message; + this.promise = promise; + } + + public Object getMessage() { + return message; + } + + public ChannelPromise getPromise() { + return promise; + } + } + + private List queue; + private final Set waiting = new HashSet<>(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + List output = new LinkedList<>(); + synchronized (this) { + if (msg instanceof Event) { + if (msg instanceof EventReceived) { + LOGGER.debug("Event received"); + if (queue == null) { + queue = new LinkedList<>(); + } + } else if (msg instanceof EventDecoded) { + EventDecoded event = (EventDecoded) msg; + LOGGER.debug("Event decoded {}", event.getObjects().size()); + waiting.addAll(event.getObjects()); + } else if (msg instanceof EventHandled) { + EventHandled event = (EventHandled) msg; + LOGGER.debug("Event handled"); + waiting.remove(event.getObject()); + } + if (!(msg instanceof EventReceived) && waiting.isEmpty()) { + output.addAll(queue); + queue = null; + } + } else if (queue != null) { + LOGGER.debug("Message queued"); + queue.add(new Entry(msg, promise)); + } else { + LOGGER.debug("Message sent"); + output.add(new Entry(msg, promise)); + } + } + for (Entry entry : output) { + ctx.write(entry.getMessage(), entry.getPromise()); + } + } + +} -- cgit v1.2.3 From a07e078645418720ad4bbe4ce352cae607c42175 Mon Sep 17 00:00:00 2001 From: Anton Tananaev Date: Tue, 21 Mar 2023 13:44:14 -0700 Subject: Fix lint check --- src/main/java/org/traccar/handler/AcknowledgementHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/main/java/org/traccar/handler/AcknowledgementHandler.java') diff --git a/src/main/java/org/traccar/handler/AcknowledgementHandler.java b/src/main/java/org/traccar/handler/AcknowledgementHandler.java index c3f6038de..4c1085998 100644 --- a/src/main/java/org/traccar/handler/AcknowledgementHandler.java +++ b/src/main/java/org/traccar/handler/AcknowledgementHandler.java @@ -61,7 +61,7 @@ public class AcknowledgementHandler extends ChannelOutboundHandlerAdapter { } } - private static class Entry { + private static final class Entry { private final Object message; private final ChannelPromise promise; -- cgit v1.2.3