diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java index 40abbc95c..4a98ace48 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java @@ -1,9 +1,11 @@ package io.a2a.server.grpc.quarkus; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; -import io.a2a.server.PublicAgentCard; import io.a2a.grpc.handler.GrpcHandler; +import io.a2a.server.PublicAgentCard; +import io.a2a.server.requesthandlers.CallContextFactory; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.spec.AgentCard; import io.quarkus.grpc.GrpcService; @@ -11,8 +13,31 @@ @GrpcService public class QuarkusGrpcHandler extends GrpcHandler { + private final AgentCard agentCard; + private final RequestHandler requestHandler; + private final Instance callContextFactoryInstance; + @Inject - public QuarkusGrpcHandler(@PublicAgentCard AgentCard agentCard, RequestHandler requestHandler) { - super(agentCard, requestHandler); + public QuarkusGrpcHandler(@PublicAgentCard AgentCard agentCard, + RequestHandler requestHandler, + Instance callContextFactoryInstance) { + this.agentCard = agentCard; + this.requestHandler = requestHandler; + this.callContextFactoryInstance = callContextFactoryInstance; + } + + @Override + protected RequestHandler getRequestHandler() { + return requestHandler; + } + + @Override + protected AgentCard getAgentCard() { + return agentCard; + } + + @Override + protected CallContextFactory getCallContextFactory() { + return callContextFactoryInstance.isUnsatisfied() ? null : callContextFactoryInstance.get(); } } \ No newline at end of file diff --git a/reference/grpc/src/test/resources/application.properties b/reference/grpc/src/test/resources/application.properties index 840386426..2ddb0ae2a 100644 --- a/reference/grpc/src/test/resources/application.properties +++ b/reference/grpc/src/test/resources/application.properties @@ -1,5 +1,2 @@ quarkus.grpc.clients.a2a-service.host=localhost -quarkus.grpc.clients.a2a-service.port=9001 - -# The GrpcHandler @ApplicationScoped annotation is not compatible with Quarkus -quarkus.arc.exclude-types=io.a2a.grpc.handler.GrpcHandler \ No newline at end of file +quarkus.grpc.clients.a2a-service.port=9001 \ No newline at end of file diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentCardProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentCardProducer.java index 3354c1522..7a50b7697 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentCardProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentCardProducer.java @@ -18,10 +18,11 @@ public class AgentCardProducer { @Produces @PublicAgentCard public AgentCard agentCard() { + String port = System.getProperty("test.agent.card.port", "8081"); return new AgentCard.Builder() .name("test-card") .description("A test agent card") - .url("http://localhost:8081") + .url("http://localhost:" + port) .version("1.0") .documentationUrl("http://example.com/docs") .capabilities(new AgentCapabilities.Builder() diff --git a/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java index 68311009f..15aa84209 100644 --- a/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java @@ -3,18 +3,15 @@ import static io.a2a.grpc.utils.ProtoUtils.FromProto; import static io.a2a.grpc.utils.ProtoUtils.ToProto; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Instance; -import jakarta.inject.Inject; - +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow; import com.google.protobuf.Empty; import io.a2a.grpc.A2AServiceGrpc; import io.a2a.grpc.StreamResponse; -import io.a2a.server.PublicAgentCard; import io.a2a.server.ServerCallContext; import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.auth.User; @@ -46,29 +43,14 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; -import java.util.HashMap; -import java.util.Map; - -@ApplicationScoped -public class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase { - - private AgentCard agentCard; - private RequestHandler requestHandler; +public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase { // Hook so testing can wait until streaming subscriptions are established. // Without this we get intermittent failures private static volatile Runnable streamingSubscribedRunnable; - @Inject - Instance callContextFactory; - - protected GrpcHandler() { - } - @Inject - public GrpcHandler(@PublicAgentCard AgentCard agentCard, RequestHandler requestHandler) { - this.agentCard = agentCard; - this.requestHandler = requestHandler; + public GrpcHandler() { } @Override @@ -77,7 +59,7 @@ public void sendMessage(io.a2a.grpc.SendMessageRequest request, try { ServerCallContext context = createCallContext(responseObserver); MessageSendParams params = FromProto.messageSendParams(request); - EventKind taskOrMessage = requestHandler.onMessageSend(params, context); + EventKind taskOrMessage = getRequestHandler().onMessageSend(params, context); io.a2a.grpc.SendMessageResponse response = ToProto.taskOrMessage(taskOrMessage); responseObserver.onNext(response); responseObserver.onCompleted(); @@ -94,7 +76,7 @@ public void getTask(io.a2a.grpc.GetTaskRequest request, try { ServerCallContext context = createCallContext(responseObserver); TaskQueryParams params = FromProto.taskQueryParams(request); - Task task = requestHandler.onGetTask(params, context); + Task task = getRequestHandler().onGetTask(params, context); if (task != null) { responseObserver.onNext(ToProto.task(task)); responseObserver.onCompleted(); @@ -114,7 +96,7 @@ public void cancelTask(io.a2a.grpc.CancelTaskRequest request, try { ServerCallContext context = createCallContext(responseObserver); TaskIdParams params = FromProto.taskIdParams(request); - Task task = requestHandler.onCancelTask(params, context); + Task task = getRequestHandler().onCancelTask(params, context); if (task != null) { responseObserver.onNext(ToProto.task(task)); responseObserver.onCompleted(); @@ -131,7 +113,7 @@ public void cancelTask(io.a2a.grpc.CancelTaskRequest request, @Override public void createTaskPushNotificationConfig(io.a2a.grpc.CreateTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { - if (! agentCard.capabilities().pushNotifications()) { + if (!getAgentCard().capabilities().pushNotifications()) { handleError(responseObserver, new PushNotificationNotSupportedError()); return; } @@ -139,7 +121,7 @@ public void createTaskPushNotificationConfig(io.a2a.grpc.CreateTaskPushNotificat try { ServerCallContext context = createCallContext(responseObserver); TaskPushNotificationConfig config = FromProto.taskPushNotificationConfig(request); - TaskPushNotificationConfig responseConfig = requestHandler.onSetTaskPushNotificationConfig(config, context); + TaskPushNotificationConfig responseConfig = getRequestHandler().onSetTaskPushNotificationConfig(config, context); responseObserver.onNext(ToProto.taskPushNotificationConfig(responseConfig)); responseObserver.onCompleted(); } catch (JSONRPCError e) { @@ -152,7 +134,7 @@ public void createTaskPushNotificationConfig(io.a2a.grpc.CreateTaskPushNotificat @Override public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { - if (! agentCard.capabilities().pushNotifications()) { + if (!getAgentCard().capabilities().pushNotifications()) { handleError(responseObserver, new PushNotificationNotSupportedError()); return; } @@ -160,7 +142,7 @@ public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationCon try { ServerCallContext context = createCallContext(responseObserver); GetTaskPushNotificationConfigParams params = FromProto.getTaskPushNotificationConfigParams(request); - TaskPushNotificationConfig config = requestHandler.onGetTaskPushNotificationConfig(params, context); + TaskPushNotificationConfig config = getRequestHandler().onGetTaskPushNotificationConfig(params, context); responseObserver.onNext(ToProto.taskPushNotificationConfig(config)); responseObserver.onCompleted(); } catch (JSONRPCError e) { @@ -173,7 +155,7 @@ public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationCon @Override public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { - if (! agentCard.capabilities().pushNotifications()) { + if (!getAgentCard().capabilities().pushNotifications()) { handleError(responseObserver, new PushNotificationNotSupportedError()); return; } @@ -181,7 +163,7 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC try { ServerCallContext context = createCallContext(responseObserver); ListTaskPushNotificationConfigParams params = FromProto.listTaskPushNotificationConfigParams(request); - List configList = requestHandler.onListTaskPushNotificationConfig(params, context); + List configList = getRequestHandler().onListTaskPushNotificationConfig(params, context); io.a2a.grpc.ListTaskPushNotificationConfigResponse.Builder responseBuilder = io.a2a.grpc.ListTaskPushNotificationConfigResponse.newBuilder(); for (TaskPushNotificationConfig config : configList) { @@ -199,7 +181,7 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC @Override public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { - if (! agentCard.capabilities().streaming()) { + if (!getAgentCard().capabilities().streaming()) { handleError(responseObserver, new InvalidRequestError()); return; } @@ -207,7 +189,7 @@ public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, try { ServerCallContext context = createCallContext(responseObserver); MessageSendParams params = FromProto.messageSendParams(request); - Flow.Publisher publisher = requestHandler.onMessageSendStream(params, context); + Flow.Publisher publisher = getRequestHandler().onMessageSendStream(params, context); convertToStreamResponse(publisher, responseObserver); } catch (JSONRPCError e) { handleError(responseObserver, e); @@ -219,7 +201,7 @@ public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, @Override public void taskSubscription(io.a2a.grpc.TaskSubscriptionRequest request, StreamObserver responseObserver) { - if (! agentCard.capabilities().streaming()) { + if (!getAgentCard().capabilities().streaming()) { handleError(responseObserver, new InvalidRequestError()); return; } @@ -227,7 +209,7 @@ public void taskSubscription(io.a2a.grpc.TaskSubscriptionRequest request, try { ServerCallContext context = createCallContext(responseObserver); TaskIdParams params = FromProto.taskIdParams(request); - Flow.Publisher publisher = requestHandler.onResubscribeToTask(params, context); + Flow.Publisher publisher = getRequestHandler().onResubscribeToTask(params, context); convertToStreamResponse(publisher, responseObserver); } catch (JSONRPCError e) { handleError(responseObserver, e); @@ -287,7 +269,7 @@ public void onComplete() { public void getAgentCard(io.a2a.grpc.GetAgentCardRequest request, StreamObserver responseObserver) { try { - responseObserver.onNext(ToProto.agentCard(agentCard)); + responseObserver.onNext(ToProto.agentCard(getAgentCard())); responseObserver.onCompleted(); } catch (Throwable t) { handleInternalError(responseObserver, t); @@ -297,7 +279,7 @@ public void getAgentCard(io.a2a.grpc.GetAgentCardRequest request, @Override public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { - if (! agentCard.capabilities().pushNotifications()) { + if (!getAgentCard().capabilities().pushNotifications()) { handleError(responseObserver, new PushNotificationNotSupportedError()); return; } @@ -305,7 +287,7 @@ public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificat try { ServerCallContext context = createCallContext(responseObserver); DeleteTaskPushNotificationConfigParams params = FromProto.deleteTaskPushNotificationConfigParams(request); - requestHandler.onDeleteTaskPushNotificationConfig(params, context); + getRequestHandler().onDeleteTaskPushNotificationConfig(params, context); // void response responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); @@ -317,7 +299,8 @@ public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificat } private ServerCallContext createCallContext(StreamObserver responseObserver) { - if (callContextFactory == null || callContextFactory.isUnsatisfied()) { + CallContextFactory factory = getCallContextFactory(); + if (factory == null) { // Default implementation when no custom CallContextFactory is provided // This handles both CDI injection scenarios and test scenarios where callContextFactory is null User user = UnauthenticatedUser.INSTANCE; @@ -335,7 +318,6 @@ private ServerCallContext createCallContext(StreamObserver responseObserv return new ServerCallContext(user, state); } else { - CallContextFactory factory = callContextFactory.get(); // TODO: CallContextFactory interface expects ServerCall + Metadata, but we only have StreamObserver // This is another manifestation of the architectural limitation mentioned above return factory.create(responseObserver); // Fall back to basic create() method for now @@ -393,4 +375,9 @@ public static void setStreamingSubscribedRunnable(Runnable runnable) { streamingSubscribedRunnable = runnable; } + protected abstract RequestHandler getRequestHandler(); + + protected abstract AgentCard getAgentCard(); + + protected abstract CallContextFactory getCallContextFactory(); } diff --git a/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java index 6e269026b..1cfeb6626 100644 --- a/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java @@ -32,7 +32,9 @@ import io.a2a.server.ServerCallContext; import io.a2a.server.events.EventConsumer; import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest; +import io.a2a.server.requesthandlers.CallContextFactory; import io.a2a.server.requesthandlers.DefaultRequestHandler; +import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.AgentCard; import io.a2a.spec.Artifact; @@ -67,7 +69,7 @@ public class GrpcHandlerTest extends AbstractA2ARequestHandlerTest { @Test public void testOnGetTaskSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); GetTaskRequest request = GetTaskRequest.newBuilder() .setName("tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()) @@ -89,7 +91,7 @@ public void testOnGetTaskSuccess() throws Exception { @Test public void testOnGetTaskNotFound() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); GetTaskRequest request = GetTaskRequest.newBuilder() .setName("tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()) .build(); @@ -103,7 +105,7 @@ public void testOnGetTaskNotFound() throws Exception { @Test public void testOnCancelTaskSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorCancel = (context, eventQueue) -> { @@ -134,7 +136,7 @@ public void testOnCancelTaskSuccess() throws Exception { @Test public void testOnCancelTaskNotSupported() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorCancel = (context, eventQueue) -> { @@ -153,7 +155,7 @@ public void testOnCancelTaskNotSupported() throws Exception { @Test public void testOnCancelTaskNotFound() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); CancelTaskRequest request = CancelTaskRequest.newBuilder() .setName("tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()) .build(); @@ -166,7 +168,7 @@ public void testOnCancelTaskNotFound() throws Exception { @Test public void testOnMessageNewMessageSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getMessage()); }; @@ -182,7 +184,7 @@ public void testOnMessageNewMessageSuccess() throws Exception { @Test public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getMessage()); @@ -198,7 +200,7 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception { @Test public void testOnMessageError() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(new UnsupportedOperationError()); }; @@ -208,7 +210,7 @@ public void testOnMessageError() throws Exception { @Test public void testSetPushNotificationConfigSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + "config456"; StreamRecorder streamRecorder = createTaskPushNotificationConfigRequest(handler, NAME); @@ -227,7 +229,7 @@ public void testSetPushNotificationConfigSuccess() throws Exception { @Test public void testGetPushNotificationConfigSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; @@ -256,7 +258,7 @@ public void testGetPushNotificationConfigSuccess() throws Exception { @Test public void testPushNotificationsNotSupportedError() throws Exception { AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false, true); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); StreamRecorder streamRecorder = createTaskPushNotificationConfigRequest(handler, NAME); assertGrpcError(streamRecorder, Status.Code.UNIMPLEMENTED); @@ -268,7 +270,7 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception { DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); StreamRecorder streamRecorder = getTaskPushNotificationConfigRequest(handler, NAME); assertGrpcError(streamRecorder, Status.Code.UNIMPLEMENTED); @@ -280,7 +282,7 @@ public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); StreamRecorder streamRecorder = createTaskPushNotificationConfigRequest(handler, NAME); assertGrpcError(streamRecorder, Status.Code.UNIMPLEMENTED); @@ -288,7 +290,7 @@ public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { @Test public void testOnMessageStreamNewMessageSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; @@ -306,7 +308,7 @@ public void testOnMessageStreamNewMessageSuccess() throws Exception { @Test public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; @@ -358,7 +360,7 @@ public void onCompleted() { @Test public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); io.a2a.spec.Task task = new io.a2a.spec.Task.Builder(AbstractA2ARequestHandlerTest.MINIMAL_TASK) .history(new ArrayList<>()) @@ -408,7 +410,7 @@ public void testOnMessageStreamNewMessageExistingTaskSuccessMocks() throws Excep @Test public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); List events = List.of( AbstractA2ARequestHandlerTest.MINIMAL_TASK, new TaskArtifactUpdateEvent.Builder() @@ -488,7 +490,7 @@ public void onCompleted() { @Test public void testOnResubscribeNoExistingTaskError() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); TaskSubscriptionRequest request = TaskSubscriptionRequest.newBuilder() .setName("tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()) .build(); @@ -500,7 +502,7 @@ public void testOnResubscribeNoExistingTaskError() throws Exception { @Test public void testOnResubscribeExistingTaskSuccess() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); queueManager.createOrTap(AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()); @@ -535,7 +537,7 @@ public void testOnResubscribeExistingTaskSuccess() throws Exception { @Test public void testOnResubscribeExistingTaskSuccessMocks() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); queueManager.createOrTap(AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()); @@ -580,7 +582,7 @@ public void testOnResubscribeExistingTaskSuccessMocks() throws Exception { @Test public void testStreamingNotSupportedError() throws Exception { AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, true); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); StreamRecorder streamRecorder = sendStreamingMessageRequest(handler); assertGrpcError(streamRecorder, Status.Code.INVALID_ARGUMENT); } @@ -589,7 +591,7 @@ public void testStreamingNotSupportedError() throws Exception { public void testStreamingNotSupportedErrorOnResubscribeToTask() throws Exception { // This test does not exist in the Python implementation AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, true); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); TaskSubscriptionRequest request = TaskSubscriptionRequest.newBuilder() .setName("tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId()) .build(); @@ -603,14 +605,14 @@ public void testStreamingNotSupportedErrorOnResubscribeToTask() throws Exception public void testOnMessageStreamInternalError() throws Exception { DefaultRequestHandler mocked = Mockito.mock(DefaultRequestHandler.class); Mockito.doThrow(new InternalError("Internal Error")).when(mocked).onMessageSendStream(Mockito.any(MessageSendParams.class), Mockito.any(ServerCallContext.class)); - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, mocked); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, mocked); StreamRecorder streamRecorder = sendStreamingMessageRequest(handler); assertGrpcError(streamRecorder, Status.Code.INTERNAL); } @Test public void testListPushNotificationConfig() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); @@ -636,7 +638,7 @@ public void testListPushNotificationConfig() throws Exception { @Test public void testListPushNotificationConfigNotSupported() throws Exception { AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false, true); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); @@ -654,7 +656,7 @@ public void testListPushNotificationConfigNotSupported() throws Exception { public void testListPushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); @@ -670,7 +672,7 @@ public void testListPushNotificationConfigNoPushConfigStore() { @Test public void testListPushNotificationConfigTaskNotFound() { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; @@ -685,7 +687,7 @@ public void testListPushNotificationConfigTaskNotFound() { @Test public void testDeletePushNotificationConfig() throws Exception { - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); @@ -708,7 +710,7 @@ public void testDeletePushNotificationConfig() throws Exception { @Test public void testDeletePushNotificationConfigNotSupported() throws Exception { AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false, true); - GrpcHandler handler = new GrpcHandler(card, requestHandler); + GrpcHandler handler = new TestGrpcHandler(card, requestHandler); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); @@ -727,7 +729,7 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception { public void testDeletePushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); - GrpcHandler handler = new GrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); + GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder() .setName(NAME) @@ -799,4 +801,29 @@ private void assertGrpcError(StreamRecorder streamRecorder, Status.Code e Assertions.assertEquals(expectedStatusCode, ((StatusRuntimeException) streamRecorder.getError()).getStatus().getCode()); Assertions.assertTrue(streamRecorder.getValues().isEmpty()); } + + private static class TestGrpcHandler extends GrpcHandler { + private final AgentCard card; + private final RequestHandler handler; + + TestGrpcHandler(AgentCard card, RequestHandler handler) { + this.card = card; + this.handler = handler; + } + + @Override + protected RequestHandler getRequestHandler() { + return handler; + } + + @Override + protected AgentCard getAgentCard() { + return card; + } + + @Override + protected CallContextFactory getCallContextFactory() { + return null; + } + } }