Skip to content

Commit fc23321

Browse files
committed
Propagate events
1 parent 9d6a6ba commit fc23321

File tree

6 files changed

+105 -48 lines changed

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@ void setUp() {
6767
.build();
6868
}
6969

70+
/**
71+
* Helper to create a test event with the specified taskId.
72+
* This ensures taskId consistency between queue creation and event creation.
73+
*/
74+
private TaskStatusUpdateEvent createEventForTask(String taskId) {
75+
return new TaskStatusUpdateEvent.Builder()
76+
.taskId(taskId)
77+
.contextId("test-context")
78+
.status(new TaskStatus(TaskState.SUBMITTED))
79+
.isFinal(false)
80+
.build();
81+
}
82+
7083
@AfterEach
7184
void tearDown() {
7285
if (mainEventBusProcessor != null) {
@@ -130,21 +143,23 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
130143
@Test
131144
void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
132145
String taskId = "test-task-4";
146+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
133147
EventQueue queue = queueManager.createOrTap(taskId);
134148

135-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
149+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
136150
queueManager.onReplicatedEvent(replicatedEvent);
137151

138152
// Use retry logic to handle async MainEventBusProcessor distribution
139153
EventQueueItem item = dequeueEventWithRetry(queue, 5000);
140154
assertNotNull(item, "Event should be available in queue");
141155
Event dequeuedEvent = item.getEvent();
142-
assertEquals(testEvent, dequeuedEvent);
156+
assertEquals(eventForTask, dequeuedEvent);
143157
}
144158

145159
@Test
146160
void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
147161
String taskId = "non-existent-task";
162+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
148163

149164
// Verify no queue exists initially
150165
assertNull(queueManager.get(taskId));
@@ -158,7 +173,7 @@ void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
158173
EventQueue mainQueue = queueManager.get(taskId);
159174
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
160175

161-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
176+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
162177

163178
// Process the replicated event
164179
assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
@@ -168,7 +183,7 @@ void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
168183
EventQueueItem item = dequeueEventWithRetry(childQueue, 5000);
169184
assertNotNull(item, "Event should be available in queue");
170185
Event dequeuedEvent = item.getEvent();
171-
assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
186+
assertEquals(eventForTask, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
172187
}
173188

174189
@Test
@@ -348,6 +363,7 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
348363
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
349364

350365
String taskId = "active-task";
366+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
351367

352368
// Verify no queue exists initially
353369
assertNull(queueManager.get(taskId));
@@ -362,15 +378,15 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
362378
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
363379

364380
// Process a replicated event for an active task
365-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
381+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
366382
queueManager.onReplicatedEvent(replicatedEvent);
367383

368384
// Verify the event was enqueued and distributed to our ChildQueue
369385
// Use retry logic to handle async MainEventBusProcessor distribution
370386
EventQueueItem item = dequeueEventWithRetry(childQueue, 5000);
371387
assertNotNull(item, "Event should be available in queue");
372388
Event dequeuedEvent = item.getEvent();
373-
assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task");
389+
assertEquals(eventForTask, dequeuedEvent, "Event should be enqueued for active task");
374390
}
375391

376392

