diff --git a/a2a-java-sdk-server-jakarta.iml b/a2a-java-sdk-server-jakarta.iml
deleted file mode 100644
index 4d4ebba..0000000
--- a/a2a-java-sdk-server-jakarta.iml
+++ /dev/null
@@ -1,9 +0,0 @@
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/grpc/pom.xml b/grpc/pom.xml
new file mode 100644
index 0000000..fe131f6
--- /dev/null
+++ b/grpc/pom.xml
@@ -0,0 +1,53 @@
+
+
+ 4.0.0
+
+
+ org.wildfly.extras.a2a
+ a2a-java-sdk-server-jakarta-parent
+ 0.2.6.Beta1-SNAPSHOT
+
+
+ a2a-java-sdk-server-jakarta-grpc
+ jar
+
+ WildFly Extras - Java A2A SDK for Jakarta gRPC
+ Java SDK for the Agent2Agent Protocol (A2A) - SDK - Jakarta - gRPC integration
+
+
+
+
+ io.github.a2asdk
+ a2a-java-sdk-transport-grpc
+ ${version.sdk}
+ provided
+
+
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+ provided
+
+
+ jakarta.enterprise
+ jakarta.enterprise.cdi-api
+ provided
+
+
+ jakarta.inject
+ jakarta.inject-api
+ provided
+
+
+
+
+ io.github.a2asdk
+ a2a-java-sdk-server-common
+ ${version.sdk}
+ provided
+
+
+
\ No newline at end of file
diff --git a/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/GrpcBeanInitializer.java b/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/GrpcBeanInitializer.java
new file mode 100644
index 0000000..e7a8fd4
--- /dev/null
+++ b/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/GrpcBeanInitializer.java
@@ -0,0 +1,55 @@
+package org.wildfly.extras.a2a.server.apps.grpc;
+
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.Initialized;
+import jakarta.enterprise.event.Observes;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+import io.a2a.server.PublicAgentCard;
+import io.a2a.server.requesthandlers.RequestHandler;
+import io.a2a.spec.AgentCard;
+import io.a2a.transport.grpc.handler.CallContextFactory;
+
+/**
+ * Bean initializer that observes application startup events.
+ *
+ * Since CDI is not available on gRPC threads, we capture the CDI beans
+ * during application startup and store them statically for use by
+ * the WildFly gRPC subsystem.
+ */
+@ApplicationScoped
+public class GrpcBeanInitializer {
+
+ @Inject
+ @PublicAgentCard
+ AgentCard agentCard;
+
+ @Inject
+ RequestHandler requestHandler;
+
+ @Inject
+ Instance callContextFactory;
+
+ /**
+ * Observes the application startup event to eagerly initialize the gRPC cache.
+ */
+ public void onStartup(@Observes @Initialized(ApplicationScoped.class) Object init) {
+ System.out.println("*** GrpcBeanInitializer.onStartup() called - ApplicationScoped initialized ***");
+ try {
+ // Cache CDI beans for gRPC threads to use since CDI is not available on those threads
+ CallContextFactory ccf = callContextFactory.isUnsatisfied() ? null : callContextFactory.get();
+ WildFlyGrpcHandler.setStaticBeans(agentCard, requestHandler, ccf);
+ System.out.println("*** GrpcBeanInitializer successfully cached beans: agentCard=" + agentCard + ", requestHandler=" + requestHandler + ", callContextFactory=" + ccf + " ***");
+ } catch (Exception e) {
+ System.err.println("*** GrpcBeanInitializer.onStartup() failed: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @PreDestroy
+ public void cleanup() {
+ WildFlyGrpcHandler.setStaticBeans(null, null, null);
+ }
+}
\ No newline at end of file
diff --git a/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/WildFlyGrpcHandler.java b/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/WildFlyGrpcHandler.java
new file mode 100644
index 0000000..aaccd1f
--- /dev/null
+++ b/grpc/src/main/java/org/wildfly/extras/a2a/server/apps/grpc/WildFlyGrpcHandler.java
@@ -0,0 +1,58 @@
+package org.wildfly.extras.a2a.server.apps.grpc;
+
+import io.a2a.server.requesthandlers.RequestHandler;
+import io.a2a.spec.AgentCard;
+import io.a2a.transport.grpc.handler.CallContextFactory;
+import io.a2a.transport.grpc.handler.GrpcHandler;
+
+/**
+ * WildFly gRPC Handler that uses static cache for CDI beans.
+ *
+ * The WildFly gRPC subsystem instantiates this class directly using
+ * reflection and the default constructor, bypassing CDI completely.
+ *
+ * Since CDI is not available on gRPC threads, we use static cache
+ * populated during application startup when CDI is available.
+ */
+public class WildFlyGrpcHandler extends GrpcHandler {
+
+ // Static cache populated during application startup by GrpcBeanInitializer
+ private static volatile AgentCard staticAgentCard;
+ private static volatile RequestHandler staticRequestHandler;
+ private static volatile CallContextFactory staticCallContextFactory;
+
+ public WildFlyGrpcHandler() {
+ // Default constructor - the only one used by WildFly gRPC subsystem
+ }
+
+ /**
+ * Called by GrpcBeanInitializer during CDI initialization to cache beans
+ * for use by gRPC threads where CDI is not available.
+ */
+ static void setStaticBeans(AgentCard agentCard, RequestHandler requestHandler, CallContextFactory callContextFactory) {
+ staticAgentCard = agentCard;
+ staticRequestHandler = requestHandler;
+ staticCallContextFactory = callContextFactory;
+ }
+
+ @Override
+ protected RequestHandler getRequestHandler() {
+ if (staticRequestHandler == null) {
+ throw new RuntimeException("RequestHandler not available. ApplicationStartup may not have run yet.");
+ }
+ return staticRequestHandler;
+ }
+
+ @Override
+ protected AgentCard getAgentCard() {
+ if (staticAgentCard == null) {
+ throw new RuntimeException("AgentCard not available. ApplicationStartup may not have run yet.");
+ }
+ return staticAgentCard;
+ }
+
+ @Override
+ protected CallContextFactory getCallContextFactory() {
+ return staticCallContextFactory; // Can be null if not configured
+ }
+}
diff --git a/grpc/src/main/resources/META-INF/beans.xml b/grpc/src/main/resources/META-INF/beans.xml
new file mode 100644
index 0000000..9b2940f
--- /dev/null
+++ b/grpc/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,6 @@
+
+
+
\ No newline at end of file
diff --git a/impl/pom.xml b/impl/pom.xml
index cf32bd8..15704ac 100644
--- a/impl/pom.xml
+++ b/impl/pom.xml
@@ -20,7 +20,7 @@
io.github.a2asdk
- a2a-java-sdk-server-common
+ a2a-java-sdk-transport-jsonrpc
jakarta.enterprise
diff --git a/impl/src/main/java/org/wildfly/extras/a2a/server/apps/jakarta/A2AServerResource.java b/impl/src/main/java/org/wildfly/extras/a2a/server/apps/jakarta/A2AServerResource.java
index 93a0939..a3e33f8 100644
--- a/impl/src/main/java/org/wildfly/extras/a2a/server/apps/jakarta/A2AServerResource.java
+++ b/impl/src/main/java/org/wildfly/extras/a2a/server/apps/jakarta/A2AServerResource.java
@@ -29,7 +29,6 @@
import io.a2a.server.ServerCallContext;
import io.a2a.server.auth.UnauthenticatedUser;
import io.a2a.server.auth.User;
-import io.a2a.server.requesthandlers.JSONRPCHandler;
import io.a2a.server.util.async.Internal;
import io.a2a.spec.AgentCard;
import io.a2a.spec.CancelTaskRequest;
@@ -56,11 +55,12 @@
import io.a2a.spec.StreamingJSONRPCRequest;
import io.a2a.spec.TaskResubscriptionRequest;
import io.a2a.spec.UnsupportedOperationError;
+import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Path("/")
-public class A2AServerResource {
+ public class A2AServerResource {
private static final Logger LOGGER = LoggerFactory.getLogger(A2AServerResource.class);
diff --git a/pom.xml b/pom.xml
index e6d7143..e2a69b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,9 +45,11 @@
${project.build.directory}${file.separator}wildfly
36.0.1.Final
- ${project.version}
+ 0.3.0.Beta1-SNAPSHOT
1.1.1
+
+ 1.73.0
1.0.0.Final
2.2
@@ -58,6 +60,7 @@
10.0.0.Final
3.3.4
5.5.5
+ 0.1.12.Final
${project.build.directory}${file.separator}wildfly
@@ -117,6 +120,21 @@
a2a-java-sdk-spec
${version.sdk}
+
+ io.github.a2asdk
+ a2a-java-sdk-spec-grpc
+ ${version.sdk}
+
+
+ io.github.a2asdk
+ a2a-java-sdk-transport-grpc
+ ${version.sdk}
+
+
+ io.github.a2asdk
+ a2a-java-sdk-transport-jsonrpc
+ ${version.sdk}
+
io.github.a2asdk
a2a-tck-server
@@ -162,6 +180,37 @@
${version.hamcrest}
test
+
+
+ io.grpc
+ grpc-protobuf
+ ${version.grpc}
+
+
+ io.grpc
+ grpc-stub
+ ${version.grpc}
+
+
+ io.grpc
+ grpc-netty
+ ${version.grpc}
+
+
+ io.grpc
+ grpc-netty-shaded
+ ${version.grpc}
+
+
+ com.google.protobuf
+ protobuf-java
+ 4.31.1
+
+
+ com.google.guava
+ failureaccess
+ 1.0.2
+
@@ -249,9 +298,11 @@
+ grpc
impl
tck
tests
+ tests-grpc
diff --git a/tests-grpc/pom.xml b/tests-grpc/pom.xml
new file mode 100644
index 0000000..320de64
--- /dev/null
+++ b/tests-grpc/pom.xml
@@ -0,0 +1,256 @@
+
+
+ 4.0.0
+
+
+ org.wildfly.extras.a2a
+ a2a-java-sdk-server-jakarta-parent
+ 0.2.6.Beta1-SNAPSHOT
+
+
+ a2a-java-sdk-server-jakarta-tests-grpc
+
+ jar
+
+ WildFly Extras - Java A2A SDK for Jakarta - gRPC Tests
+ Java SDK for the Agent2Agent Protocol (A2A) - SDK - Jakarta - gRPC Tests
+
+
+
+ --stability=preview
+
+
+
+
+
+ ${project.groupId}
+ a2a-java-sdk-server-jakarta-grpc
+ ${project.version}
+ provided
+
+
+
+
+ io.github.a2asdk
+ a2a-java-sdk-spec-grpc
+ test
+
+
+ io.github.a2asdk
+ a2a-java-sdk-transport-grpc
+ test
+
+
+ io.github.a2asdk
+ a2a-java-sdk-spec
+ test
+
+
+ io.github.a2asdk
+ a2a-java-sdk-server-common
+ test
+
+
+ io.github.a2asdk
+ a2a-java-sdk-tests-server-common
+ test
+
+
+ io.github.a2asdk
+ a2a-java-sdk-tests-server-common
+ test-jar
+ test
+
+
+
+ ${project.groupId}
+ a2a-java-sdk-server-jakarta
+ ${project.version}
+ test
+
+
+
+
+ io.grpc
+ grpc-protobuf
+ test
+
+
+ io.grpc
+ grpc-stub
+ test
+
+
+ io.grpc
+ grpc-netty-shaded
+ test
+
+
+ com.google.guava
+ failureaccess
+ test
+
+
+ com.google.protobuf
+ protobuf-java
+ test
+
+
+
+
+ jakarta.enterprise
+ jakarta.enterprise.cdi-api
+ test
+
+
+ jakarta.inject
+ jakarta.inject-api
+ test
+
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+ test
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+ test
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ test
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ test
+
+
+
+
+ org.jboss.arquillian.junit5
+ arquillian-junit5-container
+ test
+
+
+ org.wildfly.arquillian
+ wildfly-arquillian-container-managed
+ test
+
+
+
+
+ org.jboss.logging
+ commons-logging-jboss-logging
+ test
+
+
+ org.jboss.shrinkwrap
+ shrinkwrap-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.jboss.threads
+ jboss-threads
+ test
+
+
+ org.hamcrest
+ hamcrest
+ test
+
+
+
+
+
+
+
+ org.wildfly.glow
+ wildfly-glow-arquillian-plugin
+
+
+
+ org.wildfly
+ wildfly-galleon-pack
+ ${version.wildfly}
+
+
+
+ org.wildfly.extras.grpc
+ wildfly-grpc-feature-pack
+ ${version.wildfly.grpc}
+
+
+ standalone.xml
+
+
+ grpc
+
+
+
+
+ scan
+
+ scan
+
+ test-compile
+
+
+
+
+
+
+ org.wildfly.plugins
+ wildfly-maven-plugin
+
+ ${project.build.directory}/glow-scan/provisioning.xml
+ ${jboss.home}
+ ${jboss.home}
+
+
+ true
+ preview
+
+
+
+
+
+
+ test-provisioning
+
+ package
+
+ test-compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ ${jboss.home}
+
+ -Dtest.agent.card.port=8080
+ ${server.jboss.args}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/tests-grpc/src/scripts/configure_logger.cli b/tests-grpc/src/scripts/configure_logger.cli
new file mode 100644
index 0000000..a45fb24
--- /dev/null
+++ b/tests-grpc/src/scripts/configure_logger.cli
@@ -0,0 +1,2 @@
+/subsystem=logging/logger=org.jboss.weld:add(level=DEBUG)
+/subsystem=logging/logger=io.a2a:add(level=DEBUG)
\ No newline at end of file
diff --git a/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/A2ATestResource.java b/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/A2ATestResource.java
new file mode 100644
index 0000000..95b3865
--- /dev/null
+++ b/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/A2ATestResource.java
@@ -0,0 +1,132 @@
+package org.wildfly.extras.a2a.server.grpc.wildfly;
+
+import static jakarta.ws.rs.core.MediaType.TEXT_PLAIN;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.DELETE;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.POST;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.HttpHeaders;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+
+import io.a2a.server.apps.common.TestUtilsBean;
+import io.a2a.spec.PushNotificationConfig;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import io.a2a.transport.grpc.handler.GrpcHandler;
+import io.a2a.util.Utils;
+
+@Path("/test")
+@ApplicationScoped
+public class A2ATestResource {
+ @Inject
+ TestUtilsBean testUtilsBean;
+
+ private final AtomicInteger streamingSubscribedCount = new AtomicInteger(0);
+
+ @PostConstruct
+ public void init() {
+ GrpcHandler.setStreamingSubscribedRunnable(streamingSubscribedCount::incrementAndGet);
+ }
+
+
+ @POST
+ @Path("/task")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response saveTask(String body) throws Exception {
+ Task task = Utils.OBJECT_MAPPER.readValue(body, Task.class);
+ testUtilsBean.saveTask(task);
+ return Response.ok().build();
+ }
+
+ @GET
+ @Path("/task/{taskId}")
+ public Response getTask(@PathParam("taskId") String taskId) throws Exception {
+ Task task = testUtilsBean.getTask(taskId);
+ if (task == null) {
+ return Response.status(404).build();
+ }
+ return Response.ok()
+ .entity(Utils.OBJECT_MAPPER.writeValueAsString(task))
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
+ .build();
+ }
+
+ @DELETE
+ @Path("/task/{taskId}")
+ public Response deleteTask(@PathParam("taskId") String taskId) {
+ Task task = testUtilsBean.getTask(taskId);
+ if (task == null) {
+ return Response.status(404).build();
+ }
+ testUtilsBean.deleteTask(taskId);
+ return Response.ok()
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
+ .build();
+ }
+
+ @POST
+ @Path("/queue/ensure/{taskId}")
+ public Response ensureQueue(@PathParam("taskId") String taskId) {
+ testUtilsBean.ensureQueue(taskId);
+ return Response.ok().build();
+ }
+
+ @POST
+ @Path("/queue/enqueueTaskStatusUpdateEvent/{taskId}")
+ public Response enqueueTaskStatusUpdateEvent(@PathParam("taskId") String taskId, String body) throws Exception {
+ TaskStatusUpdateEvent event = Utils.OBJECT_MAPPER.readValue(body, TaskStatusUpdateEvent.class);
+ testUtilsBean.enqueueEvent(taskId, event);
+ return Response.ok().build();
+ }
+
+ @POST
+ @Path("/queue/enqueueTaskArtifactUpdateEvent/{taskId}")
+ public Response enqueueTaskArtifactUpdateEvent(@PathParam("taskId") String taskId, String body) throws Exception {
+ TaskArtifactUpdateEvent event = Utils.OBJECT_MAPPER.readValue(body, TaskArtifactUpdateEvent.class);
+ testUtilsBean.enqueueEvent(taskId, event);
+ return Response.ok().build();
+ }
+
+ @GET
+ @Path("/streamingSubscribedCount")
+ @Produces(TEXT_PLAIN)
+ public Response getStreamingSubscribedCount() {
+ return Response.ok(String.valueOf(streamingSubscribedCount.get()), TEXT_PLAIN).build();
+ }
+
+ @DELETE
+ @Path("/task/{taskId}/config/{configId}")
+ public Response deleteTaskPushNotificationConfig(@PathParam("taskId") String taskId, @PathParam("configId") String configId) {
+ Task task = testUtilsBean.getTask(taskId);
+ if (task == null) {
+ return Response.status(404).build();
+ }
+ testUtilsBean.deleteTaskPushNotificationConfig(taskId, configId);
+ return Response.ok()
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
+ .build();
+ }
+
+ @POST
+ @Path("/task/{taskId}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response savePushNotificationConfigInStore(@PathParam("taskId") String taskId, String body) throws Exception {
+ PushNotificationConfig notificationConfig = Utils.OBJECT_MAPPER.readValue(body, PushNotificationConfig.class);
+ if (notificationConfig == null) {
+ return Response.status(404).build();
+ }
+ testUtilsBean.saveTaskPushNotificationConfig(taskId, notificationConfig);
+ return Response.ok().build();
+ }
+}
diff --git a/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/RestApplication.java b/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/RestApplication.java
new file mode 100644
index 0000000..a2ee4c9
--- /dev/null
+++ b/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/RestApplication.java
@@ -0,0 +1,8 @@
+package org.wildfly.extras.a2a.server.grpc.wildfly;
+
+import jakarta.ws.rs.ApplicationPath;
+import jakarta.ws.rs.core.Application;
+
+@ApplicationPath("/")
+public class RestApplication extends Application {
+}
\ No newline at end of file
diff --git a/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/WildFlyA2AGrpcTestCase.java b/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/WildFlyA2AGrpcTestCase.java
new file mode 100644
index 0000000..e07c3ee
--- /dev/null
+++ b/tests-grpc/src/test/java/org/wildfly/extras/a2a/server/grpc/wildfly/WildFlyA2AGrpcTestCase.java
@@ -0,0 +1,1321 @@
+package org.wildfly.extras.a2a.server.grpc.wildfly;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.a2a.A2A;
+import io.a2a.client.http.A2AHttpClient;
+import io.a2a.grpc.A2AServiceGrpc;
+import io.a2a.grpc.CancelTaskRequest;
+import io.a2a.grpc.CreateTaskPushNotificationConfigRequest;
+import io.a2a.grpc.DeleteTaskPushNotificationConfigRequest;
+import io.a2a.grpc.GetAgentCardRequest;
+import io.a2a.grpc.GetTaskPushNotificationConfigRequest;
+import io.a2a.grpc.GetTaskRequest;
+import io.a2a.grpc.ListTaskPushNotificationConfigRequest;
+import io.a2a.grpc.ListTaskPushNotificationConfigResponse;
+import io.a2a.grpc.SendMessageRequest;
+import io.a2a.grpc.SendMessageResponse;
+import io.a2a.grpc.StreamResponse;
+import io.a2a.grpc.TaskSubscriptionRequest;
+import io.a2a.grpc.utils.ProtoUtils;
+import io.a2a.server.PublicAgentCard;
+import io.a2a.server.apps.common.AbstractA2AServerTest;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Event;
+import io.a2a.spec.Message;
+import io.a2a.spec.Part;
+import io.a2a.spec.PushNotificationConfig;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskPushNotificationConfig;
+import io.a2a.spec.TaskState;
+import io.a2a.spec.TaskStatus;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import io.a2a.spec.TextPart;
+import io.a2a.transport.grpc.handler.GrpcHandler;
+import io.a2a.util.Assert;
+import io.a2a.util.Utils;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import mutiny.zero.ZeroPublisher;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.container.test.api.RunAsClient;
+import org.jboss.arquillian.junit5.ArquillianExtension;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.jboss.shrinkwrap.api.spec.WebArchive;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.wildfly.extras.a2a.server.apps.grpc.WildFlyGrpcHandler;
+
+@ExtendWith(ArquillianExtension.class)
+@RunAsClient
+public class WildFlyA2AGrpcTestCase {
+
+ @Deployment
+ public static WebArchive createDeployment() throws Exception {
+ final JavaArchive[] libraries = List.of(
+ // a2a-java-sdk-server-jakarta-grpc.jar
+ getJarForClass(WildFlyGrpcHandler.class),
+ // a2a-java-sdk-client.jar
+ getJarForClass(A2A.class),
+ // a2a-java-sdk-common.jar
+ getJarForClass(Assert.class),
+ // a2a-java-sdk-http-client
+ getJarForClass(A2AHttpClient.class),
+ // a2a-java-sdk-server-common.jar
+ getJarForClass(PublicAgentCard.class),
+ // a2a-java-sdk-spec.jar
+ getJarForClass(Event.class),
+ //a2a-java-transport-grpc.jar
+ getJarForClass(GrpcHandler.class),
+ // a2a-java-spec-grpc.jar (contains generated gRPC classes)
+ getJarForClass(A2AServiceGrpc.class), // Removing to avoid auto-registration by WildFly gRPC subsystem
+ // protobuf-java.jar - include correct version to match gencode 4.31.1
+ getJarForClass(com.google.protobuf.Message.class),
+ // mutiny-zero.jar. This is provided by some WildFly layers, but not always, and not in
+ // the server provisioned by Glow when inspecting our war
+ getJarForClass(ZeroPublisher.class)).toArray(new JavaArchive[0]);
+
+ return ShrinkWrap.create(WebArchive.class, "ROOT.war")
+ .addAsLibraries(libraries)
+ // Extra dependencies needed by the tests
+ .addPackage(AbstractA2AServerTest.class.getPackage())
+ .addPackage(A2ATestResource.class.getPackage())
+ .addClass(RestApplication.class)
+ .addAsWebInfResource("WEB-INF/web.xml")
+ .addAsWebInfResource("META-INF/beans.xml", "beans.xml");
+ }
+
+ static JavaArchive getJarForClass(Class> clazz) throws Exception {
+ File f = new File(clazz.getProtectionDomain().getCodeSource().getLocation().toURI());
+ return ShrinkWrap.createFromZipFile(JavaArchive.class, f);
+ }
+
+ private ManagedChannel grpcChannel;
+ private A2AServiceGrpc.A2AServiceBlockingStub grpcClient;
+
+ private static final Task MINIMAL_TASK = new Task.Builder()
+ .id("task-123")
+ .contextId("session-xyz")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .build();
+
+ private static final Task CANCEL_TASK = new Task.Builder()
+ .id("cancel-task-123")
+ .contextId("session-xyz")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .build();
+
+ private static final Task CANCEL_TASK_NOT_SUPPORTED = new Task.Builder()
+ .id("cancel-task-not-supported-123")
+ .contextId("session-xyz")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .build();
+
+ private static final Task SEND_MESSAGE_NOT_SUPPORTED = new Task.Builder()
+ .id("task-not-supported-123")
+ .contextId("session-xyz")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .build();
+
+ private static final Message MESSAGE = new Message.Builder()
+ .messageId("111")
+ .role(Message.Role.AGENT)
+ .parts(new TextPart("test message"))
+ .build();
+ public static final String APPLICATION_JSON = "application/json";
+
+ private final int serverPort = 8080; // HTTP port (WildFly default)
+ private final int grpcPort = 9555; // gRPC port (from WildFly gRPC configuration)
+
+ @BeforeEach
+ public void setUp() {
+ grpcChannel = ManagedChannelBuilder.forAddress("localhost", grpcPort)
+ .usePlaintext()
+ .build();
+ grpcClient = A2AServiceGrpc.newBlockingStub(grpcChannel);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (grpcChannel != null) {
+ grpcChannel.shutdown();
+ }
+ }
+ @Test
+ public void testTaskStoreMethodsSanityTest() throws Exception {
+ Task task = new Task.Builder(MINIMAL_TASK).id("abcde").build();
+ saveTaskInTaskStore(task);
+ Task saved = getTaskFromTaskStore(task.getId());
+ assertEquals(task.getId(), saved.getId());
+ assertEquals(task.getContextId(), saved.getContextId());
+ assertEquals(task.getStatus().state(), saved.getStatus().state());
+
+ deleteTaskInTaskStore(task.getId());
+ Task saved2 = getTaskFromTaskStore(task.getId());
+ assertNull(saved2);
+ }
+
+ @Test
+ public void testGetTaskSuccess() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ GetTaskRequest request = GetTaskRequest.newBuilder()
+ .setName("tasks/" + MINIMAL_TASK.getId())
+ .build();
+ io.a2a.grpc.Task response = grpcClient.getTask(request);
+ assertEquals("task-123", response.getId());
+ assertEquals("session-xyz", response.getContextId());
+ assertEquals(io.a2a.grpc.TaskState.TASK_STATE_SUBMITTED, response.getStatus().getState());
+ } finally {
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testGetTaskNotFound() throws Exception {
+ assertTrue(getTaskFromTaskStore("non-existent-task") == null);
+ GetTaskRequest request = GetTaskRequest.newBuilder()
+ .setName("tasks/non-existent-task")
+ .build();
+ try {
+ grpcClient.getTask(request);
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("TaskNotFoundError"));
+ }
+ }
+
+ @Test
+ public void testCancelTaskSuccess() throws Exception {
+ saveTaskInTaskStore(CANCEL_TASK);
+ try {
+ CancelTaskRequest request = CancelTaskRequest.newBuilder()
+ .setName("tasks/" + CANCEL_TASK.getId())
+ .build();
+ io.a2a.grpc.Task response = grpcClient.cancelTask(request);
+ assertEquals(CANCEL_TASK.getId(), response.getId());
+ assertEquals(CANCEL_TASK.getContextId(), response.getContextId());
+ assertEquals(io.a2a.grpc.TaskState.TASK_STATE_CANCELLED, response.getStatus().getState());
+ } finally {
+ deleteTaskInTaskStore(CANCEL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testCancelTaskNotFound() throws Exception {
+ CancelTaskRequest request = CancelTaskRequest.newBuilder()
+ .setName("tasks/non-existent-task")
+ .build();
+ try {
+ grpcClient.cancelTask(request);
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("TaskNotFoundError"));
+ }
+ }
+
+ @Test
+ public void testCancelTaskNotSupported() throws Exception {
+ saveTaskInTaskStore(CANCEL_TASK_NOT_SUPPORTED);
+ try {
+ CancelTaskRequest request = CancelTaskRequest.newBuilder()
+ .setName("tasks/" + CANCEL_TASK_NOT_SUPPORTED.getId())
+ .build();
+ try {
+ grpcClient.cancelTask(request);
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("UnsupportedOperationError"));
+ }
+ } finally {
+ deleteTaskInTaskStore(CANCEL_TASK_NOT_SUPPORTED.getId());
+ }
+ }
+
+ @Test
+ public void testSendMessageNewMessageSuccess() throws Exception {
+ assertTrue(getTaskFromTaskStore(MINIMAL_TASK.getId()) == null);
+ Message message = new Message.Builder(MESSAGE)
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .build();
+ SendMessageRequest request = SendMessageRequest.newBuilder()
+ .setRequest(ProtoUtils.ToProto.message(message))
+ .build();
+ SendMessageResponse response = grpcClient.sendMessage(request);
+ assertTrue(response.hasMsg());
+ io.a2a.grpc.Message grpcMessage = response.getMsg();
+ // Convert back to spec Message for easier assertions
+ Message messageResponse = ProtoUtils.FromProto.message(grpcMessage);
+ assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId());
+ assertEquals(MESSAGE.getRole(), messageResponse.getRole());
+ Part> part = messageResponse.getParts().get(0);
+ assertEquals(Part.Kind.TEXT, part.getKind());
+ assertEquals("test message", ((TextPart) part).getText());
+ }
+
+ @Test
+ public void testSendMessageExistingTaskSuccess() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ Message message = new Message.Builder(MESSAGE)
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .build();
+
+ SendMessageRequest request = SendMessageRequest.newBuilder()
+ .setRequest(ProtoUtils.ToProto.message(message))
+ .build();
+ SendMessageResponse response = grpcClient.sendMessage(request);
+
+ assertTrue(response.hasMsg());
+ io.a2a.grpc.Message grpcMessage = response.getMsg();
+ // Convert back to spec Message for easier assertions
+ Message messageResponse = ProtoUtils.FromProto.message(grpcMessage);
+ assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId());
+ assertEquals(MESSAGE.getRole(), messageResponse.getRole());
+ Part> part = messageResponse.getParts().get(0);
+ assertEquals(Part.Kind.TEXT, part.getKind());
+ assertEquals("test message", ((TextPart) part).getText());
+ } finally {
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testError() throws Exception {
+ Message message = new Message.Builder(MESSAGE)
+ .taskId(SEND_MESSAGE_NOT_SUPPORTED.getId())
+ .contextId(SEND_MESSAGE_NOT_SUPPORTED.getContextId())
+ .build();
+
+ SendMessageRequest request = SendMessageRequest.newBuilder()
+ .setRequest(ProtoUtils.ToProto.message(message))
+ .build();
+
+ try {
+ grpcClient.sendMessage(request);
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("UnsupportedOperationError"));
+ }
+ }
+
+ @Test
+ public void testGetAgentCard() throws Exception {
+ // Test gRPC getAgentCard method
+ GetAgentCardRequest request = GetAgentCardRequest.newBuilder().build();
+
+ io.a2a.grpc.AgentCard grpcAgentCard = grpcClient.getAgentCard(request);
+
+ // Verify the expected agent card fields directly on the gRPC response
+ assertNotNull(grpcAgentCard);
+ assertEquals("test-card", grpcAgentCard.getName());
+ assertEquals("A test agent card", grpcAgentCard.getDescription());
+ assertEquals("http://localhost:" + serverPort, grpcAgentCard.getUrl()); // Use dynamic port
+ assertEquals("1.0", grpcAgentCard.getVersion());
+ assertEquals("http://example.com/docs", grpcAgentCard.getDocumentationUrl());
+ assertTrue(grpcAgentCard.getCapabilities().getPushNotifications());
+ assertTrue(grpcAgentCard.getCapabilities().getStreaming());
+ // Note: stateTransitionHistory is not present in gRPC AgentCapabilities
+ assertTrue(grpcAgentCard.getSkillsList().isEmpty());
+ }
+
+ @Test
+ public void testGetExtendAgentCardNotSupported() {
+ // NOTE: This test is not applicable to gRPC since extended agent card retrieval
+ // is an HTTP/REST-specific feature that tests the /agent/authenticatedExtendedCard endpoint.
+ // gRPC handles agent capabilities differently through service definitions.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testMalformedJSONRPCRequest() {
+ // NOTE: This test is not applicable to gRPC since it tests JSON-RPC protocol-specific
+ // JSON parsing errors. gRPC uses Protocol Buffers for serialization and has its own
+ // parsing and validation mechanisms.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testInvalidParamsJSONRPCRequest() {
+ // NOTE: This test is not applicable to gRPC since it tests JSON-RPC protocol-specific
+ // parameter validation errors. gRPC uses strongly-typed Protocol Buffer messages
+ // which provide built-in type safety and validation.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testInvalidJSONRPCRequestMissingJsonrpc() {
+ // NOTE: This test is not applicable to gRPC since it tests JSON-RPC protocol-specific
+ // validation of the "jsonrpc" field. gRPC does not use JSON-RPC protocol elements.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testInvalidJSONRPCRequestMissingMethod() {
+ // NOTE: This test is not applicable to gRPC since it tests JSON-RPC protocol-specific
+ // validation of the "method" field. gRPC methods are defined in the service definition
+ // and invoked directly, not through JSON-RPC method names.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testInvalidJSONRPCRequestInvalidId() {
+ // NOTE: This test is not applicable to gRPC since it tests JSON-RPC protocol-specific
+ // validation of the "id" field. gRPC handles request/response correlation differently
+ // through its streaming mechanisms.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testInvalidJSONRPCRequestNonExistentMethod() {
+ // NOTE: This test is not applicable to gRPC since it tests JSON-RPC protocol-specific
+ // method not found errors. gRPC method resolution is handled at the service definition
+ // level and unknown methods result in different error types.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testNonStreamingMethodWithAcceptHeader() throws Exception {
+ // NOTE: This test is not applicable to gRPC since HTTP Accept headers
+ // are an HTTP/REST-specific concept and do not apply to gRPC protocol.
+ // gRPC uses Protocol Buffers for message encoding and doesn't use HTTP content negotiation.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testSetPushNotificationSuccess() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // Create a PushNotificationConfig with an ID (needed for gRPC conversion)
+ PushNotificationConfig pushConfig = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id(MINIMAL_TASK.getId()) // Using task ID as config ID for simplicity
+ .build();
+ TaskPushNotificationConfig taskPushConfig =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig);
+
+ CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId(MINIMAL_TASK.getId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig))
+ .build();
+
+ io.a2a.grpc.TaskPushNotificationConfig response = grpcClient.createTaskPushNotificationConfig(request);
+
+ // Convert back to spec for easier assertions
+ TaskPushNotificationConfig responseConfig = ProtoUtils.FromProto.taskPushNotificationConfig(response);
+ assertEquals(MINIMAL_TASK.getId(), responseConfig.taskId());
+ assertEquals("http://example.com", responseConfig.pushNotificationConfig().url());
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), MINIMAL_TASK.getId());
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testGetPushNotificationSuccess() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // First, create a push notification config (same as previous test)
+ PushNotificationConfig pushConfig = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id(MINIMAL_TASK.getId())
+ .build();
+ TaskPushNotificationConfig taskPushConfig =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig);
+
+ CreateTaskPushNotificationConfigRequest createRequest = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId(MINIMAL_TASK.getId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig))
+ .build();
+
+ io.a2a.grpc.TaskPushNotificationConfig createResponse = grpcClient.createTaskPushNotificationConfig(createRequest);
+ assertNotNull(createResponse);
+
+ // Now, get the push notification config
+ GetTaskPushNotificationConfigRequest getRequest = GetTaskPushNotificationConfigRequest.newBuilder()
+ .setName("tasks/" + MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + MINIMAL_TASK.getId())
+ .build();
+
+ io.a2a.grpc.TaskPushNotificationConfig getResponse = grpcClient.getTaskPushNotificationConfig(getRequest);
+
+ // Convert back to spec for easier assertions
+ TaskPushNotificationConfig responseConfig = ProtoUtils.FromProto.taskPushNotificationConfig(getResponse);
+ assertEquals(MINIMAL_TASK.getId(), responseConfig.taskId());
+ assertEquals("http://example.com", responseConfig.pushNotificationConfig().url());
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), MINIMAL_TASK.getId());
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testListPushNotificationConfigWithConfigId() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // Create first push notification config
+ PushNotificationConfig pushConfig1 = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id("config1")
+ .build();
+ TaskPushNotificationConfig taskPushConfig1 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig1);
+
+ CreateTaskPushNotificationConfigRequest createRequest1 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId("config1")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig1))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest1);
+
+ // Create second push notification config
+ PushNotificationConfig pushConfig2 = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id("config2")
+ .build();
+ TaskPushNotificationConfig taskPushConfig2 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig2);
+
+ CreateTaskPushNotificationConfigRequest createRequest2 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId("config2")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig2))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest2);
+
+ // Now, list all push notification configs for the task
+ ListTaskPushNotificationConfigRequest listRequest = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .build();
+
+ ListTaskPushNotificationConfigResponse listResponse = grpcClient.listTaskPushNotificationConfig(listRequest);
+
+ // Verify the response
+ assertEquals(2, listResponse.getConfigsCount());
+
+ // Convert back to spec for easier assertions
+ TaskPushNotificationConfig config1 = ProtoUtils.FromProto.taskPushNotificationConfig(listResponse.getConfigs(0));
+ TaskPushNotificationConfig config2 = ProtoUtils.FromProto.taskPushNotificationConfig(listResponse.getConfigs(1));
+
+ assertEquals(MINIMAL_TASK.getId(), config1.taskId());
+ assertEquals("http://example.com", config1.pushNotificationConfig().url());
+ assertEquals("config1", config1.pushNotificationConfig().id());
+
+ assertEquals(MINIMAL_TASK.getId(), config2.taskId());
+ assertEquals("http://example.com", config2.pushNotificationConfig().url());
+ assertEquals("config2", config2.pushNotificationConfig().id());
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), "config1");
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), "config2");
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testListPushNotificationConfigWithoutConfigId() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // Create first push notification config without explicit ID (will use task ID as default)
+ PushNotificationConfig pushConfig1 = new PushNotificationConfig.Builder()
+ .url("http://1.example.com")
+ .id(MINIMAL_TASK.getId()) // Use task ID as config ID
+ .build();
+ TaskPushNotificationConfig taskPushConfig1 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig1);
+
+ CreateTaskPushNotificationConfigRequest createRequest1 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId(MINIMAL_TASK.getId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig1))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest1);
+
+ // Create second push notification config with same ID (will overwrite the previous one)
+ PushNotificationConfig pushConfig2 = new PushNotificationConfig.Builder()
+ .url("http://2.example.com")
+ .id(MINIMAL_TASK.getId()) // Same ID, will overwrite
+ .build();
+ TaskPushNotificationConfig taskPushConfig2 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig2);
+
+ CreateTaskPushNotificationConfigRequest createRequest2 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId(MINIMAL_TASK.getId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig2))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest2);
+
+ // Now, list all push notification configs for the task
+ ListTaskPushNotificationConfigRequest listRequest = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .build();
+
+ ListTaskPushNotificationConfigResponse listResponse = grpcClient.listTaskPushNotificationConfig(listRequest);
+
+ // Verify only 1 config exists (second one overwrote the first)
+ assertEquals(1, listResponse.getConfigsCount());
+
+ // Convert back to spec for easier assertions
+ TaskPushNotificationConfig config = ProtoUtils.FromProto.taskPushNotificationConfig(listResponse.getConfigs(0));
+
+ assertEquals(MINIMAL_TASK.getId(), config.taskId());
+ assertEquals("http://2.example.com", config.pushNotificationConfig().url());
+ assertEquals(MINIMAL_TASK.getId(), config.pushNotificationConfig().id());
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), MINIMAL_TASK.getId());
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testListPushNotificationConfigTaskNotFound() throws Exception {
+ ListTaskPushNotificationConfigRequest listRequest = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/non-existent-task")
+ .build();
+
+ try {
+ grpcClient.listTaskPushNotificationConfig(listRequest);
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("TaskNotFoundError"));
+ }
+ }
+
+ @Test
+ public void testListPushNotificationConfigEmptyList() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // List configs for a task that has no configs
+ ListTaskPushNotificationConfigRequest listRequest = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .build();
+
+ ListTaskPushNotificationConfigResponse listResponse = grpcClient.listTaskPushNotificationConfig(listRequest);
+
+ // Verify empty list
+ assertEquals(0, listResponse.getConfigsCount());
+ } finally {
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testDeletePushNotificationConfigWithValidConfigId() throws Exception {
+ // Create a second task for testing cross-task isolation
+ Task secondTask = new Task.Builder()
+ .id("task-456")
+ .contextId("session-xyz")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .build();
+
+ saveTaskInTaskStore(MINIMAL_TASK);
+ saveTaskInTaskStore(secondTask);
+ try {
+ // Create config1 and config2 for MINIMAL_TASK
+ PushNotificationConfig pushConfig1 = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id("config1")
+ .build();
+ TaskPushNotificationConfig taskPushConfig1 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig1);
+
+ CreateTaskPushNotificationConfigRequest createRequest1 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId("config1")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig1))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest1);
+
+ PushNotificationConfig pushConfig2 = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id("config2")
+ .build();
+ TaskPushNotificationConfig taskPushConfig2 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig2);
+
+ CreateTaskPushNotificationConfigRequest createRequest2 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId("config2")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig2))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest2);
+
+ // Create config1 for secondTask
+ TaskPushNotificationConfig taskPushConfig3 =
+ new TaskPushNotificationConfig(secondTask.getId(), pushConfig1);
+
+ CreateTaskPushNotificationConfigRequest createRequest3 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + secondTask.getId())
+ .setConfigId("config1")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig3))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest3);
+
+ // Delete config1 from MINIMAL_TASK
+ DeleteTaskPushNotificationConfigRequest deleteRequest = DeleteTaskPushNotificationConfigRequest.newBuilder()
+ .setName("tasks/" + MINIMAL_TASK.getId() + "/pushNotificationConfigs/config1")
+ .build();
+
+ com.google.protobuf.Empty deleteResponse = grpcClient.deleteTaskPushNotificationConfig(deleteRequest);
+ assertNotNull(deleteResponse); // Should return Empty, not null
+
+ // Verify MINIMAL_TASK now has only 1 config (config2)
+ ListTaskPushNotificationConfigRequest listRequest1 = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .build();
+ ListTaskPushNotificationConfigResponse listResponse1 = grpcClient.listTaskPushNotificationConfig(listRequest1);
+ assertEquals(1, listResponse1.getConfigsCount());
+
+ TaskPushNotificationConfig remainingConfig = ProtoUtils.FromProto.taskPushNotificationConfig(listResponse1.getConfigs(0));
+ assertEquals("config2", remainingConfig.pushNotificationConfig().id());
+
+ // Verify secondTask remains unchanged (still has config1)
+ ListTaskPushNotificationConfigRequest listRequest2 = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + secondTask.getId())
+ .build();
+ ListTaskPushNotificationConfigResponse listResponse2 = grpcClient.listTaskPushNotificationConfig(listRequest2);
+ assertEquals(1, listResponse2.getConfigsCount());
+
+ TaskPushNotificationConfig secondTaskConfig = ProtoUtils.FromProto.taskPushNotificationConfig(listResponse2.getConfigs(0));
+ assertEquals("config1", secondTaskConfig.pushNotificationConfig().id());
+
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), "config1");
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), "config2");
+ deletePushNotificationConfigInStore(secondTask.getId(), "config1");
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ deleteTaskInTaskStore(secondTask.getId());
+ }
+ }
+
+ @Test
+ public void testDeletePushNotificationConfigWithNonExistingConfigId() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // Create config1 and config2
+ PushNotificationConfig pushConfig1 = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id("config1")
+ .build();
+ TaskPushNotificationConfig taskPushConfig1 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig1);
+
+ CreateTaskPushNotificationConfigRequest createRequest1 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId("config1")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig1))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest1);
+
+ PushNotificationConfig pushConfig2 = new PushNotificationConfig.Builder()
+ .url("http://example.com")
+ .id("config2")
+ .build();
+ TaskPushNotificationConfig taskPushConfig2 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig2);
+
+ CreateTaskPushNotificationConfigRequest createRequest2 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId("config2")
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig2))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest2);
+
+ // Try to delete non-existent config (should succeed silently)
+ DeleteTaskPushNotificationConfigRequest deleteRequest = DeleteTaskPushNotificationConfigRequest.newBuilder()
+ .setName("tasks/" + MINIMAL_TASK.getId() + "/pushNotificationConfigs/non-existent-config-id")
+ .build();
+
+ com.google.protobuf.Empty deleteResponse = grpcClient.deleteTaskPushNotificationConfig(deleteRequest);
+ assertNotNull(deleteResponse); // Should return Empty, not throw error
+
+ // Verify both configs remain unchanged
+ ListTaskPushNotificationConfigRequest listRequest = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .build();
+ ListTaskPushNotificationConfigResponse listResponse = grpcClient.listTaskPushNotificationConfig(listRequest);
+ assertEquals(2, listResponse.getConfigsCount());
+
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), "config1");
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), "config2");
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testDeletePushNotificationConfigTaskNotFound() throws Exception {
+ DeleteTaskPushNotificationConfigRequest deleteRequest = DeleteTaskPushNotificationConfigRequest.newBuilder()
+ .setName("tasks/non-existent-task/pushNotificationConfigs/non-existent-config-id")
+ .build();
+
+ try {
+ grpcClient.deleteTaskPushNotificationConfig(deleteRequest);
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("TaskNotFoundError"));
+ }
+ }
+
+ @Test
+ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // Create first config without explicit ID (will use task ID as default)
+ PushNotificationConfig pushConfig1 = new PushNotificationConfig.Builder()
+ .url("http://1.example.com")
+ .id(MINIMAL_TASK.getId())
+ .build();
+ TaskPushNotificationConfig taskPushConfig1 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig1);
+
+ CreateTaskPushNotificationConfigRequest createRequest1 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId(MINIMAL_TASK.getId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig1))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest1);
+
+ // Create second config with same ID (will overwrite the previous one)
+ PushNotificationConfig pushConfig2 = new PushNotificationConfig.Builder()
+ .url("http://2.example.com")
+ .id(MINIMAL_TASK.getId())
+ .build();
+ TaskPushNotificationConfig taskPushConfig2 =
+ new TaskPushNotificationConfig(MINIMAL_TASK.getId(), pushConfig2);
+
+ CreateTaskPushNotificationConfigRequest createRequest2 = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .setConfigId(MINIMAL_TASK.getId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushConfig2))
+ .build();
+ grpcClient.createTaskPushNotificationConfig(createRequest2);
+
+ // Delete the config using task ID
+ DeleteTaskPushNotificationConfigRequest deleteRequest = DeleteTaskPushNotificationConfigRequest.newBuilder()
+ .setName("tasks/" + MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + MINIMAL_TASK.getId())
+ .build();
+
+ com.google.protobuf.Empty deleteResponse = grpcClient.deleteTaskPushNotificationConfig(deleteRequest);
+ assertNotNull(deleteResponse); // Should return Empty
+
+ // Verify no configs remain
+ ListTaskPushNotificationConfigRequest listRequest = ListTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + MINIMAL_TASK.getId())
+ .build();
+ ListTaskPushNotificationConfigResponse listResponse = grpcClient.listTaskPushNotificationConfig(listRequest);
+ assertEquals(0, listResponse.getConfigsCount());
+
+ } finally {
+ deletePushNotificationConfigInStore(MINIMAL_TASK.getId(), MINIMAL_TASK.getId());
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testSendMessageStreamExistingTaskSuccess() throws Exception {
+ saveTaskInTaskStore(MINIMAL_TASK);
+ try {
+ // Build message for existing task
+ Message message = new Message.Builder(MESSAGE)
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .build();
+
+ // Create gRPC streaming request
+ SendMessageRequest request = SendMessageRequest.newBuilder()
+ .setRequest(ProtoUtils.ToProto.message(message))
+ .build();
+
+ // Use blocking iterator to consume stream responses
+ java.util.Iterator responseIterator = grpcClient.sendStreamingMessage(request);
+
+ // Collect responses - expect at least one
+ java.util.List responses = new java.util.ArrayList<>();
+ while (responseIterator.hasNext()) {
+ StreamResponse response = responseIterator.next();
+ responses.add(response);
+
+ // For this test, we expect to get the message back - stop after first response
+ if (response.hasMsg()) {
+ break;
+ }
+ }
+
+ // Verify we got at least one response
+ assertTrue(responses.size() >= 1, "Expected at least one response from streaming call");
+
+ // Find the message response
+ StreamResponse messageResponse = null;
+ for (StreamResponse response : responses) {
+ if (response.hasMsg()) {
+ messageResponse = response;
+ break;
+ }
+ }
+
+ assertNotNull(messageResponse, "Expected to receive a message response");
+
+ // Verify the message content
+ io.a2a.grpc.Message grpcMessage = messageResponse.getMsg();
+ Message responseMessage = ProtoUtils.FromProto.message(grpcMessage);
+ assertEquals(MESSAGE.getMessageId(), responseMessage.getMessageId());
+ assertEquals(MESSAGE.getRole(), responseMessage.getRole());
+ Part> part = responseMessage.getParts().get(0);
+ assertEquals(Part.Kind.TEXT, part.getKind());
+ assertEquals("test message", ((TextPart) part).getText());
+
+ } finally {
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ }
+ }
+
+ @Test
+ public void testStreamingMethodWithAcceptHeader() throws Exception {
+ // NOTE: This test is not applicable to gRPC since HTTP Accept headers
+ // are an HTTP/REST-specific concept and do not apply to gRPC protocol.
+ // gRPC uses Protocol Buffers for message encoding and doesn't use HTTP content negotiation.
+
+ // This stub is maintained to preserve method order compatibility with AbstractA2AServerTest
+ // for future migration when extending that base class.
+ }
+
+ @Test
+ public void testSendMessageStreamNewMessageSuccess() throws Exception {
+ // Ensure no task exists initially (test creates new task via streaming)
+ assertTrue(getTaskFromTaskStore(MINIMAL_TASK.getId()) == null, "Task should not exist initially");
+
+ try {
+ // Build message for new task (no pre-existing task)
+ Message message = new Message.Builder(MESSAGE)
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .build();
+
+ // Create gRPC streaming request
+ SendMessageRequest request = SendMessageRequest.newBuilder()
+ .setRequest(ProtoUtils.ToProto.message(message))
+ .build();
+
+ // Use blocking iterator to consume stream responses
+ java.util.Iterator responseIterator = grpcClient.sendStreamingMessage(request);
+
+ // Collect responses - expect at least one
+ java.util.List responses = new java.util.ArrayList<>();
+ while (responseIterator.hasNext()) {
+ StreamResponse response = responseIterator.next();
+ responses.add(response);
+
+ // For this test, we expect to get the message back - stop after first response
+ if (response.hasMsg()) {
+ break;
+ }
+ }
+
+ // Verify we got at least one response
+ assertTrue(responses.size() >= 1, "Expected at least one response from streaming call");
+
+ // Find the message response
+ StreamResponse messageResponse = null;
+ for (StreamResponse response : responses) {
+ if (response.hasMsg()) {
+ messageResponse = response;
+ break;
+ }
+ }
+
+ assertNotNull(messageResponse, "Expected to receive a message response");
+
+ // Verify the message content
+ io.a2a.grpc.Message grpcMessage = messageResponse.getMsg();
+ Message responseMessage = ProtoUtils.FromProto.message(grpcMessage);
+ assertEquals(MESSAGE.getMessageId(), responseMessage.getMessageId());
+ assertEquals(MESSAGE.getRole(), responseMessage.getRole());
+ Part> part = responseMessage.getParts().get(0);
+ assertEquals(Part.Kind.TEXT, part.getKind());
+ assertEquals("test message", ((TextPart) part).getText());
+
+ } finally {
+ // Clean up any task that may have been created (ignore if task doesn't exist)
+ try {
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ } catch (RuntimeException e) {
+ // Ignore if task doesn't exist (404 error)
+ if (!e.getMessage().contains("404")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testResubscribeExistingTaskSuccess() throws Exception {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ saveTaskInTaskStore(MINIMAL_TASK);
+
+ try {
+ // Ensure queue for task exists (required for resubscription)
+ ensureQueueForTask(MINIMAL_TASK.getId());
+
+ CountDownLatch taskResubscriptionRequestSent = new CountDownLatch(1);
+ CountDownLatch taskResubscriptionResponseReceived = new CountDownLatch(2);
+ AtomicReference firstResponse = new AtomicReference<>();
+ AtomicReference secondResponse = new AtomicReference<>();
+
+ // Create gRPC task subscription request
+ TaskSubscriptionRequest subscriptionRequest = TaskSubscriptionRequest.newBuilder()
+ .setName("tasks/" + MINIMAL_TASK.getId())
+ .build();
+
+ // Count down the latch when the gRPC streaming subscription is established
+ awaitStreamingSubscription()
+ .whenComplete((unused, throwable) -> taskResubscriptionRequestSent.countDown());
+
+ AtomicReference errorRef = new AtomicReference<>();
+
+ // Start the subscription in a separate thread
+ executorService.submit(() -> {
+ try {
+ java.util.Iterator responseIterator = grpcClient.taskSubscription(subscriptionRequest);
+
+ while (responseIterator.hasNext()) {
+ StreamResponse response = responseIterator.next();
+
+ if (taskResubscriptionResponseReceived.getCount() == 2) {
+ firstResponse.set(response);
+ } else {
+ secondResponse.set(response);
+ }
+ taskResubscriptionResponseReceived.countDown();
+
+ if (taskResubscriptionResponseReceived.getCount() == 0) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ errorRef.set(e);
+ // Count down both latches to unblock the test
+ taskResubscriptionRequestSent.countDown();
+ while (taskResubscriptionResponseReceived.getCount() > 0) {
+ taskResubscriptionResponseReceived.countDown();
+ }
+ }
+ });
+
+ // Wait for subscription to be established
+ assertTrue(taskResubscriptionRequestSent.await(10, TimeUnit.SECONDS), "Subscription should be established");
+
+ // Inject events into the server's event queue
+ java.util.List events = java.util.List.of(
+ new TaskArtifactUpdateEvent.Builder()
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .artifact(new Artifact.Builder()
+ .artifactId("11")
+ .parts(new TextPart("text"))
+ .build())
+ .build(),
+ new TaskStatusUpdateEvent.Builder()
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .isFinal(true)
+ .build());
+
+ for (Event event : events) {
+ enqueueEventOnServer(event);
+ }
+
+ // Wait for the client to receive the responses
+ assertTrue(taskResubscriptionResponseReceived.await(20, TimeUnit.SECONDS), "Should receive both responses");
+
+ // Check for errors
+ if (errorRef.get() != null) {
+ throw new RuntimeException("Error in subscription thread", errorRef.get());
+ }
+
+ // Verify first response (TaskArtifactUpdateEvent)
+ assertNotNull(firstResponse.get(), "Should receive first response");
+ StreamResponse firstStreamResponse = firstResponse.get();
+ assertTrue(firstStreamResponse.hasArtifactUpdate(), "First response should be artifact update");
+
+ io.a2a.grpc.TaskArtifactUpdateEvent artifactUpdate = firstStreamResponse.getArtifactUpdate();
+ assertEquals(MINIMAL_TASK.getId(), artifactUpdate.getTaskId());
+ assertEquals(MINIMAL_TASK.getContextId(), artifactUpdate.getContextId());
+ assertEquals("11", artifactUpdate.getArtifact().getArtifactId());
+ assertEquals("text", artifactUpdate.getArtifact().getParts(0).getText());
+
+ // Verify second response (TaskStatusUpdateEvent)
+ assertNotNull(secondResponse.get(), "Should receive second response");
+ StreamResponse secondStreamResponse = secondResponse.get();
+ assertTrue(secondStreamResponse.hasStatusUpdate(), "Second response should be status update");
+
+ io.a2a.grpc.TaskStatusUpdateEvent statusUpdate = secondStreamResponse.getStatusUpdate();
+ assertEquals(MINIMAL_TASK.getId(), statusUpdate.getTaskId());
+ assertEquals(MINIMAL_TASK.getContextId(), statusUpdate.getContextId());
+ assertEquals(io.a2a.grpc.TaskState.TASK_STATE_COMPLETED, statusUpdate.getStatus().getState());
+ assertTrue(statusUpdate.getFinal(), "Final status update should be marked as final");
+
+ } finally {
+ deleteTaskInTaskStore(MINIMAL_TASK.getId());
+ executorService.shutdown();
+ if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ public void testResubscribeNoExistingTaskError() throws Exception {
+ // Try to resubscribe to a non-existent task - should get TaskNotFoundError
+ TaskSubscriptionRequest request = TaskSubscriptionRequest.newBuilder()
+ .setName("tasks/non-existent-task")
+ .build();
+
+ try {
+ // Use blocking iterator to consume stream responses
+ java.util.Iterator responseIterator = grpcClient.taskSubscription(request);
+
+ // Try to get first response - should throw StatusRuntimeException
+ if (responseIterator.hasNext()) {
+ responseIterator.next();
+ }
+
+ // Should not reach here
+ assertTrue(false, "Expected StatusRuntimeException but method returned normally");
+ } catch (StatusRuntimeException e) {
+ // Verify this is a TaskNotFoundError mapped to NOT_FOUND status
+ assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode());
+ String description = e.getStatus().getDescription();
+ assertTrue(description != null && description.contains("TaskNotFoundError"));
+ }
+ }
+
+
+ protected void saveTaskInTaskStore(Task task) throws Exception {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/test/task"))
+ .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(task)))
+ .header("Content-Type", APPLICATION_JSON)
+ .build();
+
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(
+ String.format("Saving task failed! Status: %d, Body: %s", response.statusCode(), response.body()));
+ }
+ }
+
+ protected Task getTaskFromTaskStore(String taskId) throws Exception {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/test/task/" + taskId))
+ .GET()
+ .build();
+
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() == 404) {
+ return null;
+ }
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(String.format("Getting task failed! Status: %d, Body: %s", response.statusCode(), response.body()));
+ }
+ return Utils.OBJECT_MAPPER.readValue(response.body(), Task.TYPE_REFERENCE);
+ }
+
+ protected void deleteTaskInTaskStore(String taskId) throws Exception {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(("http://localhost:" + serverPort + "/test/task/" + taskId)))
+ .DELETE()
+ .build();
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(response.statusCode() + ": Deleting task failed!" + response.body());
+ }
+ }
+
+ protected void ensureQueueForTask(String taskId) throws Exception {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/test/queue/ensure/" + taskId))
+ .POST(HttpRequest.BodyPublishers.noBody())
+ .build();
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(String.format("Ensuring queue failed! Status: %d, Body: %s", response.statusCode(), response.body()));
+ }
+ }
+
+ protected void enqueueEventOnServer(Event event) throws Exception {
+ String path;
+ if (event instanceof TaskArtifactUpdateEvent) {
+ TaskArtifactUpdateEvent e = (TaskArtifactUpdateEvent) event;
+ path = "test/queue/enqueueTaskArtifactUpdateEvent/" + e.getTaskId();
+ } else if (event instanceof TaskStatusUpdateEvent) {
+ TaskStatusUpdateEvent e = (TaskStatusUpdateEvent) event;
+ path = "test/queue/enqueueTaskStatusUpdateEvent/" + e.getTaskId();
+ } else {
+ throw new RuntimeException("Unknown event type " + event.getClass() + ". If you need the ability to" +
+ " handle more types, please add the REST endpoints.");
+ }
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/" + path))
+ .header("Content-Type", APPLICATION_JSON)
+ .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(event)))
+ .build();
+
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(response.statusCode() + ": Queueing event failed!" + response.body());
+ }
+ }
+
+ private CompletableFuture awaitStreamingSubscription() {
+ int cnt = getStreamingSubscribedCount();
+ AtomicInteger initialCount = new AtomicInteger(cnt);
+
+ return CompletableFuture.runAsync(() -> {
+ try {
+ boolean done = false;
+ long end = System.currentTimeMillis() + 15000;
+ while (System.currentTimeMillis() < end) {
+ int count = getStreamingSubscribedCount();
+ if (count > initialCount.get()) {
+ done = true;
+ break;
+ }
+ Thread.sleep(500);
+ }
+ if (!done) {
+ throw new RuntimeException("Timed out waiting for subscription");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted");
+ }
+ });
+ }
+
+ private int getStreamingSubscribedCount() {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/test/streamingSubscribedCount"))
+ .GET()
+ .build();
+ try {
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ String body = response.body().trim();
+ return Integer.parseInt(body);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void deletePushNotificationConfigInStore(String taskId, String configId) throws Exception {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(("http://localhost:" + serverPort + "/test/task/" + taskId + "/config/" + configId)))
+ .DELETE()
+ .build();
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(response.statusCode() + ": Deleting task failed!" + response.body());
+ }
+ }
+
+ protected void savePushNotificationConfigInStore(String taskId, PushNotificationConfig notificationConfig) throws Exception {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/test/task/" + taskId))
+ .POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(notificationConfig)))
+ .header("Content-Type", APPLICATION_JSON)
+ .build();
+
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(response.statusCode() + ": Creating task push notification config failed! " + response.body());
+ }
+ }
+
+}
diff --git a/tests-grpc/src/test/resources/META-INF/beans.xml b/tests-grpc/src/test/resources/META-INF/beans.xml
new file mode 100644
index 0000000..9b2940f
--- /dev/null
+++ b/tests-grpc/src/test/resources/META-INF/beans.xml
@@ -0,0 +1,6 @@
+
+
+
\ No newline at end of file
diff --git a/tests-grpc/src/test/resources/WEB-INF/web.xml b/tests-grpc/src/test/resources/WEB-INF/web.xml
new file mode 100644
index 0000000..4330758
--- /dev/null
+++ b/tests-grpc/src/test/resources/WEB-INF/web.xml
@@ -0,0 +1,7 @@
+
+
+
diff --git a/tests-grpc/src/test/resources/arquillian.xml b/tests-grpc/src/test/resources/arquillian.xml
new file mode 100644
index 0000000..d3909f2
--- /dev/null
+++ b/tests-grpc/src/test/resources/arquillian.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+ target/deployments
+
+
+
+
+ ${basedir}/target/wildfly
+
+ ${server.jvm.args} ${arquillian.java.vm.args}
+ ${jboss.args}
+ true
+
+
+
+
diff --git a/tests-grpc/src/test/resources/logging.properties b/tests-grpc/src/test/resources/logging.properties
new file mode 100644
index 0000000..17885c8
--- /dev/null
+++ b/tests-grpc/src/test/resources/logging.properties
@@ -0,0 +1,29 @@
+#
+# Copyright The WildFly Authors
+# SPDX-License-Identifier: Apache-2.0
+#
+
+# Additional logger names to configure (root logger is always configured)
+loggers=sun.rmi,org.jboss.shrinkwrap,org.apache.http.wire
+logger.org.jboss.shrinkwrap.level=INFO
+logger.sun.rmi.level=WARNING
+logger.org.apache.http.wire.level=WARN
+
+# Root logger level
+logger.level=WARN
+
+# Root logger handlers
+logger.handlers=FILE
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.FileHandler
+handler.FILE.properties=autoFlush,append,fileName
+handler.FILE.autoFlush=true
+handler.FILE.fileName=./target/test.log
+handler.FILE.formatter=PATTERN
+handler.FILE.append=true
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p [%c] (%t) %s%e%n
diff --git a/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/A2ATestResource.java b/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/A2ATestResource.java
index 3019466..7a3dc27 100644
--- a/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/A2ATestResource.java
+++ b/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/A2ATestResource.java
@@ -35,7 +35,7 @@ public class A2ATestResource {
@PostConstruct
public void init() {
- //A2AServerResource.setStreamingIsSubscribedRunnable(streamingSubscribedCount::incrementAndGet);
+ A2AServerResource.setStreamingIsSubscribedRunnable(streamingSubscribedCount::incrementAndGet);
}
diff --git a/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/JakartaA2AServerTest.java b/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/JakartaA2AServerTest.java
index cb85b24..08f0b3d 100644
--- a/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/JakartaA2AServerTest.java
+++ b/tests/src/test/java/org/wildfly/extras/a2a/server/apps/jakarta/JakartaA2AServerTest.java
@@ -5,9 +5,11 @@
import java.util.List;
import io.a2a.A2A;
+import io.a2a.client.http.A2AHttpClient;
import io.a2a.server.PublicAgentCard;
import io.a2a.server.apps.common.AbstractA2AServerTest;
import io.a2a.spec.Event;
+import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import io.a2a.util.Assert;
import mutiny.zero.ZeroPublisher;
import org.jboss.arquillian.container.test.api.Deployment;
@@ -26,6 +28,16 @@ public JakartaA2AServerTest() {
super(8080);
}
+ @Override
+ protected String getTransportProtocol() {
+ return "JSONRPC";
+ }
+
+ @Override
+ protected String getTransportUrl() {
+ return "http://localhost:8080";
+ }
+
@Deployment
public static WebArchive createTestArchive() throws Exception {
final JavaArchive[] libraries = List.of(
@@ -33,10 +45,14 @@ public static WebArchive createTestArchive() throws Exception {
getJarForClass(A2A.class),
// a2a-java-sdk-common.jar
getJarForClass(Assert.class),
+ // a2a-java-sdk-http-client
+ getJarForClass(A2AHttpClient.class),
// a2a-java-sdk-server-common.jar
getJarForClass(PublicAgentCard.class),
// a2a-java-sdk-spec.jar
getJarForClass(Event.class),
+ // a2a-java-sdk-transport-jsonrpc
+ getJarForClass(JSONRPCHandler.class),
// mutiny-zero.jar. This is provided by some WildFly layers, but not always, and not in
// the server provisioned by Glow when inspecting our war
getJarForClass(ZeroPublisher.class)).toArray(new JavaArchive[0]);