diff --git a/_posts/2025-10-31-quarkus-a2a-cloud-enhancements.adoc b/_posts/2025-10-31-quarkus-a2a-cloud-enhancements.adoc new file mode 100644 index 0000000000..51b11a8eee --- /dev/null +++ b/_posts/2025-10-31-quarkus-a2a-cloud-enhancements.adoc @@ -0,0 +1,494 @@ +--- +layout: post +title: 'How to Use Your A2A Server Agent in a Distributed or Cloud Native Environment' +date: 2025-10-31 +tags: ai a2a cloud +synopsis: This blog post shows how to configure an A2A server agent to use persistent stores and event queues, in order to get it working for distributed, cloud-native environments. +author: kkhan +--- + +The recent https://github.com/a2aproject/a2a-java/tree/v0.3.0.Final[0.3.0.Final] release of the A2A Java SDK, included a bunch of improvements. + +Farah recently blogged about the https://quarkus.io/blog/quarkus-a2a-java-security/[new security features] in the A2A Java SDK. + +This post will focus on the significant cloud-related enhancements which also are part of this release. These provide the building blocks for running the SDK in a distributed, cloud-native environment. We will demonstrate this with a simple A2A Agent, which simply appending `Artifacts` to a `Task` that contain the name of the pod handling the incoming `Message`s. + +Note that we have done a https://github.com/wildfly-extras/a2a-java-sdk-server-jakarta[0.3.1.Final] release since. The links in the examples will use this very latest release. + +We will cover the following topics: + +* Initially, we will look at the simple, in-memory components that form the core of the SDK's asynchronous model. These core classes were inspired by the Python version of the SDK, https://github.com/a2aproject/a2a-python[a2a-python]. +* Next, we'll introduce the new, persistent and replicated implementations designed for distributed environments. +* Finally, we'll walk through an https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/README.md[example] that combines these components and deploys an application in a Kubernetes cluster, showing that the components enable us to work in a load-balanced environment. + +== Core SDK In-Memory Components +The A2A Java SDK provides the tools to manage asynchronous, long-running processes, which we call Tasks. As described in the https://a2a-protocol.org/latest/topics/key-concepts/[Core Concepts] section of the A2A protocol documentation (as well as the https://a2a-protocol.org/latest/topics/life-of-a-task/[Life of a Task] and https://a2a-protocol.org/latest/topics/streaming-and-async/[Streaming and Asynchronous Operations for Long-Running Tasks] sections), a Task can move through various states. + +A key design principle is that as long as a Task is in a non-final state, it must be possible to send messages to do more work on it, or re-subscribe to its events. + +To manage this, the SDK relies on a few core components, and provides simple, default implementations of those: + +* https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java[`TaskStore`]: Manages the lifecycle and persistence of https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/spec/src/main/java/io/a2a/spec/Task.java[`Task`] objects. The default implementation is https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java[`InMemoryTaskStore`], which holds all `Task` information in memory. +* https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/server-common/src/main/java/io/a2a/server/tasks/PushNotificationConfigStore.java[`PushNotificationConfigStore`]: Stores a client's push notification configurations (like webhook URLs). The default is the https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/server-common/src/main/java/io/a2a/server/tasks/InMemoryPushNotificationConfigStore.java[`InMemoryPushNotificationConfigStore`]. +* https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/server-common/src/main/java/io/a2a/server/events/QueueManager.java[`QueueManager`]: This is central to the asynchronous model. It creates and manages https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/server-common/src/main/java/io/a2a/server/events/EventQueue.java[`EventQueue`]s for tasks. In your A2A applications, you will typically implement an `AgentExecutor`, which takes an incoming message, and then puts events on the `EventQueue` to update the `Task` with the results of the processing. The SDK then listens on this `EventQueue` to update the `Task` in the `TaskStore`, and notify any clients listening for updates. Clients listening for updates are ones that have asked to resubscribe, or ones having sent a message, and are waiting for results. + +These in-memory implementations are simple and fast, making them perfect for single-instance deployments, getting started, or testing. However, in a distributed or cloud environment where service instances can be created or destroyed, this in-memory state is lost, which breaks the guarantee for long-running tasks. Also, since all the state is kept in memory, there are overheads associated with this approach. + +== Distributed and Cloud Ready Implementations +To run in a distributed environment, we needed to replace these in-memory components with implementations that could persist state and coordinate across multiple service instances. For this release, we introduced several new components, which you can find in the https://github.com/a2aproject/a2a-java/tree/v0.3.1.Final/extras[`extras/`] directory of the `a2a-java` repository. + +Essentially, they are alternative implementations of the `TaskStore`, `PushNotificationConfigStore` and `QueueManager` interfaces. They have been annotated with the `@Alternative` and `@Priority` CDI annotations, so that if their Maven modules is included in your build, these implementations will take precedence over the default, in-memory ones. + +Each component has a README, and is fully tested with an integration test for additional information about how to set it up. + +While we are not aiming to provide support for every possible backend in this repository, we hope that the examples given show that it is relatively simple to provide other mechanisms to back the functionality. + +=== Database-Backed TaskStore and PushNotificationStore +The simplest problem to solve was persistence. We introduced: + +* https://github.com/a2aproject/a2a-java/tree/v0.3.1.Final/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java[`JpaDatabaseTaskStore`]: This persists `Task` state to a relational database using Jakarta JPA. See the https://github.com/a2aproject/a2a-java/tree/v0.3.1.Final/extras/task-store-database-jpa/README.md[README] for more details. The module also contains tests for additional guidance for how to set it up. +* https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java[`JpaDatabasePushNotificationConfigStore`]: This persists `PushNotificationConfig` instances to a relational database using Jakarta JPA. The https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/extras/push-notification-config-store-database-jpa/README.md[README] and the module's tests contain more details. + +=== Replicated QueueManager +As for the `QueueManager`, it doesn't just store state. It also manages active, in-memory message queues. If a client sends a message to a `Task` via one A2A Agent instance, we need to make sure that subscribers to the `Task` on other A2A Agent instances forming part of the cluster, also receive the resulting `EventQueue` messages. + +We introduced the https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java[ReplicatedQueueManager] for this. The component is built with a pluggable https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicationStrategy.java[`ReplicationStrategy`] responsible for pushing the events to other nodes in the cluster. It also notifies the other nodes when an `EventQueue` for a `Task` is created, and when it is closed. + +We provide one implementation of `ReplicationStrategy`, https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/extras/queue-manager-replicated/replication-mp-reactive/src/main/java/io/a2a/extras/queuemanager/replicated/mp_reactive/ReactiveMessagingReplicationStrategy.java[`ReactiveMessagingReplicationStrategy`], which uses MicroProfile Reactive Messaging under the hood to send and receive the replicated `EventQueue` events. The nice thing about MicroProfile Reactive Messaging, is that it is simple to configure alternative providers to do the actual messaging part by changing properties in your `application.properties`. In our tests, and examples we chose Kafka as the provider. + +However, you are not stuck with Kafka or MicroProfile Reactive Messaging. It should be possible to plug in whichever mechanism you choose, as long as you provide your own implementation of `ReactiveMessagingReplicationStrategy`. + +The https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/extras/queue-manager-replicated/README.md[README], and the tests in the module provide more information to help you, whether you want to learn more about how to configure the `ReplicatedQueueManager`, or implement your own `ReactiveMessagingReplicationStrategy`. + +=== Stabilising the Server-Side +Getting the `ReplicatedQueueManager` to work reliably required significant improvements to our server-side request handling. We did a lot of work to ensure that the event `EventQueue` associated with a `Task` is always available for non-final `Task` s, and that they are reliably cleaned up when a `Task` finishes, while ensuring that existing subscribers still get all the events before the `EventQueue` is closed. + +This was the final step needed to get a deployment working reliably on Kubernetes, which is the focus of the next section. + +=== Example: Running on Kubernetes +Now let us see how to combine these new components in a real-world scenario. We have created a new Kubernetes example demonstrating how to run a stateful A2A Java application in a Kubernetes cluster. It focuses on the improved implementations of `TaskStore` and `QueueManager`, and can be found https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/README.md[here]. + +We will summarise the most important parts here below. We will look at how to set everything up, and run the example, before showing the most important parts of how the A2A Agent server is configured. + +==== Prerequisites +To run the example, you need to install the following: + +* *Java*: Version 17 or higher +* *Container runtime*: We have tested with Docker and Podman +* *Maven*: 3.8.x or higher +* *Kind*: For Kubernetes support. Note the example will only work with Kind. Other implementations, such as Minikube, need other tweaks than Kind. +* *kubectl*: Kind does not automatically install a kubectl for you. + +==== Deploying and running the example +From the https://github.com/a2aproject/a2a-java/tree/v0.3.1.Final/examples/cloud-deployment/scripts[`scripts\`] folder, run either: + +* `./deploy.sh`: if you use Docker +* `./deploy.sh --container-tool podman`: if you use Podman + +The https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/scripts/deploy.sh[`deploy.sh`] script will take care of everything for you, and deploy your application. This includes + +* Creating the Kind cluster with fully configured local registry support +* Building the A2A Agent application image and pushing it to the registry +* Install the Strimzi operator from https://strimzi.io/install/latest?namespace=kafka +* Deploy the files in the https://github.com/a2aproject/a2a-java/tree/v0.3.1.Final/examples/cloud-deployment/k8s[k8s/] directory ordered by their numerical prefix. These: +** Configure the `a2a-demo` namespace for our application +** Installs PostgreSQL needed for our `JPADatabaseTaskStore` +** Installs Kafka, using the Strimzi operator +** Creates the Kafka topic our A2A Agent applications will use to replicate `EventQueue` events +** Creates a `ConfigMap` containing database connection properties, kafka bootstrap servers and the URL of the Agent +** Deploys our A2A Agent application + +Some of these steps take several minutes, so you need to be patient! + +Once everything is up and running you will see a message like this: +---- +========================================= +Deployment completed successfully! +========================================= + +To verify the deployment, run: + ./verify.sh + +To access the agent (via NodePort): + curl http://localhost:8080/.well-known/agent-card.json + +To run the test client (demonstrating load balancing): + cd ../server + mvn test-compile exec:java -Dexec.classpathScope=test \ + -Dexec.mainClass="io.a2a.examples.cloud.A2ACloudExampleClient" \ + -Dagent.url="http://localhost:8080" +---- + +Now that everything is deployed, simply copy the above command and run it: + +[source, bash] +---- +$ cd ../server +$ mvn test-compile exec:java -Dexec.classpathScope=test \ + -Dexec.mainClass="io.a2a.examples.cloud.A2ACloudExampleClient" \ + -Dagent.url="http://localhost:8080" + +---- + +You should now see output like the following + +---- +============================================= +A2A Cloud Deployment Example Client +============================================= + +Agent URL: http://localhost:8080 +Process messages: 8 +Message interval: 1500ms + +Fetching agent card... +✓ Agent: Cloud Deployment Demo Agent +✓ Description: Demonstrates A2A multi-pod deployment with Kafka event replication, PostgreSQL persistence, and round-robin load balancing across Kubernetes pods + +Client task ID: cloud-test-1761754920509 + +Creating streaming client for subscription... +Creating non-streaming client for sending messages... +✓ Clients created + +Step 1: Sending 'start' to create task... +✓ Task created: 2b525ae8-0b2a-43c9-b2fa-007a8b618240 + State: SUBMITTED + +Step 2: Subscribing to task for streaming updates... +✓ Subscribed to task updates + +Step 3: Sending 8 'process' messages (interval: 1500ms)... +-------------------------------------------- + Artifact #1: Processed by a2a-agent-cb7fd769-5wr8g + → Pod: a2a-agent-cb7fd769-5wr8g (Total unique pods: 1) + Artifact #2: Processed by a2a-agent-cb7fd769-5wr8g + → Pod: a2a-agent-cb7fd769-5wr8g (Total unique pods: 1) +✓ Process message 1 sent +✓ Process message 2 sent + Artifact #3: Processed by a2a-agent-cb7fd769-x9tdm + → Pod: a2a-agent-cb7fd769-x9tdm (Total unique pods: 2) +... +✓ Process message 8 sent + Artifact #13: Processed by a2a-agent-cb7fd769-5wr8g + → Pod: a2a-agent-cb7fd769-5wr8g (Total unique pods: 2) + +Waiting for process artifacts to arrive... + +Step 4: Sending 'complete' to finalize task... + Artifact #14: Completed by a2a-agent-cb7fd769-5wr8g + → Pod: a2a-agent-cb7fd769-5wr8g (Total unique pods: 2) +ℹ Subscription stream closed (expected after task completion) +✓ Complete message sent, task state: WORKING + +Waiting for task to complete... +⚠ Timeout waiting for task completion + +============================================= +Test Results +============================================= +Total artifacts received: 14 +Unique pods observed: 2 +Pod names and counts: {a2a-agent-cb7fd769-x9tdm=3, a2a-agent-cb7fd769-5wr8g=11} + +✓ TEST PASSED - Successfully demonstrated multi-pod processing! + Messages were handled by 2 different pods. + This proves that: + - Load balancing is working (round-robin across pods) + - Event replication is working (subscriber sees events from all pods) + - Database persistence is working (task state shared across pods) +---- + +The source code for the client can be found in https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/server/src/test/java/io/a2a/examples/cloud/A2ACloudExampleClient.java[`A2ACloudExampleClient`]. + +We will not show the full source code here, but in a nutshell what the client does is: + +1. Fetch the `AgentCard` of our A2A Agent. +2. Send an initial `Message` containing a `TextPart` with the text `create`. We will see in the next section how the server uses this to create a new `Task`. This `Task` is then returned to the client, and on the server side the A2A Java SDK will create an entry in the `JPADatabaseTaskStore` for the `Task`, and also `ReplicatedQueueManager` makes sure that the `EventQueue` for the `Task` remains open since the `Task` is in a non-final state. +3. Calls `resubscribe()` for events to the `Task` we just created. The resulting subscription is kept open until the end TODO step. It does not matter if this call is handled on the same, or a different, node as in the previous step since the `TaskStore` is backed by a database, and `QueueManager` is replicated. +4. The client then sends several `process` `Message` s to the server. It creates a new connection each time. Since there are two pods, it is not deterministic which pod will handle the request. During the course of the full run, both pods should get invoked. Again, updates to the `Task` should be reflected in the database-backed `TaskStore` and all `Events` are replicated to all nodes. On the server-side, an `Artifact` is added to the `Task` containing the name of the pod that processed the `Message`. +5. The subscriber from 3. outputs the messages as they come in, and keeps track of which nodes have been involved in processing `Message` s. The information about which node processed the message, is contained in the `Task` artifacts, as mentioned in the last point. +6. Finally, we send a `complete` `Message` to the A2A Agent, which puts the `Task` in a final state. This causes the `EventQueue` to be closed, which in turn causes the closure of the stream the client subscriber is subscribed to. Note that we receive the `Task` with this `completed` artifact from the server before the stream and subscription end. + +The important thing to note, is that it does not matter which node the client subscription happens on, nor which nodes the messages are sent on. Since the `TaskStore` is persistent, and the `Event` s replicated, everything is received by the client as if there was only one A2A Agent node involved. + +==== The Server Part of the Application +First let us look briefly at how the application has been written. As usual in an A2A application, you provide implementations of `AgentCard` and `AgentExecutor` via CDI. + +Our `AgentCard` is provided by https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentCardProducer.java[`CloudAgentCardProducer`]. The most important parts are highlighted: + +[source,java] +---- +@ApplicationScoped +public class CloudAgentCardProducer { + + @ConfigProperty(name = "agent.url", defaultValue = "http://localhost:8080") <1> + String agentUrl; + + @Produces + @PublicAgentCard + public AgentCard agentCard() { + return new AgentCard.Builder() + .name("Cloud Deployment Demo Agent") + .description("Demonstrates A2A multi-pod deployment with Kafka event replication, " + + "PostgreSQL persistence, and round-robin load balancing across Kubernetes pods") + .url(agentUrl) <1> + .version("1.0.0") + .capabilities(new AgentCapabilities.Builder() + .streaming(true) <2> + .pushNotifications(false) + .stateTransitionHistory(false) + .build()) + .defaultInputModes(Collections.singletonList("text")) + .defaultOutputModes(Collections.singletonList("text")) + .skills(Collections.singletonList( + new AgentSkill.Builder() + .id("multi_pod_demo") + .name("Multi-Pod Replication Demo") + .description("Demonstrates cross-pod event replication. " + <3> + "Send 'start' to initialize, 'process' to add artifacts, " + + "'complete' to finalize. Each artifact shows which pod processed it.") + .tags(List.of("demo", "cloud", "kubernetes", "replication")) + .examples(List.of( + "start", + "process", + "complete" + )) + .build() + )) + .protocolVersion("0.3.0") + .build(); + } +} +---- +<1> The agent url is configurable, and is set by https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/k8s/05-agent-deployment.yaml#L53[`k8s/05-agent-deployment.yaml`] referencing a value from the ConfigMap configured in https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/k8s/04-agent-configmap.yaml#L11[`k8s/04-agent-configmap.yaml`] +<2> We have enabled streaming, since this is needed for the `resubscribe()` call done by the client +<3> Then we have a brief description of the `start`, `process` and `complete` 'commands' we saw the client send + +The `AgentExecutor` is provided by https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java[`CloudAgentExecutorProducer`] (some code has been removed to keep the example manageable): + +[source,java] +---- +@ApplicationScoped +public class CloudAgentExecutorProducer { + @Produces + public AgentExecutor agentExecutor() { + return new CloudAgentExecutor(); + } + + private static class CloudAgentExecutor implements AgentExecutor { + + @Override + public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError { + TaskUpdater updater = new TaskUpdater(context, eventQueue); <1> + + try { + // Extract user message and normalize + String messageText = extractTextFromMessage(context.getMessage()).trim().toLowerCase(); + // Get pod name from environment (set by Kubernetes Downward API) + String podName = System.getenv("POD_NAME"); <2> + + // Handle message based on command + if (context.getTask() == null) { <3> + // Initial message - create task in SUBMITTED → WORKING state + // This will have the `start` command + updater.submit(); + updater.startWork(); + String artifactText = "Started by " + podName; + List> parts = List.of(new TextPart(artifactText, null)); + updater.addArtifact(parts); + } else if ("complete".equals(messageText)) { <5> + // Completion trigger - add final artifact and complete + String artifactText = "Completed by " + podName; + List> parts = List.of(new TextPart(artifactText, null)); + updater.addArtifact(parts); + updater.complete(); + } else { <4> + // Subsequent messages - add artifacts (fire-and-forget, stays in WORKING) + // This is for the `process` commands + String artifactText = "Processed by " + podName; + List> parts = List.of(new TextPart(artifactText, null)); + updater.addArtifact(parts); + // No state change - task remains in WORKING + LOGGER.info("Artifact added on pod: {}", podName); + } + + } catch (JSONRPCError e) { + LOGGER.error("JSONRPC error processing task", e); + throw e; + } catch (Exception e) { + LOGGER.error("Error processing task", e); + throw new InternalError("Processing failed: " + e.getMessage()); + } + } +} +---- +<1> A TaskUpdater is created with the `RequestContext` and the `EventQueue`. Note that even for new `Task`s, the framework will have created the `EventQueue` for us. +<2> We get the name of the pod, as configured in https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/k8s/05-agent-deployment.yaml#L29[`k8s/05-agent-deployment.yaml`] +<3> The `start` command sent by the client ends up in this block. This makes a few calls to update the state of the `Task` using the `TaskUpdater`, and adds an `Artifact` indicating which pod started the `Task`. The `TaskUpdater` internally puts `Events` on the `EventQueue` after each of the calls on it, and the A2A Java SDK framework 'listens' to the queue, resulting in updates to the `Task` in the `TaskStore`, and sending results to any clients subscribed to the `Task` (or involved in making triggering this request). Since the `EventQueue` is replicated, the events are also pushed to other nodes in the cluster. +<4> The `process` messages end up in this block, which again add an `Artifact` to the `Task` via the `TaskUpdater`, which adds an `Event` to the queue. The `Event` is handled in the same way as in the above point. +<5> When a `complete` message is received, we add an `Artifact` to the `Task` using the `TaskUpdater`, indicating which pod is completing the `Task`, and finally use the `TaskUpdater` to set the `Task` state to `completed`. This again results in events on the `EventQueue`, which are handled as before. However, the update to a final state causes the `EventQueue` to be closed, which is also replicated to the other nodes. + +==== Configuring Quarkus for A2A with `ReplicatedQueueManager` and `JPADatabaseTaskStore` + +The two main parts involved in configuring our application, are adding dependencies to the POM, and adding configuration via `application.properties.` We will look at the POM first, and then look at the configuration. + +===== Pom Dependencies +The full POM for the example can be found https://github.com/a2aproject/a2a-java/blob/v0.3.1.Final/examples/cloud-deployment/server/pom.xml[here]. We will talk about the most important dependencies below, step-by-step. + +Since we are building a Quarkus based server, and for this case we only want the JSONRPC transport, we can include the following dependency which transitively includes everything else we need for our base A2A Agent server: + +[source, xml] +---- + + + io.github.a2asdk + a2a-java-sdk-reference-jsonrpc + ${sdk.version} + +---- +Then to override the standard `InMemoryTaskStore` with the `JpaDatabaseTaskStore`, and `InMemoryQueueManager` with `ReplicatedQueueManager` we include their modules +[source, xml] +---- + + + io.github.a2asdk + a2a-java-extras-task-store-database-jpa + ${sdk.version} + + + + io.github.a2asdk + a2a-java-queue-manager-replicated-core + ${sdk.version} + +---- +The `ReplicatedQueueManager` needs a `ReplicationStrategy`. Our `ReactiveMessagingReplicationStrategy` is implemented by this module: +[source, xml] +---- + + + io.github.a2asdk + a2a-java-queue-manager-replication-mp-reactive + ${sdk.version} + +---- +We will configure the `ReactiveMessagingReplicationStrategy` to use Kafka later, so we need the dependency to use Kafka with MicroProfile Reactive Messaging: +[source, xml] +---- + + + io.quarkus + quarkus-messaging-kafka + +---- +For the `JpaDatabaseTaskStore`, we need to add Hibernate, which provides the JPA functionality. Since our example uses PostgreSQL, we include its driver: +[source, xml] +---- + + + io.quarkus + quarkus-hibernate-orm + + + + io.quarkus + quarkus-jdbc-postgresql + +---- +Finally, since we will be deploying our A2A agent in Kubernetes, which uses readiness and liveness probes, we add the following dependency: +[source, xml] +---- + + + io.quarkus + quarkus-smallrye-health + +---- +That's it for the POM dependencies! + +Additionally, the POM contains the `quarkus-maven-plugin`, used to build the Quarkus server. This has no special configuration, so see the POM for more details. + +===== Configuration in application.properties + +The final piece of the puzzle is configuring the A2A Agent Quarkus application in its https://github.com/a2aproject/a2a-java/blob/main/examples/cloud-deployment/server/src/main/resources/application.properties[`application.properties`]. Again, let's discuss the contents in chunks. + +First we have some simple properties, setting the `agent.url` to use in the `CloudAgentCardProducer`. We also define the location of the health endpoint. +[source, properties] +---- +# Agent Configuration +agent.url=${AGENT_URL:http://localhost:8080} <1> + +# Health checks +quarkus.smallrye-health.root-path=/health <2> +---- +<1> `AGENT_URL` comes from https://github.com/a2aproject/a2a-java/blob/main/examples/cloud-deployment/k8s/05-agent-deployment.yaml#L53[`05-agent-deployment.yaml`], which in turn references the value defined in https://github.com/a2aproject/a2a-java/blob/main/examples/cloud-deployment/k8s/04-agent-configmap.yaml#L11[`04-agent-configmap.yaml`]. +<2> The endpoint matches what is expected by the readiness and liveness probes in https://github.com/a2aproject/a2a-java/blob/main/examples/cloud-deployment/k8s/05-agent-deployment.yaml#L65-L80[`05-agent-deployment.yaml`]. + + +Now we define the database used by our `JpaDatabaseTaskStore`: +[source, properties] +---- +# Database Configuration (PostgreSQL) +quarkus.datasource.db-kind=postgresql +quarkus.datasource.jdbc.url=${DATABASE_URL:jdbc:postgresql://localhost:5432/a2a} <1> +quarkus.datasource.username=${DATABASE_USER:a2a} +quarkus.datasource.password=${DATABASE_PASSWORD:a2a} +quarkus.datasource.jdbc.max-size=16 +---- +<1> The `DATABASE_URL`, `DATABASE_USER` and `DATABASE_PASSWORD` environment variables used are defined in `04-agent-configmap.yaml` and exposed to the application via `05-agent-deployment.yaml`. + +Next we have the configuration of the `a2a-java` JPA persistence unit, which is used by `JpaDatabaseTaskStore` and `JpaPushNotificationConfigStore`. +[source, properties] +---- + +# Hibernate ORM - Configure persistence unit "a2a-java" +quarkus.hibernate-orm."a2a-java".datasource= +quarkus.hibernate-orm."a2a-java".database.generation=update +quarkus.hibernate-orm."a2a-java".log.sql=false +quarkus.hibernate-orm."a2a-java".packages=io.a2a.extras.taskstore.database.jpa,io.a2a.extras.pushnotificationconfigstore.database.jpa +---- + +Finally, we have the MicroProfile Reactive Messaging configuration, which maps our channels to Kafka. For more information about how MicroProfile Reactive Messaging works in Quarkus, see this https://quarkus.io/guides/kafka-getting-started[guide]. + +Under the hood, the `ReactiveMessagingReplicationStrategy` uses MicroProfile Reactive Messaging. It uses an `Emitter` writing to a channel called `replicated-events-out`, and has an `@Incoming` annotated method receiving events from a channel called `replicated-events-in`. + +[source, properties] +---- + +# Kafka Configuration for Event Replication +kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} <1> + +# MicroProfile Reactive Messaging - Outgoing (Publish to Kafka) +mp.messaging.outgoing.replicated-events-out.connector=smallrye-kafka <2> +mp.messaging.outgoing.replicated-events-out.topic=a2a-replicated-events +mp.messaging.outgoing.replicated-events-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer + +# MicroProfile Reactive Messaging - Incoming (Subscribe from Kafka) +mp.messaging.incoming.replicated-events-in.connector=smallrye-kafka <2> +mp.messaging.incoming.replicated-events-in.topic=a2a-replicated-events +# Each pod needs a unique consumer group to receive ALL events (broadcast behavior) +# Using POD_NAME from Kubernetes Downward API ensures each instance gets its own group +mp.messaging.incoming.replicated-events-in.group.id=a2a-cloud-${POD_NAME:local} <3> +mp.messaging.incoming.replicated-events-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.incoming.replicated-events-in.auto.offset.reset=earliest +---- +<1> `KAFKA_BOOTSTRAP_SERVERS` is defined in `04-agent-configmap.yaml` and exposed to the application via `05-agent-deployment.yaml`. +<2> Both the `replicated-events-out` and `replicated-events-in` channels use the Kafka connector, send/receive on the same Kafka topic, and (de)serialize `String`. +<3> `POD_NAME` is exposed to the application in `05-agent-deployment.yaml`, which obtains it from the Kubernetes metadata. This has the effect of setting a unique groupId, so that all pods in the cluster receive the replicated events. + +== Conclusion +The simple in-memory of the components shown are great for getting up and running fast. However, to work in an enterprise, distributed, or cloud environment we need to replace these components to use shared state, in order to survive server reboots and have replication between A2A Agent instances. + +=== Further Reading + +* https://quarkus.io/blog/quarkus-a2a-java-0-3-0-alpha-release/[Getting Started with Quarkus and A2A Java SDK 0.3.0] +* https://quarkus.io/blog/quarkus-a2a-java-0-3-0-beta-release/[A2A Java SDK: Support for the REST Transport is Now Here] +* https://quarkus.io/blog/quarkus-a2a-java-grpc/[Getting Started with A2A Java SDK and gRPC] +* https://github.com/a2aproject/a2a-samples/tree/main/samples/java/agents[A2A Java SDK Samples] +* https://github.com/a2aproject/a2a-java/blob/main/README.md[A2A Java SDK Documentation] +* https://a2a-protocol.org/latest/specification/[A2A Specification] +