@@ -482,10 +498,11 @@ void testQueueClosedEventJsonSerialization() throws Exception {
482498
@Test
483499
void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedException {
484500
String taskId = "remote-close-test";
501+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
485502
EventQueue queue = queueManager.createOrTap(taskId);
486503

487504
// Enqueue a normal event
488-
queue.enqueueEvent(testEvent);
505+
queue.enqueueEvent(eventForTask);
489506

490507
// Simulate receiving QueueClosedEvent from remote node
491508
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
@@ -495,7 +512,7 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept
495512
// Dequeue the normal event first (use retry for async processing)
496513
EventQueueItem item1 = dequeueEventWithRetry(queue, 5000);
497514
assertNotNull(item1, "First event should be available");
498-
assertEquals(testEvent, item1.getEvent());
515+
assertEquals(eventForTask, item1.getEvent());
499516

500517
// Next dequeue should get the QueueClosedEvent (use retry for async processing)
501518
EventQueueItem item2 = dequeueEventWithRetry(queue, 5000);

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.a2a.server.tasks.TaskStore;
1111
import io.a2a.spec.A2AServerException;
1212
import io.a2a.spec.Event;
13+
import io.a2a.spec.InternalError;
1314
import io.a2a.spec.Task;
1415
import io.a2a.spec.TaskArtifactUpdateEvent;
1516
import io.a2a.spec.TaskStatusUpdateEvent;
@@ -140,39 +141,66 @@ private void processEvent(MainEventBusContext context) {
140141
LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {} (queue type: {})",
141142
taskId, event.getClass().getSimpleName(), eventQueue.getClass().getSimpleName());
142143

144+
Event eventToDistribute = null;
143145
try {
144146
// Step 1: Update TaskStore FIRST (persistence before clients see it)
145-
updateTaskStore(taskId, event);
147+
// If this throws, we distribute an error instead of the unpersisted event, preserving "persist before client visibility"
146148

147-
// Step 2: Send push notification AFTER persistence (ensures notification sees latest state)
148-
sendPushNotification(taskId);
149+
try {
150+
updateTaskStore(taskId, event);
151+
eventToDistribute = event; // Success - distribute original event
152+
} catch (InternalError e) {
153+
// Persistence failed - create error event to distribute instead
154+
LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
155+
String errorMessage = "Failed to persist event: " + e.getMessage();
156+
eventToDistribute = e;
157+
} catch (Exception e) {
158+
LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
159+
String errorMessage = "Failed to persist event: " + e.getMessage();
160+
eventToDistribute = new InternalError(errorMessage);
161+
}
162+
163+
// Step 2: Send push notification AFTER successful persistence
164+
if (eventToDistribute == event) {
165+
sendPushNotification(taskId);
166+
}
149167

150-
// Step 3: Then distribute to ChildQueues (clients see it AFTER persistence + notification)
168+
// Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
151169
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
152170
int childCount = mainQueue.getChildCount();
153-
LOGGER.debug("MainEventBusProcessor: Distributing event to {} children for task {}", childCount, taskId);
154-
mainQueue.distributeToChildren(context.eventQueueItem());
155-
LOGGER.debug("MainEventBusProcessor: Distributed event {} to {} children for task {}",
156-
event.getClass().getSimpleName(), childCount, taskId);
171+
LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}",
172+
eventToDistribute.getClass().getSimpleName(), childCount, taskId);
173+
// Create new EventQueueItem with the event to distribute (original or error)
174+
EventQueueItem itemToDistribute = new LocalEventQueueItem(eventToDistribute);
175+
mainQueue.distributeToChildren(itemToDistribute);
176+
LOGGER.debug("MainEventBusProcessor: Distributed {} to {} children for task {}",
177+
eventToDistribute.getClass().getSimpleName(), childCount, taskId);
157178
} else {
158179
LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}",
159180
eventQueue.getClass().getSimpleName(), taskId);
160181
}
161182

162183
LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
163184

164-
// Step 4: Notify callback after all processing is complete
165-
callback.onEventProcessed(taskId, event);
166-
167-
// Step 5: If this is a final event, notify task finalization
168-
if (isFinalEvent(event)) {
169-
callback.onTaskFinalized(taskId);
170-
}
171185
} finally {
172-
// ALWAYS release semaphore, even if processing fails
173-
// Balances the acquire() in MainQueue.enqueueEvent()
174-
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
175-
mainQueue.releaseSemaphore();
186+
try {
187+
// Step 4: Notify callback after all processing is complete
188+
// Call callback with the distributed event (original or error)
189+
if (eventToDistribute != null) {
190+
callback.onEventProcessed(taskId, eventToDistribute);
191+
192+
// Step 5: If this is a final event, notify task finalization
193+
// Only for successful persistence (not for errors)
194+
if (eventToDistribute == event && isFinalEvent(event)) {
195+
callback.onTaskFinalized(taskId);
196+
}
197+
}
198+
} finally {
199+
// ALWAYS release semaphore, even if processing fails
200+
// Balances the acquire() in MainQueue.enqueueEvent()
201+
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
202+
mainQueue.releaseSemaphore();
203+
}
176204
}
177205
}
178206
}
@@ -184,8 +212,15 @@ private void processEvent(MainEventBusContext context) {
184212
* which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent).
185213
* This leverages existing TaskManager logic for status updates, artifact appending, message history, etc.
186214
* </p>
215+
* <p>
216+
* If persistence fails, the exception is propagated to processEvent() which distributes an
217+
* InternalError to clients instead of the original event, ensuring "persist before visibility".
218+
* See Gemini's comment: https://github.com/a2aproject/a2a-java/pull/515#discussion_r2604621833
219+
* </p>
220+
*
221+
* @throws InternalError if persistence fails
187222
*/
188-
private void updateTaskStore(String taskId, Event event) {
223+
private void updateTaskStore(String taskId, Event event) throws InternalError {
189224
try {
190225
// Extract contextId from event (all relevant events have it)
191226
String contextId = extractContextId(event);
@@ -197,12 +232,14 @@ private void updateTaskStore(String taskId, Event event) {
197232
taskManager.process(event);
198233
LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {}",
199234
taskId, event.getClass().getSimpleName());
200-
} catch (A2AServerException e) {
235+
} catch (InternalError e) {
201236
LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e);
202-
// Don't rethrow - we still want to distribute to ChildQueues
237+
// Rethrow to prevent distributing unpersisted event to clients
238+
throw e;
203239
} catch (Exception e) {
204240
LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e);
205-
// Don't rethrow - we still want to distribute to ChildQueues
241+
// Rethrow to prevent distributing unpersisted event to clients
242+
throw new InternalError("TaskStore persistence failed: " + e.getMessage());
206243
}
207244
}
208245

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
127127

