Skip to content

Commit 8f5db10

Browse files
committed
TCK fixes
1 parent 29f0b97 commit 8f5db10

File tree

4 files changed

+26
-13
lines changed

4 files changed

+26
-13
lines changed

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
6565
}
6666
event = item.getEvent();
6767

68+
// Defensive logging for error handling
6869
if (event instanceof Throwable thr) {
70+
LOGGER.info("EventConsumer detected Throwable event: {} - triggering tube.fail()",
71+
thr.getClass().getSimpleName());
6972
tube.fail(thr);
7073
return;
7174
}

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ private void processEvent(MainEventBusContext context) {
167167
}
168168

169169
// Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
170+
if (eventToDistribute == null) {
171+
LOGGER.error("MainEventBusProcessor: eventToDistribute is NULL for task {} - this should never happen!", taskId);
172+
eventToDistribute = new InternalError("Internal error: event processing failed");
173+
}
174+
170175
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
171176
int childCount = mainQueue.getChildCount();
172177
LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}",

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
* This interface is primarily intended for testing, allowing tests to synchronize
99
* with the asynchronous MainEventBusProcessor. Production code should not rely on this.
1010
* </p>
11-
* <p>
1211
* Usage in tests:
1312
* <pre>
1413
* {@code
@@ -31,7 +30,6 @@
3130
* }
3231
* }
3332
* </pre>
34-
* </p>
3533
*/
3634
public interface MainEventBusProcessorCallback {
3735

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.a2a.spec.A2AError;
1717
import io.a2a.spec.Event;
1818
import io.a2a.spec.EventKind;
19+
import io.a2a.spec.InternalError;
1920
import io.a2a.spec.Message;
2021
import io.a2a.spec.Task;
2122
import io.a2a.spec.TaskState;
@@ -96,6 +97,7 @@ public EventKind consumeAll(EventConsumer consumer) throws A2AError {
9697
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws A2AError {
9798
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
9899
AtomicReference<Message> message = new AtomicReference<>();
100+
AtomicReference<Task> capturedTask = new AtomicReference<>(); // Capture Task events
99101
AtomicBoolean interrupted = new AtomicBoolean(false);
100102
AtomicReference<Throwable> errorRef = new AtomicReference<>();
101103
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
@@ -129,6 +131,11 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
129131
return false;
130132
}
131133

134+
// Capture Task events (especially for new tasks where taskManager.getTask() would return null)
135+
if (event instanceof Task t) {
136+
capturedTask.set(t);
137+
}
138+
132139
// TaskStore update moved to MainEventBusProcessor
133140

134141
// Determine interrupt behavior
@@ -229,20 +236,20 @@ else if (blocking) {
229236
Utils.rethrow(error);
230237
}
231238

232-
EventKind eventType;
233-
Message msg = message.get();
234-
if (msg != null) {
235-
eventType = msg;
236-
} else {
237-
Task task = taskManager.getTask();
238-
if (task == null) {
239-
throw new io.a2a.spec.InternalError("No task or message available after consuming events");
240-
}
241-
eventType = task;
239+
// Return Message if captured, otherwise Task if captured, otherwise fetch from TaskStore
240+
EventKind eventKind = message.get();
241+
if (eventKind == null) {
242+
eventKind = capturedTask.get();
243+
}
244+
if (eventKind == null) {
245+
eventKind = taskManager.getTask();
246+
}
247+
if (eventKind == null) {
248+
throw new InternalError("Could not find a Task/Message for " + taskManager.getTaskId());
242249
}
243250

244251
return new EventTypeAndInterrupt(
245-
eventType,
252+
eventKind,
246253
interrupted.get(),
247254
consumptionCompletionFuture);
248255
}

0 commit comments

Comments
 (0)