
Commit c9fe654

feat: Implement robust replication and task-aware queue lifecycle (#351)
This commit introduces a comprehensive overhaul of the event replication and queue lifecycle management system. It ensures data consistency for all request types, enhances robustness in distributed environments, and adds support for new operational patterns.

Previously, the system suffered from several issues: blocking requests could lead to data loss, the replication mechanism was susceptible to race conditions, and the queue lifecycle was not fully aligned with the task's state, preventing patterns like fire-and-forget.

This change implements three major architectural improvements (a sketch of the new `TaskStateProvider` contract follows this list):

1. **Fix Data Loss in Blocking Requests**
   * Corrects a critical bug where blocking `onMessageSend` calls would stop processing events after returning the first one.
   * The `ResultAggregator` now ensures all subsequent events from the agent are consumed in the background, guaranteeing the final task state is correctly persisted in the `TaskStore`.

2. **Authoritative State & Transaction-Aware Replication**
   * Introduces a `TaskStateProvider` interface to decouple queue management from the persistence layer, allowing the `ReplicatedQueueManager` to check the authoritative state of a task from a shared database.
   * Replaces time-based cleanup delays with a robust, transaction-aware "poison pill" (`QueueClosedEvent`). This event is now broadcast via a CDI observer only after the final task state is successfully committed to the database, eliminating race conditions in distributed cleanup.

3. **Task-Aware Queue Lifecycle & `ThreadLocal` Removal**
   * Refactors the `EventQueue` to use a polymorphic `EventQueueItem`, which cleanly distinguishes between local and replicated events. This allows for the complete removal of the `isHandlingReplicatedEvent` `ThreadLocal`, simplifying the concurrency model.
   * The `MainQueue` lifecycle is now strictly tied to the task's finalization state. As a core guarantee, a `MainQueue` will remain open for a task as long as the task is not in a final state. This holds true even if all consumers disconnect, enabling reliable fire-and-forget operations and late resubscriptions, while queues for finalized tasks are correctly garbage collected.
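For orientation, the `TaskStateProvider` seam introduced in point 2 reduces to a single authoritative-state lookup. The following is a minimal sketch of what that contract could look like, inferred from the `taskStateProvider.isTaskActive(taskId)` usage documented in the replicated queue manager README; it is an illustration, not the committed source.

```java
// Hedged sketch of the TaskStateProvider contract. The method name mirrors
// the taskStateProvider.isTaskActive(taskId) call described in the README;
// the javadoc wording is an interpretation, not the actual file contents.
public interface TaskStateProvider {

    /**
     * Returns true while the authoritative store considers the task active:
     * either the task is not yet in a final state, or it finalized so
     * recently that late replicated events are still within the grace period.
     */
    boolean isTaskActive(String taskId);
}
```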
1 parent 8e83576 commit c9fe654

48 files changed (+2637, -488 lines)


.gitignore

Lines changed: 1 addition & 0 deletions
```diff
@@ -47,3 +47,4 @@ nbproject/

 # Private Claude config
 .claude/
+.serena/
```

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -140,7 +140,8 @@ public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEv
                 msg -> sseEventListener.onMessage(msg, ref.get()),
                 throwable -> sseEventListener.onError(throwable, ref.get()),
                 () -> {
-                    // We don't need to do anything special on completion
+                    // Signal normal stream completion to error handler (null error means success)
+                    sseEventListener.onComplete();
                 }));
         } catch (IOException e) {
             throw new A2AClientException("Failed to send streaming message request: " + e, e);
@@ -318,7 +319,8 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
                 msg -> sseEventListener.onMessage(msg, ref.get()),
                 throwable -> sseEventListener.onError(throwable, ref.get()),
                 () -> {
-                    // We don't need to do anything special on completion
+                    // Signal normal stream completion to error handler (null error means success)
+                    sseEventListener.onComplete();
                 }));
         } catch (IOException e) {
             throw new A2AClientException("Failed to send task resubscription request: " + e, e);
```

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java

Lines changed: 19 additions & 0 deletions
```diff
@@ -16,6 +16,7 @@ public class SSEEventListener {
     private static final Logger log = Logger.getLogger(SSEEventListener.class.getName());
     private final Consumer<StreamingEventKind> eventHandler;
     private final Consumer<Throwable> errorHandler;
+    private volatile boolean completed = false;

     public SSEEventListener(Consumer<StreamingEventKind> eventHandler,
                             Consumer<Throwable> errorHandler) {
@@ -38,6 +39,24 @@ public void onError(Throwable throwable, Future<Void> future) {
         future.cancel(true); // close SSE channel
     }

+    public void onComplete() {
+        // Idempotent: only signal completion once, even if called multiple times
+        if (completed) {
+            log.fine("SSEEventListener.onComplete() called again - ignoring (already completed)");
+            return;
+        }
+        completed = true;
+
+        // Signal normal stream completion (null error means successful completion)
+        log.fine("SSEEventListener.onComplete() called - signaling successful stream completion");
+        if (errorHandler != null) {
+            log.fine("Calling errorHandler.accept(null) to signal successful completion");
+            errorHandler.accept(null);
+        } else {
+            log.warning("errorHandler is null, cannot signal completion");
+        }
+    }
+
     private void handleMessage(JsonNode jsonNode, Future<Void> future) {
         try {
             if (jsonNode.has("error")) {
```
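Given the null-means-success convention above, a client-supplied error handler is expected to treat a `null` throwable as normal completion. A hypothetical handler (not part of this commit) would branch roughly like this:

```java
import java.util.function.Consumer;

// Hypothetical client-side handler, shown only to illustrate the
// convention introduced above: onComplete() invokes the error handler
// with null, so null must be read as "stream finished normally".
public class CompletionAwareErrorHandler {
    static final Consumer<Throwable> ERROR_HANDLER = throwable -> {
        if (throwable == null) {
            System.out.println("SSE stream completed normally");
        } else {
            System.err.println("SSE stream failed: " + throwable);
        }
    };
}
```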

extras/common/pom.xml

Lines changed: 21 additions & 0 deletions
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>io.github.a2asdk</groupId>
        <artifactId>a2a-java-sdk-parent</artifactId>
        <version>0.3.0.Beta3-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>

    <artifactId>a2a-java-extras-common</artifactId>
    <name>A2A Java SDK :: Extras :: Common</name>
    <description>Common classes shared across extras modules</description>

    <dependencies>
        <!-- Minimal dependencies - just what's needed for event classes -->
    </dependencies>
</project>
```
extras/common/src/main/java/io/a2a/extras/common/events/TaskFinalizedEvent.java

Lines changed: 26 additions & 0 deletions

```java
package io.a2a.extras.common.events;

/**
 * CDI event fired when a task reaches a final state and is successfully persisted to the database.
 * This event is fired AFTER the database transaction commits, making it safe for downstream
 * components to assume the task is durably stored.
 *
 * <p>Used by the replicated queue manager to send poison pill events after ensuring
 * the final task state is committed to the database, eliminating race conditions.
 */
public class TaskFinalizedEvent {
    private final String taskId;

    public TaskFinalizedEvent(String taskId) {
        this.taskId = taskId;
    }

    public String getTaskId() {
        return taskId;
    }

    @Override
    public String toString() {
        return "TaskFinalizedEvent{taskId='" + taskId + "'}";
    }
}
```

extras/queue-manager-replicated/README.md

Lines changed: 180 additions & 0 deletions
## Production Considerations

### Kafka Partitioning Strategy

**Critical for scalability and correctness**: How you partition your Kafka topic significantly impacts system performance and behavior.

#### Simple Approach: Single Partition

The simplest configuration uses a single partition for the replicated events topic:

```bash
kafka-topics.sh --create --topic replicated-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```

**Advantages**:
- Guarantees global event ordering
- Simpler to reason about and debug
- Suitable for development, testing, and low-throughput production systems

**Disadvantages**:
- Limited scalability (single partition bottleneck)
- Cannot parallelize consumption across multiple consumer instances
- All events are processed sequentially

**When to use**: Development environments, integration tests, production systems with low event volumes (<1000 events/sec), or when strict global ordering is required.

#### Recommended Approach: Partition by Task ID

For production systems with higher throughput, partition events by `taskId`:

```properties
# Configure the producer to use taskId as the partition key
mp.messaging.outgoing.replicated-events-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.replicated-events-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

```bash
# Create the topic with multiple partitions
kafka-topics.sh --create --topic replicated-events --bootstrap-server localhost:9092 --partitions 10 --replication-factor 3
```

The `ReactiveMessagingReplicationStrategy` already sends the `taskId` as the Kafka message key, so Kafka will automatically partition by task ID using its default partitioner.
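For reference, sending a keyed record through SmallRye Reactive Messaging looks roughly like the sketch below. This is not the actual `ReactiveMessagingReplicationStrategy` source; the class name and `String` payload are assumptions, while the `replicated-events-out` channel matches the configuration above.

```java
import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

// Illustrative sketch: emitting a Kafka record whose key is the taskId,
// so the default partitioner keeps all events for one task in order on
// a single partition. Class name and payload type are assumptions.
@ApplicationScoped
public class KeyedReplicationSender {

    @Inject
    @Channel("replicated-events-out")
    Emitter<Record<String, String>> emitter;

    public void send(String taskId, String serializedEvent) {
        // Record.of(key, value): taskId becomes the Kafka message key
        emitter.send(Record.of(taskId, serializedEvent));
    }
}
```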

**Advantages**:
- **Horizontal scalability**: Different tasks can be processed in parallel across partitions
- **Per-task ordering guarantee**: All events for a single task go to the same partition and maintain order
- **Consumer parallelism**: Multiple consumer instances can process different partitions concurrently

**Disadvantages**:
- No global ordering across all tasks
- More complex to debug (events spread across partitions)
- Requires proper consumer group configuration

**When to use**: Production systems with medium to high throughput, systems that need to scale horizontally, distributed deployments with multiple A2A instances.

#### Consumer Group Configuration

When using multiple partitions, ensure all A2A instances belong to the same consumer group:

```properties
mp.messaging.incoming.replicated-events-in.group.id=a2a-instance-group
```

This ensures that:
- Each partition is consumed by exactly one instance
- Events for the same task always go to the same instance (partition affinity)
- The system can scale horizontally by adding more instances (up to the number of partitions)

**Rule of thumb**: Number of partitions ≥ number of A2A instances for optimal distribution.

### Transaction-Aware Queue Cleanup ("Poison Pill")

When a task reaches a final state (COMPLETED, FAILED, CANCELED), all nodes in the cluster must terminate their event consumers for that task. This is achieved through a special "poison pill" event (`QueueClosedEvent`) that is replicated to all nodes.

#### How It Works

The poison pill mechanism uses **transaction-aware CDI events** to ensure the poison pill is only sent AFTER the final task state is durably committed to the database:

1. **Task Finalization**: When `JpaDatabaseTaskStore.save()` persists a task with a final state, it fires a `TaskFinalizedEvent` CDI event
2. **Transaction Coordination**: The CDI observer is configured with `@Observes(during = TransactionPhase.AFTER_SUCCESS)`, which delays event delivery until AFTER the JPA transaction commits
3. **Poison Pill Delivery**: `ReplicatedQueueManager.onTaskFinalized()` receives the event and sends `QueueClosedEvent` via the replication strategy
4. **Cluster-Wide Termination**: All nodes receive the `QueueClosedEvent`, recognize it as final, and gracefully terminate their event consumers

**Key Architecture Decision**: We use JPA transaction lifecycle hooks instead of time-based delays for poison pill delivery because this:
- **Eliminates race conditions**: No time window where the poison pill might arrive before the database commit
- **Provides deterministic cleanup**: Queue termination happens immediately after transaction commit, without delay-based tuning
- **Keeps things simple**: No need to monitor consumer lag or configure delays for cleanup timing
- **Is reliable**: Works correctly regardless of network latency or database performance

**Note**: While the poison pill mechanism eliminates delays for cleanup, the system still uses a configurable grace period (`a2a.replication.grace-period-seconds`, default 15s) in `JpaDatabaseTaskStore.isTaskActive()` to handle late-arriving replicated events. This grace period prevents queue recreation for tasks that were recently finalized, accommodating Kafka consumer lag and network delays. See the Grace Period Configuration section below for details.

#### Code Flow

**JpaDatabaseTaskStore** (fires the CDI event):
```java
@Inject
Event<TaskFinalizedEvent> taskFinalizedEvent;

public void save(Task task) {
    // ... persist task to database ...

    // Fire CDI event if the task reached a final state
    if (task.getStatus().state().isFinal()) {
        taskFinalizedEvent.fire(new TaskFinalizedEvent(task.getId()));
    }
    // Transaction commits here (end of method)
}
```

**ReplicatedQueueManager** (observes the event and sends the poison pill):
```java
public void onTaskFinalized(@Observes(during = TransactionPhase.AFTER_SUCCESS) TaskFinalizedEvent event) {
    String taskId = event.getTaskId();
    LOGGER.debug("Task {} finalized - sending poison pill after transaction commit", taskId);

    // Send QueueClosedEvent to all nodes via replication
    QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
    replicationStrategy.send(taskId, closedEvent);
}
```

#### Configuration

No configuration is required for the poison pill mechanism; it works automatically when:
1. Using `JpaDatabaseTaskStore` for task persistence
2. Using `ReplicatedQueueManager` for event replication
3. Both modules are present in your application

#### Monitoring

Enable debug logging to monitor poison pill delivery:

```properties
quarkus.log.category."io.a2a.extras.queuemanager.replicated".level=DEBUG
quarkus.log.category."io.a2a.extras.taskstore.database.jpa".level=DEBUG
```

You should see log entries like:
```
Task abc-123 is in final state, firing TaskFinalizedEvent
Task abc-123 finalized - sending poison pill (QueueClosedEvent) after transaction commit
```

#### Grace Period Configuration

While the poison pill mechanism provides deterministic cleanup timing, the system uses a configurable **grace period** to handle late-arriving replicated events. This is separate from the poison pill mechanism and serves a different purpose.

**Purpose**: The grace period prevents queue recreation for tasks that were recently finalized. When a replicated event arrives after a task is finalized, the system checks whether the task is still within the grace period before creating a new queue.

**Configuration**:
```properties
# Grace period for handling late-arriving events (default: 15 seconds)
a2a.replication.grace-period-seconds=15
```

**How It Works** (see the sketch after this list):
1. When a task is finalized, `JpaDatabaseTaskStore` records the `finalizedAt` timestamp
2. When a replicated event arrives, `ReplicatedQueueManager.onReplicatedEvent()` calls `taskStateProvider.isTaskActive(taskId)`
3. `JpaDatabaseTaskStore.isTaskActive()` returns `true` if:
   - the task is not in a final state, OR
   - the task is final but within the grace period (`now < finalizedAt + gracePeriodSeconds`)
4. If `isTaskActive()` returns `false`, the replicated event is skipped (no queue is created)
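Put together, the grace-period check amounts to a timestamp comparison. Below is a minimal sketch of that decision, assuming a hypothetical `TaskSnapshot` lookup; the real `JpaDatabaseTaskStore` queries the shared database instead.

```java
import java.time.Instant;

// Minimal sketch of the isTaskActive() decision described above.
// TaskSnapshot and findTask(...) are hypothetical stand-ins for the
// real JPA entity and query in JpaDatabaseTaskStore.
public class GracePeriodCheckSketch {

    // Mirrors a2a.replication.grace-period-seconds (default 15)
    private final long gracePeriodSeconds = 15;

    public boolean isTaskActive(String taskId) {
        TaskSnapshot task = findTask(taskId); // hypothetical DB lookup
        if (task == null) {
            return false; // unknown task: skip the event, create no queue
        }
        if (!task.finalState()) {
            return true; // not final yet: the queue must stay open
        }
        // Final task: stay "active" only within the grace period so that
        // late-arriving replicated events do not recreate the queue.
        Instant cutoff = task.finalizedAt().plusSeconds(gracePeriodSeconds);
        return Instant.now().isBefore(cutoff);
    }

    // Hypothetical snapshot of the persisted task state.
    record TaskSnapshot(boolean finalState, Instant finalizedAt) {}

    private TaskSnapshot findTask(String taskId) {
        // Placeholder: a real implementation queries the shared database.
        return null;
    }
}
```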

**When to Adjust**:
- **Increase** the grace period if you observe warnings about skipped events for inactive tasks in high-latency networks
- **Decrease** the grace period to reduce memory usage in systems with very low latency and high task turnover
- The **default (15s)** is suitable for most deployments with typical Kafka consumer lag

**Monitoring**:
```properties
quarkus.log.category."io.a2a.extras.queuemanager.replicated".level=DEBUG
```

Watch for:
```
Skipping replicated event for inactive task abc-123   # Event arrived too late
```

**Important**: This grace period is for **late event handling**, not cleanup timing. The poison pill mechanism handles cleanup deterministically without delays.

## Advanced Topics

### Custom Replication Strategies

extras/queue-manager-replicated/core/pom.xml

Lines changed: 5 additions & 0 deletions
```diff
@@ -20,6 +20,11 @@
             <artifactId>a2a-java-sdk-server-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.github.a2asdk</groupId>
+            <artifactId>a2a-java-extras-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-core</artifactId>
```
