Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import jakarta.inject.Inject;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

import io.a2a.server.PublicAgentCard;
Expand Down Expand Up @@ -212,37 +213,39 @@ private Flow.Publisher<SendStreamingMessageResponse> convertToSendStreamingMessa
// We can't use the normal convertingProcessor since that propagates any errors as an error handled
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
return ZeroPublisher.create(createTubeConfig(), tube -> {
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
CompletableFuture.runAsync(() -> {
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
onComplete();
}

@Override
public void onComplete() {
tube.complete();
}
@Override
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using tabs here? Please replace with spaces if so

public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
}
onComplete();
}

@Override
public void onComplete() {
tube.complete();
}
});
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

While using CompletableFuture.runAsync is a good way to avoid blocking the current thread, any RuntimeException thrown by publisher.subscribe() will be swallowed. This would cause the returned Publisher to never complete or emit any items, potentially causing the client to hang. It's safer to handle potential exceptions from the asynchronous operation and propagate them to the tube.

                CompletableFuture.runAsync(() -> {
                    publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
                        Flow.Subscription subscription;
                        @Override
                        public void onSubscribe(Flow.Subscription subscription) {
                            this.subscription = subscription;
                            subscription.request(1);
                        }

                        @Override
                        public void onNext(StreamingEventKind item) {
                            tube.send(new SendStreamingMessageResponse(requestId, item));
                            subscription.request(1);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            if (throwable instanceof JSONRPCError jsonrpcError) {
                                tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
                            } else {
                                tube.send(
                                        new SendStreamingMessageResponse(
                                                requestId, new
                                                InternalError(throwable.getMessage())));
                            }
                            onComplete();
                        }

                        @Override
                        public void onComplete() {
                            tube.complete();
                        }
                    });
                }).whenComplete((res, err) -> {
                    if (err != null) {
                        tube.fail(err);
                    }
                });

});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use static imports

import java.util.concurrent.atomic.AtomicReference;

import io.a2a.server.ServerCallContext;
Expand Down Expand Up @@ -269,7 +266,7 @@ public void testOnMessageErrorMocks() {
}

@Test
public void testOnMessageStreamNewMessageSuccess() {
public void testOnMessageStreamNewMessageSuccess() throws InterruptedException {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler);
agentExecutorExecute = (context, eventQueue) -> {
eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
Expand Down Expand Up @@ -314,6 +311,8 @@ public void onComplete() {
}
});

latch.await();

// The Python implementation has several events emitted since it uses mocks. Also, in the
// implementation, a Message is considered a 'final' Event in EventConsumer.consumeAll()
// so there would be no more Events.
Expand Down Expand Up @@ -359,6 +358,7 @@ public void testOnMessageStreamNewMessageSuccessMocks() {
response = handler.onMessageSendStream(request, callContext);
}

CompletableFuture<Void> future = new CompletableFuture<>();
List<Event> results = new ArrayList<>();

response.subscribe(new Flow.Subscriber<SendStreamingMessageResponse>() {
Expand All @@ -378,15 +378,17 @@ public void onNext(SendStreamingMessageResponse item) {

@Override
public void onError(Throwable throwable) {

future.completeExceptionally(throwable);
}

@Override
public void onComplete() {

future.complete(null);
}
});

future.join();

Assertions.assertEquals(events, results);
}

Expand Down Expand Up @@ -504,6 +506,7 @@ public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() {
response = handler.onMessageSendStream(request, callContext);
}

CompletableFuture<Void> future = new CompletableFuture<>();
List<Event> results = new ArrayList<>();

// Unlike testOnMessageStreamNewMessageExistingTaskSuccess() the ZeroPublisher.fromIterable()
Expand All @@ -526,15 +529,17 @@ public void onNext(SendStreamingMessageResponse item) {

@Override
public void onError(Throwable throwable) {

future.completeExceptionally(throwable);
}

@Override
public void onComplete() {

future.complete(null);
}
});

future.join();

Assertions.assertEquals(events, results);
}

Expand Down Expand Up @@ -710,7 +715,7 @@ public void testOnResubscribeExistingTaskSuccess() {
callContext);
Assertions.assertNull(smr.getError());


CompletableFuture<Void> future = new CompletableFuture<>();
List<StreamingEventKind> results = new ArrayList<>();

response.subscribe(new Flow.Subscriber<>() {
Expand All @@ -731,14 +736,18 @@ public void onNext(SendStreamingMessageResponse item) {
@Override
public void onError(Throwable throwable) {
subscription.cancel();
future.completeExceptionally(throwable);
}

@Override
public void onComplete() {
subscription.cancel();
future.complete(null);
}
});

future.join();

// The Python implementation has several events emitted since it uses mocks.
//
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
Expand Down Expand Up @@ -776,6 +785,7 @@ public void testOnResubscribeExistingTaskSuccessMocks() throws Exception {
response = handler.onResubscribeToTask(request, callContext);
}

CompletableFuture<Void> future = new CompletableFuture<>();
List<StreamingEventKind> results = new ArrayList<>();

// Unlike testOnResubscribeExistingTaskSuccess() the ZeroPublisher.fromIterable()
Expand All @@ -799,14 +809,18 @@ public void onNext(SendStreamingMessageResponse item) {
@Override
public void onError(Throwable throwable) {
subscription.cancel();
future.completeExceptionally(throwable);
}

@Override
public void onComplete() {
subscription.cancel();
future.complete(null);
}
});

future.join();

// The Python implementation has several events emitted since it uses mocks.
//
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
Expand Down Expand Up @@ -1140,6 +1154,7 @@ public void testOnMessageStreamTaskIdMismatch() {
SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
Flow.Publisher<SendStreamingMessageResponse> response = handler.onMessageSendStream(request, callContext);

CompletableFuture<Void> future = new CompletableFuture<>();
List<SendStreamingMessageResponse> results = new ArrayList<>();
AtomicReference<Throwable> error = new AtomicReference<>();

Expand All @@ -1161,14 +1176,18 @@ public void onNext(SendStreamingMessageResponse item) {
public void onError(Throwable throwable) {
error.set(throwable);
subscription.cancel();
future.completeExceptionally(throwable);
}

@Override
public void onComplete() {
subscription.cancel();
future.complete(null);
}
});

future.join();

Assertions.assertNull(error.get());
Assertions.assertEquals(1, results.size());
Assertions.assertInstanceOf(InternalError.class, results.get(0).getError());
Expand Down
Loading