Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import jakarta.inject.Inject;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

import io.a2a.server.ExtendedAgentCard;
Expand Down Expand Up @@ -242,37 +243,39 @@ private Flow.Publisher<SendStreamingMessageResponse> convertToSendStreamingMessa
// We can't use the normal convertingProcessor since that propagates any errors as an error handled
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
return ZeroPublisher.create(createTubeConfig(), tube -> {
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
CompletableFuture.runAsync(() -> {
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}
@Override
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using tabs here? Please replace with spaces if so

public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
}
onComplete();
}
onComplete();
}

@Override
public void onComplete() {
tube.complete();
}
@Override
public void onComplete() {
tube.complete();
}
});
});
Comment on lines +246 to 279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The CompletableFuture.runAsync call is 'fire-and-forget'. If an exception occurs during the execution of the lambda (e.g., if publisher.subscribe() throws an exception), the returned CompletableFuture will complete exceptionally, but this is not being handled. This could lead to the downstream subscriber hanging indefinitely as the tube would never be failed or completed.

It's important to handle potential exceptions from the asynchronous operation to ensure the stream is properly terminated. You can do this by chaining .exceptionally() to the CompletableFuture.

                CompletableFuture.runAsync(() -> {
                    publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
                        Flow.Subscription subscription;
                        @Override
                        public void onSubscribe(Flow.Subscription subscription) {
                            this.subscription = subscription;
                            subscription.request(1);
                        }

                        @Override
                        public void onNext(StreamingEventKind item) {
                            tube.send(new SendStreamingMessageResponse(requestId, item));
                            subscription.request(1);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            if (throwable instanceof JSONRPCError jsonrpcError) {
                                tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
                            } else {
                                tube.send(
                                        new SendStreamingMessageResponse(
                                                requestId, new
                                                InternalError(throwable.getMessage())));
                            }
                            onComplete();
                        }

                        @Override
                        public void onComplete() {
                            tube.complete();
                        }
                    });
                }).exceptionally(throwable -> {
                    tube.fail(throwable);
                    return null;
                });

});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use static imports

import java.util.concurrent.atomic.AtomicReference;

import io.a2a.server.ServerCallContext;
Expand Down Expand Up @@ -276,7 +273,7 @@ public void testOnMessageErrorMocks() {
}

@Test
public void testOnMessageStreamNewMessageSuccess() {
public void testOnMessageStreamNewMessageSuccess() throws InterruptedException {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler);
agentExecutorExecute = (context, eventQueue) -> {
eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
Expand Down Expand Up @@ -321,6 +318,8 @@ public void onComplete() {
}
});

latch.await();

// The Python implementation has several events emitted since it uses mocks. Also, in the
// implementation, a Message is considered a 'final' Event in EventConsumer.consumeAll()
// so there would be no more Events.
Expand Down Expand Up @@ -366,6 +365,7 @@ public void testOnMessageStreamNewMessageSuccessMocks() {
response = handler.onMessageSendStream(request, callContext);
}

CompletableFuture<Void> future = new CompletableFuture<>();
List<Event> results = new ArrayList<>();

response.subscribe(new Flow.Subscriber<SendStreamingMessageResponse>() {
Expand All @@ -385,15 +385,17 @@ public void onNext(SendStreamingMessageResponse item) {

@Override
public void onError(Throwable throwable) {

future.completeExceptionally(throwable);
}

@Override
public void onComplete() {

future.complete(null);
}
});

future.join();

assertEquals(events, results);
}

Expand Down Expand Up @@ -511,6 +513,7 @@ public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() {
response = handler.onMessageSendStream(request, callContext);
}

CompletableFuture<Void> future = new CompletableFuture<>();
List<Event> results = new ArrayList<>();

// Unlike testOnMessageStreamNewMessageExistingTaskSuccess() the ZeroPublisher.fromIterable()
Expand All @@ -533,15 +536,17 @@ public void onNext(SendStreamingMessageResponse item) {

@Override
public void onError(Throwable throwable) {

future.completeExceptionally(throwable);
}

@Override
public void onComplete() {

future.complete(null);
}
});

future.join();

assertEquals(events, results);
}

Expand Down Expand Up @@ -717,7 +722,7 @@ public void testOnResubscribeExistingTaskSuccess() {
callContext);
assertNull(smr.getError());


CompletableFuture<Void> future = new CompletableFuture<>();
List<StreamingEventKind> results = new ArrayList<>();

response.subscribe(new Flow.Subscriber<>() {
Expand All @@ -738,14 +743,18 @@ public void onNext(SendStreamingMessageResponse item) {
@Override
public void onError(Throwable throwable) {
subscription.cancel();
future.completeExceptionally(throwable);
}

@Override
public void onComplete() {
subscription.cancel();
future.complete(null);
}
});

future.join();

// The Python implementation has several events emitted since it uses mocks.
//
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
Expand Down Expand Up @@ -783,6 +792,7 @@ public void testOnResubscribeExistingTaskSuccessMocks() throws Exception {
response = handler.onResubscribeToTask(request, callContext);
}

CompletableFuture<Void> future = new CompletableFuture<>();
List<StreamingEventKind> results = new ArrayList<>();

// Unlike testOnResubscribeExistingTaskSuccess() the ZeroPublisher.fromIterable()
Expand All @@ -806,14 +816,18 @@ public void onNext(SendStreamingMessageResponse item) {
@Override
public void onError(Throwable throwable) {
subscription.cancel();
future.completeExceptionally(throwable);
}

@Override
public void onComplete() {
subscription.cancel();
future.complete(null);
}
});

future.join();

// The Python implementation has several events emitted since it uses mocks.
//
// See testOnMessageStreamNewMessageExistingTaskSuccessMocks() for a test more similar to the Python implementation
Expand Down Expand Up @@ -1147,6 +1161,7 @@ public void testOnMessageStreamTaskIdMismatch() {
SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
Flow.Publisher<SendStreamingMessageResponse> response = handler.onMessageSendStream(request, callContext);

CompletableFuture<Void> future = new CompletableFuture<>();
List<SendStreamingMessageResponse> results = new ArrayList<>();
AtomicReference<Throwable> error = new AtomicReference<>();

Expand All @@ -1168,14 +1183,18 @@ public void onNext(SendStreamingMessageResponse item) {
public void onError(Throwable throwable) {
error.set(throwable);
subscription.cancel();
future.completeExceptionally(throwable);
}

@Override
public void onComplete() {
subscription.cancel();
future.complete(null);
}
});

future.join();

assertNull(error.get());
assertEquals(1, results.size());
assertInstanceOf(InternalError.class, results.get(0).getError());
Expand Down