Skip to content

Commit 95c3a79

Browse files
committed
Revisit the semaphore handling
1 parent 2532703 commit 95c3a79

File tree

2 files changed

+46
-34
lines changed

2 files changed

+46
-34
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -309,19 +309,28 @@ void distributeToChildren(EventQueueItem item) {
309309
int childCount = children.size();
310310
if (LOGGER.isDebugEnabled()) {
311311
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
312-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
312+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
313313
}
314314
children.forEach(child -> {
315315
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
316-
taskId, item.getEvent().getClass().getSimpleName());
316+
taskId, item.getEvent().getClass().getSimpleName());
317317
child.internalEnqueueItem(item);
318318
});
319319
if (LOGGER.isDebugEnabled()) {
320320
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
321-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
321+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
322322
}
323323
}
324324

325+
/**
326+
* Release the semaphore after event processing is complete.
327+
* Called by MainEventBusProcessor in finally block to ensure release even on exceptions.
328+
* Balances the acquire() in enqueueEvent() - protects MainEventBus throughput.
329+
*/
330+
void releaseSemaphore() {
331+
semaphore.release();
332+
}
333+
325334
/**
326335
* Get the count of active child queues.
327336
* Used for testing to verify reference counting mechanism.
@@ -391,17 +400,12 @@ public void enqueueItem(EventQueueItem item) {
391400

392401
private void internalEnqueueItem(EventQueueItem item) {
393402
// Internal method called by MainEventBusProcessor to add to local queue
403+
// Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue
394404
Event event = item.getEvent();
395405
if (isClosed()) {
396406
LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
397407
return;
398408
}
399-
try {
400-
semaphore.acquire();
401-
} catch (InterruptedException e) {
402-
Thread.currentThread().interrupt();
403-
throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
404-
}
405409
queue.add(item);
406410
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
407411
}
@@ -418,7 +422,7 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
418422
if (item != null) {
419423
Event event = item.getEvent();
420424
LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
421-
semaphore.release();
425+
// Note: Semaphore is managed by parent MainQueue, not released here
422426
}
423427
return item;
424428
}
@@ -428,7 +432,7 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
428432
if (item != null) {
429433
Event event = item.getEvent();
430434
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
431-
semaphore.release();
435+
// Note: Semaphore is managed by parent MainQueue, not released here
432436
} else {
433437
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
434438
}

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -140,32 +140,40 @@ private void processEvent(MainEventBusContext context) {
140140
LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {} (queue type: {})",
141141
taskId, event.getClass().getSimpleName(), eventQueue.getClass().getSimpleName());
142142

143-
// Step 1: Update TaskStore FIRST (persistence before clients see it)
144-
updateTaskStore(taskId, event);
145-
146-
// Step 2: Send push notification AFTER persistence (ensures notification sees latest state)
147-
sendPushNotification(taskId);
148-
149-
// Step 3: Then distribute to ChildQueues (clients see it AFTER persistence + notification)
150-
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
151-
int childCount = mainQueue.getChildCount();
152-
LOGGER.debug("MainEventBusProcessor: Distributing event to {} children for task {}", childCount, taskId);
153-
mainQueue.distributeToChildren(context.eventQueueItem());
154-
LOGGER.debug("MainEventBusProcessor: Distributed event {} to {} children for task {}",
155-
event.getClass().getSimpleName(), childCount, taskId);
156-
} else {
157-
LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}",
158-
eventQueue.getClass().getSimpleName(), taskId);
159-
}
143+
try {
144+
// Step 1: Update TaskStore FIRST (persistence before clients see it)
145+
updateTaskStore(taskId, event);
146+
147+
// Step 2: Send push notification AFTER persistence (ensures notification sees latest state)
148+
sendPushNotification(taskId);
149+
150+
// Step 3: Then distribute to ChildQueues (clients see it AFTER persistence + notification)
151+
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
152+
int childCount = mainQueue.getChildCount();
153+
LOGGER.debug("MainEventBusProcessor: Distributing event to {} children for task {}", childCount, taskId);
154+
mainQueue.distributeToChildren(context.eventQueueItem());
155+
LOGGER.debug("MainEventBusProcessor: Distributed event {} to {} children for task {}",
156+
event.getClass().getSimpleName(), childCount, taskId);
157+
} else {
158+
LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}",
159+
eventQueue.getClass().getSimpleName(), taskId);
160+
}
160161

161-
LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
162+
LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
162163

163-
// Step 4: Notify callback after all processing is complete
164-
callback.onEventProcessed(taskId, event);
164+
// Step 4: Notify callback after all processing is complete
165+
callback.onEventProcessed(taskId, event);
165166

166-
// Step 5: If this is a final event, notify task finalization
167-
if (isFinalEvent(event)) {
168-
callback.onTaskFinalized(taskId);
167+
// Step 5: If this is a final event, notify task finalization
168+
if (isFinalEvent(event)) {
169+
callback.onTaskFinalized(taskId);
170+
}
171+
} finally {
172+
// ALWAYS release semaphore, even if processing fails
173+
// Balances the acquire() in MainQueue.enqueueEvent()
174+
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
175+
mainQueue.releaseSemaphore();
176+
}
169177
}
170178
}
171179

0 commit comments

Comments
 (0)