Skip to content

Commit f902f18

Browse files
authored
feat: Rework tests so they don't rely on running in container (#185)
# Description While running the tests in a container is possible in Quarkus, and in WildFly (with Arquillian), this might not be possible for all Java runtimes. Hence, this PR changes the tests so the interactions with the TaskStore, QueueManager, and being notified about the subscription to a streaming request happening. Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Follow the [`CONTRIBUTING` Guide](../CONTRIBUTING.md). - [x] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [x] Ensure the tests pass - [x] Appropriate READMEs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
1 parent c3e7f9e commit f902f18

File tree

5 files changed

+389
-71
lines changed

5 files changed

+389
-71
lines changed

reference-impl/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.util.concurrent.atomic.AtomicLong;
99
import java.util.function.Function;
1010

11-
import com.fasterxml.jackson.databind.JsonNode;
1211
import jakarta.enterprise.inject.Instance;
1312
import jakarta.inject.Inject;
1413
import jakarta.inject.Singleton;
@@ -17,8 +16,10 @@
1716
import com.fasterxml.jackson.core.JsonParseException;
1817
import com.fasterxml.jackson.core.JsonProcessingException;
1918
import com.fasterxml.jackson.core.io.JsonEOFException;
19+
import com.fasterxml.jackson.databind.JsonNode;
2020
import io.a2a.server.ExtendedAgentCard;
2121
import io.a2a.server.requesthandlers.JSONRPCHandler;
22+
import io.a2a.server.util.async.Internal;
2223
import io.a2a.spec.AgentCard;
2324
import io.a2a.spec.CancelTaskRequest;
2425
import io.a2a.spec.GetTaskPushNotificationConfigRequest;
@@ -44,7 +45,6 @@
4445
import io.a2a.spec.TaskResubscriptionRequest;
4546
import io.a2a.spec.UnsupportedOperationError;
4647
import io.a2a.util.Utils;
47-
import io.a2a.server.util.async.Internal;
4848
import io.quarkus.vertx.web.Body;
4949
import io.quarkus.vertx.web.ReactiveRoutes;
5050
import io.quarkus.vertx.web.Route;
@@ -69,6 +69,7 @@ public class A2AServerRoutes {
6969
Instance<AgentCard> extendedAgentCard;
7070

7171
// Hook so testing can wait until the MultiSseSupport is subscribed.
72+
// Without this we get intermittent failures
7273
private static volatile Runnable streamingMultiSseSupportSubscribedRunnable;
7374

7475
@Inject
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package io.a2a.server.apps.quarkus;
2+
3+
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
4+
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
5+
import static jakarta.ws.rs.core.MediaType.TEXT_PLAIN;
6+
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.annotation.PostConstruct;
10+
import jakarta.inject.Inject;
11+
import jakarta.inject.Singleton;
12+
13+
import io.a2a.server.apps.common.TestUtilsBean;
14+
import io.a2a.spec.Task;
15+
import io.a2a.spec.TaskArtifactUpdateEvent;
16+
import io.a2a.spec.TaskStatusUpdateEvent;
17+
import io.a2a.util.Utils;
18+
import io.quarkus.vertx.web.Body;
19+
import io.quarkus.vertx.web.Param;
20+
import io.quarkus.vertx.web.Route;
21+
import io.vertx.ext.web.RoutingContext;
22+
23+
/**
24+
* Exposes the {@link TestUtilsBean} via REST using Quarkus Reactive Routes
25+
*/
26+
@Singleton
27+
public class A2ATestRoutes {
28+
@Inject
29+
TestUtilsBean testUtilsBean;
30+
31+
@Inject
32+
A2AServerRoutes a2AServerRoutes;
33+
34+
AtomicInteger streamingSubscribedCount = new AtomicInteger(0);
35+
36+
@PostConstruct
37+
public void init() {
38+
A2AServerRoutes.setStreamingMultiSseSupportSubscribedRunnable(() -> streamingSubscribedCount.incrementAndGet());
39+
}
40+
41+
42+
@Route(path = "/test/task", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
43+
public void saveTask(@Body String body, RoutingContext rc) {
44+
try {
45+
Task task = Utils.OBJECT_MAPPER.readValue(body, Task.class);
46+
testUtilsBean.saveTask(task);
47+
rc.response()
48+
.setStatusCode(200)
49+
.end();
50+
} catch (Throwable t) {
51+
errorResponse(t, rc);
52+
}
53+
}
54+
55+
@Route(path = "/test/task/:taskId", methods = {Route.HttpMethod.GET}, produces = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
56+
public void getTask(@Param String taskId, RoutingContext rc) {
57+
try {
58+
Task task = testUtilsBean.getTask(taskId);
59+
if (task == null) {
60+
rc.response()
61+
.setStatusCode(404)
62+
.end();
63+
return;
64+
}
65+
rc.response()
66+
.setStatusCode(200)
67+
.putHeader(CONTENT_TYPE, APPLICATION_JSON)
68+
.end(Utils.OBJECT_MAPPER.writeValueAsString(task));
69+
70+
} catch (Throwable t) {
71+
errorResponse(t, rc);
72+
}
73+
}
74+
75+
@Route(path = "/test/task/:taskId", methods = {Route.HttpMethod.DELETE}, type = Route.HandlerType.BLOCKING)
76+
public void deleteTask(@Param String taskId, RoutingContext rc) {
77+
try {
78+
Task task = testUtilsBean.getTask(taskId);
79+
if (task == null) {
80+
rc.response()
81+
.setStatusCode(404)
82+
.end();
83+
return;
84+
}
85+
testUtilsBean.deleteTask(taskId);
86+
rc.response()
87+
.setStatusCode(200)
88+
.end();
89+
} catch (Throwable t) {
90+
errorResponse(t, rc);
91+
}
92+
}
93+
94+
@Route(path = "/test/queue/ensure/:taskId", methods = {Route.HttpMethod.POST})
95+
public void ensureTaskQueue(@Param String taskId, RoutingContext rc) {
96+
try {
97+
testUtilsBean.ensureQueue(taskId);
98+
rc.response()
99+
.setStatusCode(200)
100+
.end();
101+
} catch (Throwable t) {
102+
errorResponse(t, rc);
103+
}
104+
}
105+
106+
@Route(path = "/test/queue/enqueueTaskStatusUpdateEvent/:taskId", methods = {Route.HttpMethod.POST})
107+
public void enqueueTaskStatusUpdateEvent(@Param String taskId, @Body String body, RoutingContext rc) {
108+
109+
try {
110+
TaskStatusUpdateEvent event = Utils.OBJECT_MAPPER.readValue(body, TaskStatusUpdateEvent.class);
111+
testUtilsBean.enqueueEvent(taskId, event);
112+
rc.response()
113+
.setStatusCode(200)
114+
.end();
115+
} catch (Throwable t) {
116+
errorResponse(t, rc);
117+
}
118+
}
119+
120+
@Route(path = "/test/queue/enqueueTaskArtifactUpdateEvent/:taskId", methods = {Route.HttpMethod.POST})
121+
public void enqueueTaskArtifactUpdateEvent(@Param String taskId, @Body String body, RoutingContext rc) {
122+
123+
try {
124+
TaskArtifactUpdateEvent event = Utils.OBJECT_MAPPER.readValue(body, TaskArtifactUpdateEvent.class);
125+
testUtilsBean.enqueueEvent(taskId, event);
126+
rc.response()
127+
.setStatusCode(200)
128+
.end();
129+
} catch (Throwable t) {
130+
errorResponse(t, rc);
131+
}
132+
}
133+
134+
@Route(path = "/test/streamingSubscribedCount", methods = {Route.HttpMethod.GET}, produces = {TEXT_PLAIN})
135+
public void getStreamingSubscribedCount(RoutingContext rc) {
136+
rc.response()
137+
.setStatusCode(200)
138+
.end(String.valueOf(streamingSubscribedCount.get()));
139+
}
140+
141+
private void errorResponse(Throwable t, RoutingContext rc) {
142+
t.printStackTrace();
143+
rc.response()
144+
.setStatusCode(500)
145+
.putHeader(CONTENT_TYPE, TEXT_PLAIN)
146+
.end();
147+
}
148+
149+
}
Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,12 @@
11
package io.a2a.server.apps.quarkus;
22

3-
import jakarta.inject.Inject;
4-
53
import io.a2a.server.apps.common.AbstractA2AServerTest;
6-
import io.a2a.server.events.InMemoryQueueManager;
7-
import io.a2a.server.tasks.TaskStore;
84
import io.quarkus.test.junit.QuarkusTest;
95

106
@QuarkusTest
117
public class QuarkusA2AServerTest extends AbstractA2AServerTest {
128

13-
@Inject
14-
TaskStore taskStore;
15-
16-
@Inject
17-
InMemoryQueueManager queueManager;
18-
199
public QuarkusA2AServerTest() {
2010
super(8081);
2111
}
22-
23-
@Override
24-
protected TaskStore getTaskStore() {
25-
return taskStore;
26-
}
27-
28-
@Override
29-
protected InMemoryQueueManager getQueueManager() {
30-
return queueManager;
31-
}
32-
33-
@Override
34-
protected void setStreamingSubscribedRunnable(Runnable runnable) {
35-
A2AServerRoutes.setStreamingMultiSseSupportSubscribedRunnable(runnable);
36-
}
3712
}

0 commit comments

Comments
 (0)