Skip to content

Commit eee3bb8

Browse files
authored
test: add test for streaming with multiple events (#326)
## Summary Implements tests for streaming message scenarios where multiple events are received from the server, addressing Issue #52. This test validates the proper handling of event streams in AI agent-to-agent communication using the A2A Protocol. ## Overview The `testOnMessageStreamNewMessageMultipleEventsSuccess()` method verifies that the JSON-RPC handler correctly processes a sequence of multiple events during streaming communication: 1. **Task Event (WORKING state)** - Initial task state update 2. **TaskArtifactUpdateEvent** - Artifact generation during task execution 3. **TaskStatusUpdateEvent (COMPLETED state)** - Final task completion signal - [x] Follow the [`CONTRIBUTING` Guide](../CONTRIBUTING.md). - [x] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [x] Ensure the tests pass - [x] Appropriate READMEs were updated (if necessary) Fixes #52 🦕 ---
1 parent 75a9459 commit eee3bb8

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed

transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,114 @@ public void onComplete() {
335335
Assertions.assertSame(message, results.get(0));
336336
}
337337

338+
@Test
339+
public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws InterruptedException {
340+
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler);
341+
342+
// Create multiple events to be sent during streaming
343+
Task taskEvent = new Task.Builder(MINIMAL_TASK)
344+
.status(new TaskStatus(TaskState.WORKING))
345+
.build();
346+
347+
TaskArtifactUpdateEvent artifactEvent = new TaskArtifactUpdateEvent.Builder()
348+
.taskId(MINIMAL_TASK.getId())
349+
.contextId(MINIMAL_TASK.getContextId())
350+
.artifact(new Artifact.Builder()
351+
.artifactId("artifact-1")
352+
.parts(new TextPart("Generated artifact content"))
353+
.build())
354+
.build();
355+
356+
TaskStatusUpdateEvent statusEvent = new TaskStatusUpdateEvent.Builder()
357+
.taskId(MINIMAL_TASK.getId())
358+
.contextId(MINIMAL_TASK.getContextId())
359+
.status(new TaskStatus(TaskState.COMPLETED))
360+
.build();
361+
362+
// Configure the agent executor to enqueue multiple events
363+
agentExecutorExecute = (context, eventQueue) -> {
364+
// Enqueue the task with WORKING state
365+
eventQueue.enqueueEvent(taskEvent);
366+
// Enqueue an artifact update event
367+
eventQueue.enqueueEvent(artifactEvent);
368+
// Enqueue a status update event to complete the task (this is the "final" event)
369+
eventQueue.enqueueEvent(statusEvent);
370+
};
371+
372+
Message message = new Message.Builder(MESSAGE)
373+
.taskId(MINIMAL_TASK.getId())
374+
.contextId(MINIMAL_TASK.getContextId())
375+
.build();
376+
377+
SendStreamingMessageRequest request = new SendStreamingMessageRequest(
378+
"1", new MessageSendParams(message, null, null));
379+
Flow.Publisher<SendStreamingMessageResponse> response = handler.onMessageSendStream(request, callContext);
380+
381+
List<StreamingEventKind> results = new ArrayList<>();
382+
CountDownLatch latch = new CountDownLatch(3); // Expect 3 events
383+
AtomicReference<Throwable> error = new AtomicReference<>();
384+
385+
response.subscribe(new Flow.Subscriber<>() {
386+
private Flow.Subscription subscription;
387+
388+
@Override
389+
public void onSubscribe(Flow.Subscription subscription) {
390+
this.subscription = subscription;
391+
subscription.request(1);
392+
}
393+
394+
@Override
395+
public void onNext(SendStreamingMessageResponse item) {
396+
results.add(item.getResult());
397+
subscription.request(1);
398+
latch.countDown();
399+
}
400+
401+
@Override
402+
public void onError(Throwable throwable) {
403+
error.set(throwable);
404+
subscription.cancel();
405+
// Release latch to prevent timeout
406+
while (latch.getCount() > 0) {
407+
latch.countDown();
408+
}
409+
}
410+
411+
@Override
412+
public void onComplete() {
413+
subscription.cancel();
414+
}
415+
});
416+
417+
// Wait for all events to be received
418+
Assertions.assertTrue(latch.await(2, TimeUnit.SECONDS),
419+
"Expected to receive 3 events within timeout");
420+
421+
// Assert no error occurred during streaming
422+
Assertions.assertNull(error.get(), "No error should occur during streaming");
423+
424+
// Verify that all 3 events were received
425+
assertEquals(3, results.size(), "Should have received exactly 3 events");
426+
427+
// Verify the first event is the task
428+
Task receivedTask = assertInstanceOf(Task.class, results.get(0), "First event should be a Task");
429+
assertEquals(MINIMAL_TASK.getId(), receivedTask.getId());
430+
assertEquals(MINIMAL_TASK.getContextId(), receivedTask.getContextId());
431+
assertEquals(TaskState.WORKING, receivedTask.getStatus().state());
432+
433+
// Verify the second event is the artifact update
434+
TaskArtifactUpdateEvent receivedArtifact = assertInstanceOf(TaskArtifactUpdateEvent.class, results.get(1),
435+
"Second event should be a TaskArtifactUpdateEvent");
436+
assertEquals(MINIMAL_TASK.getId(), receivedArtifact.getTaskId());
437+
assertEquals("artifact-1", receivedArtifact.getArtifact().artifactId());
438+
439+
// Verify the third event is the status update
440+
TaskStatusUpdateEvent receivedStatus = assertInstanceOf(TaskStatusUpdateEvent.class, results.get(2),
441+
"Third event should be a TaskStatusUpdateEvent");
442+
assertEquals(MINIMAL_TASK.getId(), receivedStatus.getTaskId());
443+
assertEquals(TaskState.COMPLETED, receivedStatus.getStatus().state());
444+
}
445+
338446
@Test
339447
public void testOnMessageStreamNewMessageSuccessMocks() {
340448
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler);

0 commit comments

Comments
 (0)