Skip to content

Commit 09cca84

Browse files
kabirchenkangqiang
andauthored
fix: JSONRPCHandler's republisher should happen in a separate thread. (#247)
FIxes #224 Also, add a test for JSONRPCHandler. GrpcHandler already uses a separate thread, but add a similar test there. --------- Co-authored-by: chenkangqiang <[email protected]>
1 parent 0fb4115 commit 09cca84

File tree

3 files changed

+206
-39
lines changed

3 files changed

+206
-39
lines changed

transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.List;
77
import java.util.concurrent.CountDownLatch;
88
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicBoolean;
910

1011
import com.google.protobuf.Empty;
1112
import com.google.protobuf.Struct;
@@ -744,6 +745,69 @@ public void testOnGetAuthenticatedExtendedAgentCard() throws Exception {
744745
// TODO - getting the authenticated extended agent card isn't support for gRPC right now
745746
}
746747

748+
@Test
749+
public void testStreamingDoesNotBlockMainThread() throws Exception {
750+
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler);
751+
752+
// Track if the main thread gets blocked during streaming
753+
AtomicBoolean eventReceived = new AtomicBoolean(false);
754+
CountDownLatch streamStarted = new CountDownLatch(1);
755+
GrpcHandler.setStreamingSubscribedRunnable(streamStarted::countDown);
756+
CountDownLatch eventProcessed = new CountDownLatch(1);
757+
758+
agentExecutorExecute = (context, eventQueue) -> {
759+
// Wait a bit to ensure the main thread continues
760+
try {
761+
Thread.sleep(100);
762+
} catch (InterruptedException e) {
763+
Thread.currentThread().interrupt();
764+
}
765+
eventQueue.enqueueEvent(context.getMessage());
766+
};
767+
768+
// Start streaming with a custom StreamObserver
769+
List<StreamResponse> results = new ArrayList<>();
770+
List<Throwable> errors = new ArrayList<>();
771+
StreamObserver<StreamResponse> streamObserver = new StreamObserver<>() {
772+
@Override
773+
public void onNext(StreamResponse streamResponse) {
774+
results.add(streamResponse);
775+
eventReceived.set(true);
776+
eventProcessed.countDown();
777+
}
778+
779+
@Override
780+
public void onError(Throwable throwable) {
781+
errors.add(throwable);
782+
eventProcessed.countDown();
783+
}
784+
785+
@Override
786+
public void onCompleted() {
787+
eventProcessed.countDown();
788+
}
789+
};
790+
791+
sendStreamingMessageRequest(handler, streamObserver);
792+
793+
// The main thread should not be blocked - we should be able to continue immediately
794+
Assertions.assertTrue(streamStarted.await(100, TimeUnit.MILLISECONDS),
795+
"Streaming subscription should start quickly without blocking main thread");
796+
797+
// This proves the main thread is not blocked - we can do other work
798+
// Simulate main thread doing other work
799+
Thread.sleep(50);
800+
801+
// Wait for the actual event processing to complete
802+
Assertions.assertTrue(eventProcessed.await(2, TimeUnit.SECONDS),
803+
"Event should be processed within reasonable time");
804+
805+
// Verify we received the event and no errors occurred
806+
Assertions.assertTrue(eventReceived.get(), "Should have received streaming event");
807+
Assertions.assertTrue(errors.isEmpty(), "Should not have any errors");
808+
Assertions.assertEquals(1, results.size(), "Should have received exactly one event");
809+
}
810+
747811
private StreamRecorder<SendMessageResponse> sendMessageRequest(GrpcHandler handler) throws Exception {
748812
SendMessageRequest request = SendMessageRequest.newBuilder()
749813
.setRequest(GRPC_MESSAGE)

transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import jakarta.inject.Inject;
77

88
import java.util.List;
9+
import java.util.concurrent.CompletableFuture;
910
import java.util.concurrent.Flow;
1011

1112
import io.a2a.server.ExtendedAgentCard;
@@ -242,37 +243,39 @@ private Flow.Publisher<SendStreamingMessageResponse> convertToSendStreamingMessa
242243
// We can't use the normal convertingProcessor since that propagates any errors as an error handled
243244
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
244245
return ZeroPublisher.create(createTubeConfig(), tube -> {
245-
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
246-
Flow.Subscription subscription;
247-
@Override
248-
public void onSubscribe(Flow.Subscription subscription) {
249-
this.subscription = subscription;
250-
subscription.request(1);
251-
}
246+
CompletableFuture.runAsync(() -> {
247+
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
248+
Flow.Subscription subscription;
249+
@Override
250+
public void onSubscribe(Flow.Subscription subscription) {
251+
this.subscription = subscription;
252+
subscription.request(1);
253+
}
252254

253-
@Override
254-
public void onNext(StreamingEventKind item) {
255-
tube.send(new SendStreamingMessageResponse(requestId, item));
256-
subscription.request(1);
257-
}
255+
@Override
256+
public void onNext(StreamingEventKind item) {
257+
tube.send(new SendStreamingMessageResponse(requestId, item));
258+
subscription.request(1);
259+
}
258260

259-
@Override
260-
public void onError(Throwable throwable) {
261-
if (throwable instanceof JSONRPCError jsonrpcError) {
262-
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
263-
} else {
264-
tube.send(
265-
new SendStreamingMessageResponse(
266-
requestId, new
267-
InternalError(throwable.getMessage())));
261+
@Override
262+
public void onError(Throwable throwable) {
263+
if (throwable instanceof JSONRPCError jsonrpcError) {
264+
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
265+
} else {
266+
tube.send(
267+
new SendStreamingMessageResponse(
268+
requestId, new
269+
InternalError(throwable.getMessage())));
270+
}
271+
onComplete();
268272
}
269-
onComplete();
270-
}
271273

272-
@Override
273-
public void onComplete() {
274-
tube.complete();
275-
}
274+
@Override
275+
public void onComplete() {
276+
tube.complete();
277+
}
278+
});
276279
});
277280
});
278281
}

0 commit comments

Comments
 (0)