Skip to content

Commit 179acc7

Browse files
committed
Add hook to make sure subscrber has started
Tests which need to send events can use this to make sure there is a subscription before sending events on the queue This should make testResubscribeExistingTaskSuccess(0 more stable
1 parent 6f3ed75 commit 179acc7

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

quarkus-sdk/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public class A2AServerRoutes {
6868
@ExtendedAgentCard
6969
Instance<AgentCard> extendedAgentCard;
7070

71+
// Hook so testing can wait until the MultiSseSupport is subscribes.
72+
private static volatile Runnable streamingMultiSseSupportSubscribedRunnable;
73+
7174
private final Executor executor = Executors.newCachedThreadPool();
7275

7376
@Route(path = "/", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
@@ -221,6 +224,10 @@ private static boolean isNonStreamingRequest(String requestBody) {
221224
requestBody.contains(A2A.GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD);
222225
}
223226

227+
static void setStreamingMultiSseSupportSubscribedRunnable(Runnable runnable) {
228+
streamingMultiSseSupportSubscribedRunnable = runnable;
229+
}
230+
224231
// Port of import io.quarkus.vertx.web.runtime.MultiSseSupport, which is considered internal API
225232
private static class MultiSseSupport {
226233

@@ -255,6 +262,12 @@ public static void write(Multi<Buffer> multi, RoutingContext rc) {
255262
public void onSubscribe(Flow.Subscription subscription) {
256263
this.upstream = subscription;
257264
this.upstream.request(1);
265+
266+
// Notify tests that we are subscribed
267+
Runnable runnable = streamingMultiSseSupportSubscribedRunnable;
268+
if (runnable != null) {
269+
runnable.run();
270+
}
258271
}
259272

260273
@Override

quarkus-sdk/src/test/java/io/a2a/server/apps/quarkus/A2AServerResourceTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -623,13 +623,15 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
623623
// resubscribe to the task, requires the task and its queue to still be active
624624
TaskResubscriptionRequest taskResubscriptionRequest = new TaskResubscriptionRequest("1", new TaskIdParams(MINIMAL_TASK.getId()));
625625

626+
// Count down the latch when the MultiSseSupport on the server has started subscribing
627+
A2AServerRoutes.setStreamingMultiSseSupportSubscribedRunnable(taskResubscriptionRequestSent::countDown);
628+
626629
CompletableFuture<HttpResponse<Stream<String>>> responseFuture = initialiseStreamingRequest(taskResubscriptionRequest, null);
627630

628-
//CountDownLatch latch = new CountDownLatch(1);
629631
AtomicReference<Throwable> errorRef = new AtomicReference<>();
630-
taskResubscriptionRequestSent.countDown();
631632

632633
responseFuture.thenAccept(response -> {
634+
633635
if (response.statusCode() != 200) {
634636
//errorRef.set(new IllegalStateException("Status code was " + response.statusCode()));
635637
throw new IllegalStateException("Status code was " + response.statusCode());
@@ -660,14 +662,11 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
660662
if (!isStreamClosedError(t)) {
661663
errorRef.set(t);
662664
}
663-
//latch.countDown();
664665
return null;
665666
});
666667

667668
try {
668669
taskResubscriptionRequestSent.await();
669-
// sleep to ensure that the events are sent after the client request is made
670-
Thread.sleep(1000);
671670
List<Event> events = List.of(
672671
new TaskArtifactUpdateEvent.Builder()
673672
.taskId(MINIMAL_TASK.getId())
@@ -713,6 +712,7 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
713712
assertEquals(TaskState.COMPLETED, taskStatusUpdateEvent.getStatus().state());
714713
assertNotNull(taskStatusUpdateEvent.getStatus().timestamp());
715714
} finally {
715+
A2AServerRoutes.setStreamingMultiSseSupportSubscribedRunnable(null);
716716
taskStore.delete(MINIMAL_TASK.getId());
717717
executorService.shutdown();
718718
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {

0 commit comments

Comments
 (0)