Skip to content

Commit ff79b6d

Browse files
committed
Clean up event queue logging a bit
1 parent 954d41d commit ff79b6d

File tree

6 files changed

+37
-19
lines changed

6 files changed

+37
-19
lines changed

core/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
public class EventConsumer {
1717
private final EventQueue queue;
1818
private Throwable error;
19-
private final Executor executor = Executors.newCachedThreadPool();
20-
2119

2220
private static final String ERROR_MSG = "Agent did not return any response";
2321
private static final int NO_WAIT = -1;

core/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.CopyOnWriteArrayList;
77
import java.util.concurrent.CountDownLatch;
88
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicBoolean;
910

1011
import io.a2a.util.TempLoggerWrapper;
1112
import org.slf4j.Logger;
@@ -35,7 +36,9 @@ public static EventQueue create() {
3536
return new MainQueue();
3637
}
3738

38-
abstract CountDownLatch getPollingStartedLatch();
39+
public abstract void awaitQueuePollerStart() throws InterruptedException ;
40+
41+
abstract void signalQueuePollerStarted();
3942

4043
public void enqueueEvent(Event event) {
4144
if (closed) {
@@ -71,13 +74,12 @@ public Event dequeueEvent(int waitMilliSeconds) throws EventQueueClosedException
7174
}
7275
return event;
7376
} catch (InterruptedException e) {
74-
log.debug("Interrupted {}", this);
77+
log.debug("Interrupted dequeue (waiting) {}", this);
7578
Thread.currentThread().interrupt();
7679
return null;
7780
}
7881
} finally {
79-
log.debug("Signalling that queue polling started {}", this);
80-
getPollingStartedLatch().countDown();
82+
signalQueuePollerStarted();
8183
}
8284
}
8385

@@ -103,7 +105,8 @@ public void close() {
103105

104106
static class MainQueue extends EventQueue {
105107
private final List<ChildQueue> children = new CopyOnWriteArrayList<>();
106-
private CountDownLatch pollingStartedLatch = new CountDownLatch(1);
108+
private final CountDownLatch pollingStartedLatch = new CountDownLatch(1);
109+
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
107110

108111
EventQueue tap() {
109112
ChildQueue child = new ChildQueue(this);
@@ -116,11 +119,22 @@ public void enqueueEvent(Event event) {
116119
children.forEach(eq -> eq.internalEnqueueEvent(event));
117120
}
118121

119-
CountDownLatch getPollingStartedLatch() {
120-
return pollingStartedLatch;
122+
@Override
123+
public void awaitQueuePollerStart() throws InterruptedException {
124+
log.debug("Waiting for queue poller to start om {}", this);
125+
pollingStartedLatch.await(10, TimeUnit.SECONDS);
126+
log.debug("Queue poller started on {}", this);
121127
}
122128

123-
129+
@Override
130+
void signalQueuePollerStarted() {
131+
if (pollingStarted.get()) {
132+
return;
133+
}
134+
log.debug("Signalling that queue polling started {}", this);
135+
pollingStartedLatch.countDown();
136+
pollingStarted.set(true);
137+
}
124138

125139
@Override
126140
public void close() {
@@ -151,8 +165,13 @@ EventQueue tap() {
151165
}
152166

153167
@Override
154-
CountDownLatch getPollingStartedLatch() {
155-
return parent.getPollingStartedLatch();
168+
public void awaitQueuePollerStart() throws InterruptedException {
169+
parent.awaitQueuePollerStart();
170+
}
171+
172+
@Override
173+
void signalQueuePollerStarted() {
174+
parent.signalQueuePollerStarted();
156175
}
157176
}
158177
}

core/src/main/java/io/a2a/server/events/InMemoryQueueManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
import jakarta.enterprise.context.ApplicationScoped;
99

10+
import io.a2a.util.TempLoggerWrapper;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
1014
@ApplicationScoped
1115
public class InMemoryQueueManager implements QueueManager {
12-
13-
1416
private final Map<String, EventQueue> queues = Collections.synchronizedMap(new HashMap<>());
1517

1618
@Override
@@ -63,7 +65,7 @@ public EventQueue createOrTap(String taskId) {
6365
}
6466

6567
@Override
66-
public void signalPollingStarted(EventQueue eventQueue) throws InterruptedException {
67-
eventQueue.getPollingStartedLatch().await(10, TimeUnit.SECONDS);
68+
public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException {
69+
eventQueue.awaitQueuePollerStart();
6870
}
6971
}

core/src/main/java/io/a2a/server/events/QueueManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ public interface QueueManager {
1111

1212
EventQueue createOrTap(String taskId);
1313

14-
void signalPollingStarted(EventQueue eventQueue) throws InterruptedException;
14+
void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException;
1515
}

core/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestCont
355355
public void run() {
356356
agentExecutor.execute(requestContext, queue);
357357
try {
358-
queueManager.signalPollingStarted(queue);
358+
queueManager.awaitQueuePollerStart(queue);
359359
} catch (InterruptedException e) {
360360
Thread.currentThread().interrupt();
361361
}

core/src/main/java/io/a2a/server/requesthandlers/JSONRPCHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.a2a.server.requesthandlers;
22

3-
import static io.a2a.util.AsyncUtils.convertingProcessor;
43
import static io.a2a.util.AsyncUtils.createTubeConfig;
54

65
import java.util.concurrent.Flow;

0 commit comments

Comments
 (0)