Skip to content

Commit 76e84e5

Browse files
chenkangqiangkabir
authored andcommitted
fix: JSONRPCHandler stream block
1 parent ef190ab commit 76e84e5

File tree

2 files changed

+63
-44
lines changed

2 files changed

+63
-44
lines changed

transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import jakarta.inject.Inject;
77

88
import java.util.List;
9+
import java.util.concurrent.CompletableFuture;
910
import java.util.concurrent.Flow;
1011

1112
import io.a2a.server.ExtendedAgentCard;
@@ -242,37 +243,39 @@ private Flow.Publisher<SendStreamingMessageResponse> convertToSendStreamingMessa
242243
// We can't use the normal convertingProcessor since that propagates any errors as an error handled
243244
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
244245
return ZeroPublisher.create(createTubeConfig(), tube -> {
245-
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
246-
Flow.Subscription subscription;
247-
@Override
248-
public void onSubscribe(Flow.Subscription subscription) {
249-
this.subscription = subscription;
250-
subscription.request(1);
251-
}
246+
CompletableFuture.runAsync(() -> {
247+
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
248+
Flow.Subscription subscription;
249+
@Override
250+
public void onSubscribe(Flow.Subscription subscription) {
251+
this.subscription = subscription;
252+
subscription.request(1);
253+
}
252254

253-
@Override
254-
public void onNext(StreamingEventKind item) {
255-
tube.send(new SendStreamingMessageResponse(requestId, item));
256-
subscription.request(1);
257-
}
255+
@Override
256+
public void onNext(StreamingEventKind item) {
257+
tube.send(new SendStreamingMessageResponse(requestId, item));
258+
subscription.request(1);
259+
}
258260

259-
@Override
260-
public void onError(Throwable throwable) {
261-
if (throwable instanceof JSONRPCError jsonrpcError) {
262-
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
263-
} else {
264-
tube.send(
265-
new SendStreamingMessageResponse(
266-
requestId, new
267-
InternalError(throwable.getMessage())));
261+
@Override
262+
public void onError(Throwable throwable) {
263+
if (throwable instanceof JSONRPCError jsonrpcError) {
264+
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
265+
} else {
266+
tube.send(
267+
new SendStreamingMessageResponse(
268+
requestId, new
269+
InternalError(throwable.getMessage())));
270+
}
271+
onComplete();
268272
}
269-
onComplete();
270-
}
271273

272-
@Override
273-
public void onComplete() {
274-
tube.complete();
275-
}
274+
@Override
275+
public void onComplete() {
276+
tube.complete();
277+
}
278+
});
276279
});
277280
});
278281
}

transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@
88
import java.util.Collections;
99
import java.util.List;
1010
import java.util.Map;
11-
import java.util.concurrent.CountDownLatch;
12-
import java.util.concurrent.Executors;
13-
import java.util.concurrent.Flow;
14-
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.*;
1512
import java.util.concurrent.atomic.AtomicReference;
1613

1714
import io.a2a.server.ServerCallContext;
@@ -32,14 +29,12 @@
3229
import io.a2a.spec.Event;
3330
import io.a2a.spec.GetAuthenticatedExtendedCardRequest;
3431
import io.a2a.spec.GetAuthenticatedExtendedCardResponse;
35-
import io.a2a.spec.GetTaskPushNotificationConfigParams;
3632
import io.a2a.spec.GetTaskPushNotificationConfigRequest;
3733
import io.a2a.spec.GetTaskPushNotificationConfigResponse;
3834
import io.a2a.spec.GetTaskRequest;
3935
import io.a2a.spec.GetTaskResponse;
4036
import io.a2a.spec.InternalError;
4137
import io.a2a.spec.InvalidRequestError;
42-
import io.a2a.spec.ListTaskPushNotificationConfigParams;
4338
import io.a2a.spec.ListTaskPushNotificationConfigRequest;
4439
import io.a2a.spec.ListTaskPushNotificationConfigResponse;
4540
import io.a2a.spec.Message;
@@ -277,7 +272,7 @@ public void testOnMessageErrorMocks() {
277272
}
278273

279274
@Test
280-
public void testOnMessageStreamNewMessageSuccess() {
275+
public void testOnMessageStreamNewMessageSuccess() throws InterruptedException {
281276
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler);
282277
agentExecutorExecute = (context, eventQueue) -> {
283278
eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
@@ -322,6 +317,8 @@ public void onComplete() {
322317
}
323318
});
324319

320+
latch.await();
321+
325322
// The Python implementation has several events emitted since it uses mocks. Also, in the
326323
// implementation, a Message is considered a 'final' Event in EventConsumer.consumeAll()
327324
// so there would be no more Events.
@@ -367,6 +364,7 @@ public void testOnMessageStreamNewMessageSuccessMocks() {
367364
response = handler.onMessageSendStream(request, callContext);
368365
}
369366

