diff --git a/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java index e4e11f294..fb86ee49d 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java @@ -6,6 +6,7 @@ import jakarta.inject.Inject; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow; import io.a2a.server.ExtendedAgentCard; @@ -242,37 +243,39 @@ private Flow.Publisher convertToSendStreamingMessa // We can't use the normal convertingProcessor since that propagates any errors as an error handled // via Subscriber.onError() rather than as part of the SendStreamingResponse payload return ZeroPublisher.create(createTubeConfig(), tube -> { - publisher.subscribe(new Flow.Subscriber() { - Flow.Subscription subscription; - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } + CompletableFuture.runAsync(() -> { + publisher.subscribe(new Flow.Subscriber() { + Flow.Subscription subscription; + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } - @Override - public void onNext(StreamingEventKind item) { - tube.send(new SendStreamingMessageResponse(requestId, item)); - subscription.request(1); - } + @Override + public void onNext(StreamingEventKind item) { + tube.send(new SendStreamingMessageResponse(requestId, item)); + subscription.request(1); + } - @Override - public void onError(Throwable throwable) { - if (throwable instanceof JSONRPCError jsonrpcError) { - tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError)); - } else { - tube.send( - new SendStreamingMessageResponse( - requestId, new - InternalError(throwable.getMessage()))); + @Override + public void onError(Throwable throwable) { + if (throwable instanceof JSONRPCError jsonrpcError) { + tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError)); + } else { + tube.send( + new SendStreamingMessageResponse( + requestId, new + InternalError(throwable.getMessage()))); + } + onComplete(); } - onComplete(); - } - @Override - public void onComplete() { - tube.complete(); - } + @Override + public void onComplete() { + tube.complete(); + } + }); }); }); } diff --git a/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java index a645c9b37..571e42612 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java @@ -8,10 +8,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.Flow; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import io.a2a.server.ServerCallContext; @@ -276,7 +273,7 @@ public void testOnMessageErrorMocks() { } @Test - public void testOnMessageStreamNewMessageSuccess() { + public void testOnMessageStreamNewMessageSuccess() throws InterruptedException { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); @@ -321,6 +318,8 @@ public void onComplete() { } }); + latch.await(); + // The Python implementation has several events emitted since it uses mocks. Also, in the // implementation, a Message is considered a 'final' Event in EventConsumer.consumeAll() // so there would be no more Events. @@ -366,6 +365,7 @@ public void testOnMessageStreamNewMessageSuccessMocks() { response = handler.onMessageSendStream(request, callContext); } + CompletableFuture future = new CompletableFuture<>(); List results = new ArrayList<>(); response.subscribe(new Flow.Subscriber() { @@ -385,15 +385,17 @@ public void onNext(SendStreamingMessageResponse item) { @Override public void onError(Throwable throwable) { - + future.completeExceptionally(throwable); } @Override public void onComplete() { - + future.complete(null); } }); + future.join(); + assertEquals(events, results); } @@ -511,6 +513,7 @@ public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() { response = handler.onMessageSendStream(request, callContext); } + CompletableFuture future = new CompletableFuture<>(); List results = new ArrayList<>(); // Unlike testOnMessageStreamNewMessageExistingTaskSuccess() the ZeroPublisher.fromIterable() @@ -533,15 +536,17 @@ public void onNext(SendStreamingMessageResponse item) { @Override public void onError(Throwable throwable) { - + future.completeExceptionally(throwable); } @Override public void onComplete() { - + future.complete(null); } }); + future.join(); + assertEquals(events, results); } @@ -717,7 +722,7 @@ public void testOnResubscribeExistingTaskSuccess() { callContext); assertNull(smr.getError()); - + CompletableFuture future = new CompletableFuture<>(); List results = new ArrayList<>(); response.subscribe(new Flow.Subscriber<>() { @@ -738,14 +743,18 @@ public void onNext(SendStreamingMessageResponse item) { @Override public void onError(Throwable throwable) { subscription.cancel(); + future.completeExceptionally(throwable); } @Override public void onComplete() { subscription.cancel(); + future.complete(null); } }); + future.join(); + // The Python implementation has several events emitted since it uses mocks. // // See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation @@ -783,6 +792,7 @@ public void testOnResubscribeExistingTaskSuccessMocks() throws Exception { response = handler.onResubscribeToTask(request, callContext); } + CompletableFuture future = new CompletableFuture<>(); List results = new ArrayList<>(); // Unlike testOnResubscribeExistingTaskSuccess() the ZeroPublisher.fromIterable() @@ -806,14 +816,18 @@ public void onNext(SendStreamingMessageResponse item) { @Override public void onError(Throwable throwable) { subscription.cancel(); + future.completeExceptionally(throwable); } @Override public void onComplete() { subscription.cancel(); + future.complete(null); } }); + future.join(); + // The Python implementation has several events emitted since it uses mocks. // // See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation @@ -1147,6 +1161,7 @@ public void testOnMessageStreamTaskIdMismatch() { SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null)); Flow.Publisher response = handler.onMessageSendStream(request, callContext); + CompletableFuture future = new CompletableFuture<>(); List results = new ArrayList<>(); AtomicReference error = new AtomicReference<>(); @@ -1168,14 +1183,18 @@ public void onNext(SendStreamingMessageResponse item) { public void onError(Throwable throwable) { error.set(throwable); subscription.cancel(); + future.completeExceptionally(throwable); } @Override public void onComplete() { subscription.cancel(); + future.complete(null); } }); + future.join(); + assertNull(error.get()); assertEquals(1, results.size()); assertInstanceOf(InternalError.class, results.get(0).getError());