Skip to content

Commit fb627c6

Browse files
author
chenkangqiang
committed
fix: JSONRPCHandler stream block
1 parent 81b4a6b commit fb627c6

File tree

2 files changed

+61
-39
lines changed

2 files changed

+61
-39
lines changed

transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import jakarta.inject.Inject;
66

77
import java.util.List;
8+
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.Flow;
910

1011
import io.a2a.server.PublicAgentCard;
@@ -212,37 +213,39 @@ private Flow.Publisher<SendStreamingMessageResponse> convertToSendStreamingMessa
212213
// We can't use the normal convertingProcessor since that propagates any errors as an error handled
213214
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
214215
return ZeroPublisher.create(createTubeConfig(), tube -> {
215-
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
216-
Flow.Subscription subscription;
217-
@Override
218-
public void onSubscribe(Flow.Subscription subscription) {
219-
this.subscription = subscription;
220-
subscription.request(1);
221-
}
222-
223-
@Override
224-
public void onNext(StreamingEventKind item) {
225-
tube.send(new SendStreamingMessageResponse(requestId, item));
226-
subscription.request(1);
227-
}
228-
229-
@Override
230-
public void onError(Throwable throwable) {
231-
if (throwable instanceof JSONRPCError jsonrpcError) {
232-
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
233-
} else {
234-
tube.send(
235-
new SendStreamingMessageResponse(
236-
requestId, new
237-
InternalError(throwable.getMessage())));
216+
CompletableFuture.runAsync(() -> {
217+
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
218+
Flow.Subscription subscription;
219+
@Override
220+
public void onSubscribe(Flow.Subscription subscription) {
221+
this.subscription = subscription;
222+
subscription.request(1);
238223
}
239-
onComplete();
240-
}
241224

242-
@Override
243-
public void onComplete() {
244-
tube.complete();
245-
}
225+
@Override
226+
public void onNext(StreamingEventKind item) {
227+
tube.send(new SendStreamingMessageResponse(requestId, item));
228+
subscription.request(1);
229+
}
230+
231+
@Override
232+
public void onError(Throwable throwable) {
233+
if (throwable instanceof JSONRPCError jsonrpcError) {
234+
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
235+
} else {
236+
tube.send(
237+
new SendStreamingMessageResponse(
238+
requestId, new
239+
InternalError(throwable.getMessage())));
240+
}
241+
onComplete();
242+
}
243+
244+
@Override
245+
public void onComplete() {
246+
tube.complete();
247+
}
248+
});
246249
});
247250
});
248251
}

transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
import java.util.Collections;
55
import java.util.List;
66
import java.util.Map;
7-
import java.util.concurrent.CountDownLatch;
8-
import java.util.concurrent.Executors;
9-
import java.util.concurrent.Flow;
10-
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.*;
118
import java.util.concurrent.atomic.AtomicReference;
129

1310
import io.a2a.server.ServerCallContext;
@@ -269,7 +266,7 @@ public void testOnMessageErrorMocks() {
269266
}
270267

271268
@Test
272-
public void testOnMessageStreamNewMessageSuccess() {
269+
public void testOnMessageStreamNewMessageSuccess() throws InterruptedException {
273270
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler);
274271
agentExecutorExecute = (context, eventQueue) -> {
275272
eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
@@ -314,6 +311,8 @@ public void onComplete() {
314311
}
315312
});
316313

314+
latch.await();
315+
317316
// The Python implementation has several events emitted since it uses mocks. Also, in the
318317
// implementation, a Message is considered a 'final' Event in EventConsumer.consumeAll()
319318
// so there would be no more Events.
@@ -359,6 +358,7 @@ public void testOnMessageStreamNewMessageSuccessMocks() {
359358
response = handler.onMessageSendStream(request, callContext);
360359
}
361360

361+
CompletableFuture<Void> future = new CompletableFuture<>();
362362
List<Event> results = new ArrayList<>();
363363

