Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import com.fasterxml.jackson.databind.JsonNode;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -17,8 +16,10 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.io.JsonEOFException;
import com.fasterxml.jackson.databind.JsonNode;
import io.a2a.server.ExtendedAgentCard;
import io.a2a.server.requesthandlers.JSONRPCHandler;
import io.a2a.server.util.async.Internal;
import io.a2a.spec.AgentCard;
import io.a2a.spec.CancelTaskRequest;
import io.a2a.spec.GetTaskPushNotificationConfigRequest;
Expand All @@ -44,7 +45,6 @@
import io.a2a.spec.TaskResubscriptionRequest;
import io.a2a.spec.UnsupportedOperationError;
import io.a2a.util.Utils;
import io.a2a.server.util.async.Internal;
import io.quarkus.vertx.web.Body;
import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.Route;
Expand All @@ -69,6 +69,7 @@ public class A2AServerRoutes {
Instance<AgentCard> extendedAgentCard;

// Hook so testing can wait until the MultiSseSupport is subscribed.
// Without this we get intermittent failures
private static volatile Runnable streamingMultiSseSupportSubscribedRunnable;

@Inject
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package io.a2a.server.apps.quarkus;

import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static jakarta.ws.rs.core.MediaType.TEXT_PLAIN;

import java.util.concurrent.atomic.AtomicInteger;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import io.a2a.server.apps.common.TestUtilsBean;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.util.Utils;
import io.quarkus.vertx.web.Body;
import io.quarkus.vertx.web.Param;
import io.quarkus.vertx.web.Route;
import io.vertx.ext.web.RoutingContext;

/**
* Exposes the {@link TestUtilsBean} via REST using Quarkus Reactive Routes
*/
@Singleton
public class A2ATestRoutes {
@Inject
TestUtilsBean testUtilsBean;

@Inject
A2AServerRoutes a2AServerRoutes;

AtomicInteger streamingSubscribedCount = new AtomicInteger(0);

@PostConstruct
public void init() {
A2AServerRoutes.setStreamingMultiSseSupportSubscribedRunnable(() -> streamingSubscribedCount.incrementAndGet());
}


@Route(path = "/test/task", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
public void saveTask(@Body String body, RoutingContext rc) {
try {
Task task = Utils.OBJECT_MAPPER.readValue(body, Task.class);
testUtilsBean.saveTask(task);
rc.response()
.setStatusCode(200)
.end();
} catch (Throwable t) {
errorResponse(t, rc);
}
}

@Route(path = "/test/task/:taskId", methods = {Route.HttpMethod.GET}, produces = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
public void getTask(@Param String taskId, RoutingContext rc) {
try {
Task task = testUtilsBean.getTask(taskId);
if (task == null) {
rc.response()
.setStatusCode(404)
.end();
return;
}
rc.response()
.setStatusCode(200)
.putHeader(CONTENT_TYPE, APPLICATION_JSON)
.end(Utils.OBJECT_MAPPER.writeValueAsString(task));

} catch (Throwable t) {
errorResponse(t, rc);
}
}

@Route(path = "/test/task/:taskId", methods = {Route.HttpMethod.DELETE}, type = Route.HandlerType.BLOCKING)
public void deleteTask(@Param String taskId, RoutingContext rc) {
try {
Task task = testUtilsBean.getTask(taskId);
if (task == null) {
rc.response()
.setStatusCode(404)
.end();
return;
}
testUtilsBean.deleteTask(taskId);
rc.response()
.setStatusCode(200)
.end();
} catch (Throwable t) {
errorResponse(t, rc);
}
}

@Route(path = "/test/queue/ensure/:taskId", methods = {Route.HttpMethod.POST})
public void ensureTaskQueue(@Param String taskId, RoutingContext rc) {
try {
testUtilsBean.ensureQueue(taskId);
rc.response()
.setStatusCode(200)
.end();
} catch (Throwable t) {
errorResponse(t, rc);
}
}

@Route(path = "/test/queue/enqueueTaskStatusUpdateEvent/:taskId", methods = {Route.HttpMethod.POST})
public void enqueueTaskStatusUpdateEvent(@Param String taskId, @Body String body, RoutingContext rc) {

try {
TaskStatusUpdateEvent event = Utils.OBJECT_MAPPER.readValue(body, TaskStatusUpdateEvent.class);
testUtilsBean.enqueueEvent(taskId, event);
rc.response()
.setStatusCode(200)
.end();
} catch (Throwable t) {
errorResponse(t, rc);
}
}

@Route(path = "/test/queue/enqueueTaskArtifactUpdateEvent/:taskId", methods = {Route.HttpMethod.POST})
public void enqueueTaskArtifactUpdateEvent(@Param String taskId, @Body String body, RoutingContext rc) {

try {
TaskArtifactUpdateEvent event = Utils.OBJECT_MAPPER.readValue(body, TaskArtifactUpdateEvent.class);
testUtilsBean.enqueueEvent(taskId, event);
rc.response()
.setStatusCode(200)
.end();
} catch (Throwable t) {
errorResponse(t, rc);
}
}

@Route(path = "/test/streamingSubscribedCount", methods = {Route.HttpMethod.GET}, produces = {TEXT_PLAIN})
public void getStreamingSubscribedCount(RoutingContext rc) {
rc.response()
.setStatusCode(200)
.end(String.valueOf(streamingSubscribedCount.get()));
}

private void errorResponse(Throwable t, RoutingContext rc) {
t.printStackTrace();
rc.response()
.setStatusCode(500)
.putHeader(CONTENT_TYPE, TEXT_PLAIN)
.end();
}

}
Original file line number Diff line number Diff line change
@@ -1,37 +1,12 @@
package io.a2a.server.apps.quarkus;

import jakarta.inject.Inject;

import io.a2a.server.apps.common.AbstractA2AServerTest;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.tasks.TaskStore;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class QuarkusA2AServerTest extends AbstractA2AServerTest {

@Inject
TaskStore taskStore;

@Inject
InMemoryQueueManager queueManager;

public QuarkusA2AServerTest() {
super(8081);
}

@Override
protected TaskStore getTaskStore() {
return taskStore;
}

@Override
protected InMemoryQueueManager getQueueManager() {
return queueManager;
}

@Override
protected void setStreamingSubscribedRunnable(Runnable runnable) {
A2AServerRoutes.setStreamingMultiSseSupportSubscribedRunnable(runnable);
}
}
Loading