Skip to content

Commit c326084

Browse files
committed
Move sepaphore into MainQueue
1 parent cfd539f commit c326084

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ public abstract class EventQueue implements AutoCloseable {
2121
public static final int DEFAULT_QUEUE_SIZE = 1000;
2222

2323
private final int queueSize;
24-
protected final Semaphore semaphore;
2524
private volatile boolean closed = false;
2625

2726
protected EventQueue() {
@@ -33,7 +32,6 @@ protected EventQueue(int queueSize) {
3332
throw new IllegalArgumentException("Queue size must be greater than 0");
3433
}
3534
this.queueSize = queueSize;
36-
this.semaphore = new Semaphore(queueSize, true);
3735
LOGGER.trace("Creating {} with queue size: {}", this, queueSize);
3836
}
3937

@@ -179,6 +177,7 @@ protected void doClose(boolean immediate) {
179177

180178
static class MainQueue extends EventQueue {
181179
private final List<ChildQueue> children = new CopyOnWriteArrayList<>();
180+
protected final Semaphore semaphore;
182181
private final CountDownLatch pollingStartedLatch = new CountDownLatch(1);
183182
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
184183
private final EventEnqueueHook enqueueHook;
@@ -189,6 +188,7 @@ static class MainQueue extends EventQueue {
189188

190189
MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List<Runnable> onCloseCallbacks, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
191190
super(queueSize);
191+
this.semaphore = new Semaphore(queueSize, true);
192192
this.enqueueHook = hook;
193193
this.taskId = taskId;
194194
this.onCloseCallbacks = List.copyOf(onCloseCallbacks); // Defensive copy
@@ -406,8 +406,12 @@ private void internalEnqueueItem(EventQueueItem item) {
406406
LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
407407
return;
408408
}
409-
queue.add(item);
410-
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
409+
if (!queue.offer(item)) {
410+
LOGGER.warn("ChildQueue {} is full. Closing immediately.", this);
411+
close(true); // immediate close
412+
} else {
413+
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
414+
}
411415
}
412416

413417
@Override
@@ -422,7 +426,6 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
422426
if (item != null) {
423427
Event event = item.getEvent();
424428
LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
425-
// Note: Semaphore is managed by parent MainQueue, not released here
426429
}
427430
return item;
428431
}
@@ -432,7 +435,6 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
432435
if (item != null) {
433436
Event event = item.getEvent();
434437
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
435-
// Note: Semaphore is managed by parent MainQueue, not released here
436438
} else {
437439
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
438440
}
@@ -475,8 +477,6 @@ protected void doClose(boolean immediate) {
475477
// Immediate close: clear pending events from local queue
476478
int clearedCount = queue.size();
477479
queue.clear();
478-
// Release semaphore permits for cleared events to prevent deadlock
479-
semaphore.release(clearedCount);
480480
LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this);
481481
}
482482
// For graceful close, let the queue drain naturally through normal consumption

0 commit comments

Comments
 (0)