364364
response.subscribe(new Flow.Subscriber<SendStreamingMessageResponse>() {
@@ -378,15 +378,17 @@ public void onNext(SendStreamingMessageResponse item) {
378378

379379
@Override
380380
public void onError(Throwable throwable) {
381-
381+
future.completeExceptionally(throwable);
382382
}
383383

384384
@Override
385385
public void onComplete() {
386-
386+
future.complete(null);
387387
}
388388
});
389389

390+
future.join();
391+
390392
Assertions.assertEquals(events, results);
391393
}
392394

@@ -504,6 +506,7 @@ public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() {
504506
response = handler.onMessageSendStream(request, callContext);
505507
}
506508

509+
CompletableFuture<Void> future = new CompletableFuture<>();
507510
List<Event> results = new ArrayList<>();
508511

509512
// Unlike testOnMessageStreamNewMessageExistingTaskSuccess() the ZeroPublisher.fromIterable()
@@ -526,15 +529,17 @@ public void onNext(SendStreamingMessageResponse item) {
526529

527530
@Override
528531
public void onError(Throwable throwable) {
529-
532+
future.completeExceptionally(throwable);
530533
}
531534

532535
@Override
533536
public void onComplete() {
534-
537+
future.complete(null);
535538
}
536539
});
537540

541+
future.join();
542+
538543
Assertions.assertEquals(events, results);
539544
}
540545

@@ -710,7 +715,7 @@ public void testOnResubscribeExistingTaskSuccess() {
710715
callContext);
711716
Assertions.assertNull(smr.getError());
712717

713-
718+
CompletableFuture<Void> future = new CompletableFuture<>();
714719
List<StreamingEventKind> results = new ArrayList<>();
715720

716721
response.subscribe(new Flow.Subscriber<>() {
@@ -731,14 +736,18 @@ public void onNext(SendStreamingMessageResponse item) {
731736
@Override
732737
public void onError(Throwable throwable) {
733738
subscription.cancel();
739+
future.completeExceptionally(throwable);
734740
}
735741

736742
@Override
737743
public void onComplete() {
738744
subscription.cancel();
745+
future.complete(null);
739746
}
740747
});
741748

749+
future.join();
750+
742751
// The Python implementation has several events emitted since it uses mocks.
743752
//
744753
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
@@ -776,6 +785,7 @@ public void testOnResubscribeExistingTaskSuccessMocks() throws Exception {
776785
response = handler.onResubscribeToTask(request, callContext);
777786
}
778787

788+
CompletableFuture<Void> future = new CompletableFuture<>();
779789
List<StreamingEventKind> results = new ArrayList<>();
780790

781791
// Unlike testOnResubscribeExistingTaskSuccess() the ZeroPublisher.fromIterable()
@@ -799,14 +809,18 @@ public void onNext(SendStreamingMessageResponse item) {
799809
@Override
800810
public void onError(Throwable throwable) {
801811
subscription.cancel();
812+
future.completeExceptionally(throwable);
802813
}
803814

804815
@Override
805816
public void onComplete() {
806817
subscription.cancel();
818+
future.complete(null);
807819
}
808820
});
809821

822+
future.join();
823+
810824
// The Python implementation has several events emitted since it uses mocks.
811825
//
812826
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
@@ -1140,6 +1154,7 @@ public void testOnMessageStreamTaskIdMismatch() {
11401154
SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
11411155
Flow.Publisher<SendStreamingMessageResponse> response = handler.onMessageSendStream(request, callContext);
11421156

1157+
CompletableFuture<Void> future = new CompletableFuture<>();
11431158
List<SendStreamingMessageResponse> results = new ArrayList<>();
11441159
AtomicReference<Throwable> error = new AtomicReference<>();
11451160

@@ -1161,14 +1176,18 @@ public void onNext(SendStreamingMessageResponse item) {
11611176
public void onError(Throwable throwable) {
11621177
error.set(throwable);
11631178
subscription.cancel();
1179+
future.completeExceptionally(throwable);
11641180
}
11651181

11661182
@Override
11671183
public void onComplete() {
11681184
subscription.cancel();
1185+
future.complete(null);
11691186
}
11701187
});
11711188

1189+
future.join();
1190+
11721191
Assertions.assertNull(error.get());
11731192
Assertions.assertEquals(1, results.size());
11741193
Assertions.assertInstanceOf(InternalError.class, results.get(0).getError());

0 commit comments

Comments
 (0)