128128
// Determine interrupt behavior
129129
boolean shouldInterrupt = false;
130-
boolean continueInBackground = false;
131130
boolean isFinalEvent = (event instanceof Task task && task.getStatus().state().isFinal())
132131
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal());
133132
boolean isAuthRequired = (event instanceof Task task && task.getStatus().state() == TaskState.AUTH_REQUIRED)
@@ -142,19 +141,16 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
142141
// new request is expected in order for the agent to make progress,
143142
// so the agent should exit.
144143
shouldInterrupt = true;
145-
continueInBackground = true;
146144
}
147145
else if (!blocking) {
148146
// For non-blocking calls, interrupt as soon as a task is available.
149147
shouldInterrupt = true;
150-
continueInBackground = true;
151148
}
152149
else if (blocking) {
153150
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
154151
// Python's async consumption doesn't block threads, but Java's does
155152
// So we interrupt to return quickly, then rely on background consumption
156153
shouldInterrupt = true;
157-
continueInBackground = true;
158154
if (LOGGER.isDebugEnabled()) {
159155
LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption",
160156
taskIdForLogging(), isFinalEvent ? "final" : "non-final");

server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
public class EventConsumerTest {
3838

3939
private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
40+
private static final String TASK_ID = "123"; // Must match MINIMAL_TASK id
4041

4142
private EventQueue eventQueue;
4243
private EventConsumer eventConsumer;
@@ -70,6 +71,7 @@ public void init() {
7071
EventQueueUtil.start(mainEventBusProcessor);
7172

7273
eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
74+
.taskId(TASK_ID)
7375
.mainEventBus(mainEventBus)
7476
.build().tap();
7577
eventConsumer = new EventConsumer(eventQueue);
@@ -116,15 +118,15 @@ public void testConsumeAllMultipleEvents() throws JsonProcessingException {
116118
List<Event> events = List.of(
117119
Utils.unmarshalFrom(MINIMAL_TASK, Task.class),
118120
new TaskArtifactUpdateEvent.Builder()
119-
.taskId("task-123")
121+
.taskId(TASK_ID)
120122
.contextId("session-xyz")
121123
.artifact(new Artifact.Builder()
122124
.artifactId("11")
123125
.parts(new TextPart("text"))
124126
.build())
125127
.build(),
126128
new TaskStatusUpdateEvent.Builder()
127-
.taskId("task-123")
129+
.taskId(TASK_ID)
128130
.contextId("session-xyz")
129131
.status(new TaskStatus(TaskState.WORKING))
130132
.isFinal(true)
@@ -177,15 +179,15 @@ public void testConsumeUntilMessage() throws Exception {
177179
List<Event> events = List.of(
178180
Utils.unmarshalFrom(MINIMAL_TASK, Task.class),
179181
new TaskArtifactUpdateEvent.Builder()
180-
.taskId("task-123")
182+
.taskId(TASK_ID)
181183
.contextId("session-xyz")
182184
.artifact(new Artifact.Builder()
183185
.artifactId("11")
184186
.parts(new TextPart("text"))
185187
.build())
186188
.build(),
187189
new TaskStatusUpdateEvent.Builder()
188-
.taskId("task-123")
190+
.taskId(TASK_ID)
189191
.contextId("session-xyz")
190192
.status(new TaskStatus(TaskState.WORKING))
191193
.isFinal(true)

0 commit comments

Comments
 (0)