Skip to content

Commit 1828b4d

Browse files
authored
feat: Replicated Queue Manager based on Mp Reactive Messaging with Kafka (#309)
It is implemented in such a way that the replication mechanism can be swapped with something else
1 parent 014a899 commit 1828b4d

File tree

33 files changed

+2154
-39
lines changed

33 files changed

+2154
-39
lines changed

extras/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ This directory contains additions to what is provided by the default SDK impleme
55
Please see the README's of each child directory for more details.
66

77
[`task-store-database-jpa`](./task-store-database-jpa/README.md) - Replaces the default `InMemoryTaskStore` with a `TaskStore` backed by a RDBMS. It uses JPA to interact with the RDBMS.
8-
[`push-notification-config-store-database-jpa`](./push-notification-config-store-database-jpa/README.md) - Replaces the default `InMemoryPushNotificationConfigStore` with a `PushNotificationConfigStore` backed by a RDBMS. It uses JPA to interact with the RDBMS.
8+
[`push-notification-config-store-database-jpa`](./push-notification-config-store-database-jpa/README.md) - Replaces the default `InMemoryPushNotificationConfigStore` with a `PushNotificationConfigStore` backed by a RDBMS. It uses JPA to interact with the RDBMS.
9+
[`queue-manager-replicated`](./queue-manager-replicated/README.md) - Replaces the default `InMemoryQueueManager` with a `QueueManager` supporting replication to other A2A servers implementing the same agent. You can write your own `ReplicationStrategy`, or use the provided `MicroProfile Reactive Messaging implementation`.
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
# A2A Java SDK - Replicated Queue Manager
2+
3+
This module provides a replicated implementation of the `QueueManager` interface that enables event replication across multiple A2A instances using message brokers like Apache Kafka. It ensures that events generated in one A2A instance are propagated to other instances for distributed operation.
4+
5+
The replication works by intercepting events as they are enqueued and sending them to a message broker. Events received from the broker are then processed by the local A2A instance, maintaining consistency across the distributed system.
6+
7+
## Architecture
8+
9+
The main components in the replicated queue manager are:
10+
11+
- **[`ReplicatedQueueManager`](./core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java)**: Core queue manager that wraps the default `InMemoryQueueManager` and handles event replication.
12+
- **[`ReplicationStrategy`](./core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicationStrategy.java)**: Interface for different replication implementations. If `ReplicatedQueueManager` is used, a `ReplicationStrategy` **must** be provided.
13+
14+
Currently, one implementation is provided: [`ReactiveMessagingReplicationStrategy`](./replication-mp-reactive/src/main/java/io/a2a/extras/queuemanager/replicated/mp_reactive/ReactiveMessagingReplicationStrategy.java), which uses MicroProfile Reactive Messaging with message brokers like Apache Kafka.
15+
16+
## Quick Start
17+
18+
This section will get you up and running quickly with a `ReplicatedQueueManager` using the `ReactiveMessagingReplicationStrategy` set up to use Kafka as the message broker.
19+
20+
### 1. Add Dependencies
21+
22+
#### Core Module (Required)
23+
24+
Add the core replicated queue manager module to your project's `pom.xml`:
25+
26+
```xml
27+
<dependency>
28+
<groupId>io.github.a2asdk</groupId>
29+
<artifactId>a2a-java-queue-manager-replicated-core</artifactId>
30+
<version>${a2a.version}</version>
31+
</dependency>
32+
```
33+
34+
The `ReplicatedQueueManager` is annotated in such a way that it should take precedence over the default `InMemoryQueueManager`. Hence, it is a drop-in replacement.
35+
36+
#### Replication Strategy Implementation (Required)
37+
38+
You must also include a replication strategy implementation. Currently, we provide one implementation using MicroProfile Reactive Messaging:
39+
40+
```xml
41+
<dependency>
42+
<groupId>io.github.a2asdk</groupId>
43+
<artifactId>a2a-java-queue-manager-replication-mp-reactive</artifactId>
44+
<version>${a2a.version}</version>
45+
</dependency>
46+
```
47+
48+
### 2. Basic Configuration
49+
50+
Add to your `application.properties`:
51+
52+
```properties
53+
# Configure the outgoing channel (QueueManager -> Kafka)
54+
mp.messaging.outgoing.replicated-events-out.connector=smallrye-kafka
55+
mp.messaging.outgoing.replicated-events-out.topic=replicated-events
56+
mp.messaging.outgoing.replicated-events-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
57+
58+
# Configure the incoming channel (Kafka -> QueueManager)
59+
mp.messaging.incoming.replicated-events-in.connector=smallrye-kafka
60+
mp.messaging.incoming.replicated-events-in.topic=replicated-events
61+
mp.messaging.incoming.replicated-events-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
62+
```
63+
64+
The channel names `replicated-events-in` and `replicated-events-out` correspond to the `@Incoming` and `@Channel` annotations in the ReactiveMessagingReplicationStrategy.
65+
66+
### 3. Kafka Topic Setup
67+
68+
Ensure your Kafka broker has the topic configured:
69+
70+
```bash
71+
# Create the replicated-events topic
72+
kafka-topics.sh --create --topic replicated-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
73+
```
74+
75+
## Configuration
76+
77+
### Kafka Configuration
78+
79+
#### Basic Settings
80+
81+
```properties
82+
# Kafka broker configuration
83+
kafka.bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092
84+
85+
# Topic configuration
86+
mp.messaging.outgoing.replicated-events-out.topic=my-replicated-events
87+
mp.messaging.incoming.replicated-events-in.topic=my-replicated-events
88+
89+
# Consumer behavior
90+
mp.messaging.incoming.replicated-events-in.auto.offset.reset=earliest
91+
```
92+
93+
#### Advanced Settings
94+
95+
```properties
96+
# Consumer group configuration (important for multiple A2A instances)
97+
mp.messaging.incoming.replicated-events-in.group.id=a2a-instance-group
98+
99+
# Reliability configuration
100+
mp.messaging.outgoing.replicated-events-out.acks=all
101+
mp.messaging.outgoing.replicated-events-out.retries=3
102+
103+
# Performance tuning
104+
mp.messaging.outgoing.replicated-events-out.batch.size=16384
105+
mp.messaging.incoming.replicated-events-in.max.poll.records=500
106+
107+
# Serialization configuration
108+
mp.messaging.outgoing.replicated-events-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
109+
mp.messaging.incoming.replicated-events-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
110+
```
111+
112+
### Alternative Message Brokers
113+
114+
While Kafka is the primary tested message broker, Quarkus Reactive Messaging supports other brokers:
115+
116+
#### Apache Pulsar
117+
118+
```properties
119+
mp.messaging.outgoing.replicated-events-out.connector=smallrye-pulsar
120+
mp.messaging.incoming.replicated-events-in.connector=smallrye-pulsar
121+
pulsar.client.serviceUrl=pulsar://localhost:6650
122+
```
123+
124+
#### AMQP (RabbitMQ, etc.)
125+
126+
```properties
127+
mp.messaging.outgoing.replicated-events-out.connector=smallrye-amqp
128+
mp.messaging.incoming.replicated-events-in.connector=smallrye-amqp
129+
amqp-host=localhost
130+
amqp-port=5672
131+
```
132+
133+
**Note**: Alternative message brokers have not been tested in this project yet.
134+
135+
### WildFly/Jakarta EE Servers
136+
137+
For non-Quarkus environments, you'll need to configure MicroProfile Reactive Messaging according to your application server's documentation. The exact configuration will depend on your server's messaging capabilities, but generally, you will need to make sure the same properties as above are made available to the server application.
138+
139+
## How It Works
140+
141+
### Event Flow
142+
143+
1. **Event Generation**: When an event is generated in the A2A system (e.g., TaskStatusUpdateEvent), it's enqueued in the local queue
144+
2. **Replication Hook**: The `ReplicationHook` intercepts the event and sends it to the replication strategy
145+
3. **Message Broker**: The replication strategy serializes the event and sends it to the configured message broker
146+
4. **Event Reception**: Other A2A instances receive the event from the message broker
147+
5. **Local Processing**: The received event is deserialized and enqueued in the local instance's queue
148+
6. **Event Processing**: The local instance processes the replicated event, updating its state accordingly
149+
150+
### Event Types
151+
152+
The system replicates various event types while preserving their specific types:
153+
154+
- **TaskStatusUpdateEvent**: Task state changes (SUBMITTED, COMPLETED, etc.)
155+
- **TaskArtifactUpdateEvent**: Task artifact changes
156+
- **Message**: Chat messages and responses
157+
- **Task**: Complete task objects
158+
- **JSONRPCError**: Error events
159+
160+
### Serialization
161+
162+
Events are serialized using Jackson with polymorphic type information to ensure proper deserialization:
163+
164+
```json
165+
{
166+
"taskId": "task-123",
167+
"event": {
168+
"@type": "TaskStatusUpdateEvent",
169+
"taskId": "task-123",
170+
"status": {
171+
"state": "completed",
172+
"timestamp": "2023-09-29T10:30:00Z"
173+
},
174+
"final": true,
175+
"kind": "status-update"
176+
}
177+
}
178+
```
179+
180+
## Advanced Topics
181+
182+
### Custom Replication Strategies
183+
184+
The architecture is designed to support additional replication strategies in the future. To implement a custom replication strategy, create a class that implements the `ReplicationStrategy` interface and ensure it's discoverable via CDI:
185+
186+
```java
187+
@ApplicationScoped
188+
public class CustomReplicationStrategy implements ReplicationStrategy {
189+
190+
@Override
191+
public void send(String taskId, Event event) {
192+
// Implement custom replication logic
193+
// e.g., send to database, REST API, etc.
194+
}
195+
}
196+
```
197+
198+
### Monitoring Events
199+
200+
You can monitor replicated events by observing CDI events:
201+
202+
```java
203+
@ApplicationScoped
204+
public class ReplicationMonitor {
205+
206+
public void onReplicatedEvent(@Observes ReplicatedEvent event) {
207+
// Monitor replicated events for metrics, logging, etc.
208+
LOGGER.info("Received replicated event for task: " + event.getTaskId());
209+
}
210+
}
211+
```
212+
213+
### Logging
214+
215+
Enable debug logging to monitor replication activity:
216+
217+
```properties
218+
# For Quarkus
219+
quarkus.log.category."io.a2a.extras.queuemanager.replicated".level=DEBUG
220+
221+
# For other servers, configure your logging framework accordingly
222+
```
223+
224+
### Health Checks
225+
226+
When using Quarkus, the module integrates with MicroProfile Health to provide health checks:
227+
228+
```properties
229+
# Configure health check timeout
230+
quarkus.messaging.kafka.health.timeout=5s
231+
```
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>io.github.a2asdk</groupId>
9+
<artifactId>a2a-java-queue-manager-replicated-parent</artifactId>
10+
<version>0.3.0.Beta2-SNAPSHOT</version>
11+
<relativePath>../pom.xml</relativePath>
12+
</parent>
13+
14+
<artifactId>a2a-java-queue-manager-replicated-core</artifactId>
15+
<name>Java A2A Extras: Replicated Queue Manager Core</name>
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>io.github.a2asdk</groupId>
20+
<artifactId>a2a-java-sdk-server-common</artifactId>
21+
<version>${project.version}</version>
22+
</dependency>
23+
<dependency>
24+
<groupId>io.quarkus</groupId>
25+
<artifactId>quarkus-core</artifactId>
26+
</dependency>
27+
<dependency>
28+
<groupId>jakarta.enterprise</groupId>
29+
<artifactId>jakarta.enterprise.cdi-api</artifactId>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.junit.jupiter</groupId>
33+
<artifactId>junit-jupiter-api</artifactId>
34+
<scope>test</scope>
35+
</dependency>
36+
<dependency>
37+
<groupId>ch.qos.logback</groupId>
38+
<artifactId>logback-classic</artifactId>
39+
<scope>test</scope>
40+
</dependency>
41+
</dependencies>
42+
43+
</project>

0 commit comments

Comments
 (0)