Skip to content

Commit aa4f604

Browse files
committed
TCK fixes
1 parent c326084 commit aa4f604

File tree

4 files changed

+24
-3
lines changed

4 files changed

+24
-3
lines changed

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
6464
}
6565
event = item.getEvent();
6666

67+
// Defensive logging for error handling
6768
if (event instanceof Throwable thr) {
69+
LOGGER.info("EventConsumer detected Throwable event: {} - triggering tube.fail()",
70+
thr.getClass().getSimpleName());
6871
tube.fail(thr);
6972
return;
7073
}

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ private void processEvent(MainEventBusContext context) {
166166
}
167167

168168
// Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
169+
if (eventToDistribute == null) {
170+
LOGGER.error("MainEventBusProcessor: eventToDistribute is NULL for task {} - this should never happen!", taskId);
171+
eventToDistribute = new InternalError("Internal error: event processing failed");
172+
}
173+
169174
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
170175
int childCount = mainQueue.getChildCount();
171176
LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}",

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
* This interface is primarily intended for testing, allowing tests to synchronize
99
* with the asynchronous MainEventBusProcessor. Production code should not rely on this.
1010
* </p>
11-
* <p>
1211
* Usage in tests:
1312
* <pre>
1413
* {@code
@@ -31,7 +30,6 @@
3130
* }
3231
* }
3332
* </pre>
34-
* </p>
3533
*/
3634
public interface MainEventBusProcessorCallback {
3735

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
9090
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
9191
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
9292
AtomicReference<Message> message = new AtomicReference<>();
93+
AtomicReference<Task> capturedTask = new AtomicReference<>(); // Capture Task events
9394
AtomicBoolean interrupted = new AtomicBoolean(false);
9495
AtomicReference<Throwable> errorRef = new AtomicReference<>();
9596
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
@@ -123,6 +124,11 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
123124
return false;
124125
}
125126

127+
// Capture Task events (especially for new tasks where taskManager.getTask() would return null)
128+
if (event instanceof Task t) {
129+
capturedTask.set(t);
130+
}
131+
126132
// TaskStore update moved to MainEventBusProcessor
127133

128134
// Determine interrupt behavior
@@ -223,8 +229,17 @@ else if (blocking) {
223229
Utils.rethrow(error);
224230
}
225231

232+
// Return Message if captured, otherwise Task if captured, otherwise fetch from TaskStore
233+
EventKind eventKind = message.get();
234+
if (eventKind == null) {
235+
eventKind = capturedTask.get();
236+
}
237+
if (eventKind == null) {
238+
eventKind = taskManager.getTask();
239+
}
240+
226241
return new EventTypeAndInterrupt(
227-
message.get() != null ? message.get() : taskManager.getTask(),
242+
eventKind,
228243
interrupted.get(),
229244
consumptionCompletionFuture);
230245
}

0 commit comments

Comments
 (0)