367+
CompletableFuture<Void> future = new CompletableFuture<>();
370368
List<Event> results = new ArrayList<>();
371369

372370
response.subscribe(new Flow.Subscriber<SendStreamingMessageResponse>() {
@@ -386,16 +384,17 @@ public void onNext(SendStreamingMessageResponse item) {
386384

387385
@Override
388386
public void onError(Throwable throwable) {
389-
387+
future.completeExceptionally(throwable);
390388
}
391389

392390
@Override
393391
public void onComplete() {
394-
392+
future.complete(null);
395393
}
396394
});
397395

398-
assertEquals(events, results);
396+
future.join();
397+
Assertions.assertEquals(events, results);
399398
}
400399

401400
@Test
@@ -512,6 +511,7 @@ public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() {
512511
response = handler.onMessageSendStream(request, callContext);
513512
}
514513

514+
CompletableFuture<Void> future = new CompletableFuture<>();
515515
List<Event> results = new ArrayList<>();
516516

517517
// Unlike testOnMessageStreamNewMessageExistingTaskSuccess() the ZeroPublisher.fromIterable()
@@ -534,16 +534,18 @@ public void onNext(SendStreamingMessageResponse item) {
534534

535535
@Override
536536
public void onError(Throwable throwable) {
537-
537+
future.completeExceptionally(throwable);
538538
}
539539

540540
@Override
541541
public void onComplete() {
542-
542+
future.complete(null);
543543
}
544544
});
545545

546-
assertEquals(events, results);
546+
future.join();
547+
548+
Assertions.assertEquals(events, results);
547549
}
548550

549551

@@ -721,7 +723,7 @@ public void testOnResubscribeExistingTaskSuccess() {
721723
callContext);
722724
assertNull(smr.getError());
723725

724-
726+
CompletableFuture<Void> future = new CompletableFuture<>();
725727
List<StreamingEventKind> results = new ArrayList<>();
726728

727729
response.subscribe(new Flow.Subscriber<>() {
@@ -742,14 +744,18 @@ public void onNext(SendStreamingMessageResponse item) {
742744
@Override
743745
public void onError(Throwable throwable) {
744746
subscription.cancel();
747+
future.completeExceptionally(throwable);
745748
}
746749

747750
@Override
748751
public void onComplete() {
749752
subscription.cancel();
753+
future.complete(null);
750754
}
751755
});
752756

757+
future.join();
758+
753759
// The Python implementation has several events emitted since it uses mocks.
754760
//
755761
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
@@ -787,6 +793,7 @@ public void testOnResubscribeExistingTaskSuccessMocks() throws Exception {
787793
response = handler.onResubscribeToTask(request, callContext);
788794
}
789795

796+
CompletableFuture<Void> future = new CompletableFuture<>();
790797
List<StreamingEventKind> results = new ArrayList<>();
791798

792799
// Unlike testOnResubscribeExistingTaskSuccess() the ZeroPublisher.fromIterable()
@@ -810,14 +817,18 @@ public void onNext(SendStreamingMessageResponse item) {
810817
@Override
811818
public void onError(Throwable throwable) {
812819
subscription.cancel();
820+
future.completeExceptionally(throwable);
813821
}
814822

815823
@Override
816824
public void onComplete() {
817825
subscription.cancel();
826+
future.complete(null);
818827
}
819828
});
820829

830+
future.join();
831+
821832
// The Python implementation has several events emitted since it uses mocks.
822833
//
823834
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
@@ -1151,6 +1162,7 @@ public void testOnMessageStreamTaskIdMismatch() {
11511162
SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
11521163
Flow.Publisher<SendStreamingMessageResponse> response = handler.onMessageSendStream(request, callContext);
11531164

1165+
CompletableFuture<Void> future = new CompletableFuture<>();
11541166
List<SendStreamingMessageResponse> results = new ArrayList<>();
11551167
AtomicReference<Throwable> error = new AtomicReference<>();
11561168

@@ -1172,17 +1184,21 @@ public void onNext(SendStreamingMessageResponse item) {
11721184
public void onError(Throwable throwable) {
11731185
error.set(throwable);
11741186
subscription.cancel();
1187+
future.completeExceptionally(throwable);
11751188
}
11761189

11771190
@Override
11781191
public void onComplete() {
11791192
subscription.cancel();
1193+
future.complete(null);
11801194
}
11811195
});
11821196

1183-
assertNull(error.get());
1184-
assertEquals(1, results.size());
1185-
assertInstanceOf(InternalError.class, results.get(0).getError());
1197+
future.join();
1198+
1199+
Assertions.assertNull(error.get());
1200+
Assertions.assertEquals(1, results.size());
1201+
Assertions.assertInstanceOf(InternalError.class, results.get(0).getError());
11861202
}
11871203

11881204
@Test

0 commit comments

Comments
 (0)