Skip to content

Commit 97a28b8

Browse files
committed
fix: Handle race condition when client disconnects before streaming response completes
When SSE streaming clients disconnect before task completion, background consumption continues and eventually calls subscriber.onComplete(). By this time, the HTTP response has already been written and closed by Vert.x, causing IllegalStateException: "Response has already been written". This is a benign race condition that occurs when: 1. Client disconnects → HTTP response written/closed 2. Background consumption completes → tries to call subscriber.onComplete() 3. Response already closed → IllegalStateException thrown 4. Error logged by Mutiny: "Mutiny had to drop the following exception" The fix catches the specific exception message and logs it at debug level, allowing graceful handling of the expected client disconnect scenario. Fixes SSE streaming test timeouts on REST transport in CI. Evidence: Run_4 logs showing ForkJoinPool.commonPool-worker threads throwing IllegalStateException during onComplete for streaming responses.
1 parent a3f2721 commit 97a28b8

File tree

4 files changed

+622
-1
lines changed

4 files changed

+622
-1
lines changed
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
# Event Queue Architecture
2+
3+
## Overview
4+
The A2A Java SDK uses a producer-consumer pattern for handling asynchronous agent execution and event streaming.
5+
6+
## Core Components
7+
8+
### EventQueue Hierarchy
9+
- **MainQueue**: Primary queue for a task, manages child queues
10+
- **ChildQueue**: "Tapped" view that receives copies of parent events
11+
- **Reference Counting**: MainQueue tracks children, closes when all children close (or immediate flag set)
12+
13+
### Queue Lifecycle Management
14+
15+
**Creation:**
16+
```java
17+
EventQueue queue = queueManager.createOrTap(taskId);
18+
// Creates MainQueue if new, or ChildQueue if tapping existing
19+
```
20+
21+
**Closing:**
22+
```java
23+
queue.close(immediate, notifyParent);
24+
// immediate: clear pending events vs graceful drain
25+
// notifyParent: decrement MainQueue ref count (ChildQueue only)
26+
```
27+
28+
### Thread Model
29+
30+
**Agent-Executor Threads:**
31+
- Pool of 15 threads (configurable via `a2a.executor.max-pool-size`)
32+
- Execute `AgentExecutor.execute()` which enqueues events
33+
- Return immediately after execution (as of fix 3e93dd2)
34+
- No longer block waiting for consumer to start
35+
36+
**Vert.x Worker Threads:**
37+
- Handle HTTP/gRPC/REST requests
38+
- Create and run EventConsumer to poll queue
39+
- Process events via ResultAggregator
40+
- Close queue on final events/errors
41+
42+
**Background Threads:**
43+
- Cleanup tasks tracked in `backgroundTasks` set
44+
- Manage ChildQueue/MainQueue lifecycle after agent completion
45+
- Handle reference counting for resubscription support
46+
47+
## Event Flow
48+
49+
### Non-Streaming (onMessageSend)
50+
```
51+
1. Vert.x worker: Create EventQueue, EventConsumer
52+
2. Agent-executor: Execute agent async → enqueue events → return immediately
53+
3. Vert.x worker: Poll queue in consumeAndBreakOnInterrupt()
54+
- Blocks until terminal event or AUTH_REQUIRED
55+
- Consumer closes queue on final event
56+
4. Background: Wait for agent → cleanup ChildQueue → manage MainQueue refs
57+
```
58+
59+
### Streaming (onMessageSendStream)
60+
```
61+
1. Vert.x worker: Create EventQueue, EventConsumer
62+
2. Agent-executor: Execute agent async → enqueue events → return immediately
63+
3. Vert.x worker: Return Flow.Publisher immediately
64+
- Consumer polls queue in background
65+
- Streams events to client
66+
- On client disconnect: continue consuming in background
67+
- Consumer closes queue on final event
68+
4. Background: Always async cleanup for streaming
69+
```
70+
71+
## Queue Closing Semantics
72+
73+
### Who Closes What
74+
75+
**EventConsumer.consumeAll()** - Primary queue closer:
76+
- Closes queue when final event received (COMPLETED, CANCELED, FAILED, REJECTED, UNKNOWN)
77+
- Closes queue on Message event (legacy)
78+
- Closes queue on EventQueueClosedException
79+
80+
**Background cleanupProducer()** - Secondary cleanup:
81+
- Waits for agent completion
82+
- Closes ChildQueue (may already be closed by consumer)
83+
- Uses `close(immediate=false, notifyParent=!keepMainQueueAlive)` for reference counting
84+
85+
**Error Handling:**
86+
- Agent errors: Set via EnhancedRunnable.setError(), consumer picks up via callback
87+
- Consumer errors: EventConsumer fails tube, queue closes
88+
- No immediate queue close in agent whenComplete() callback
89+
90+
### Reference Counting for Resubscription
91+
92+
**Purpose:** Keep MainQueue alive for resubscription when task is non-final
93+
94+
**Mechanism:**
95+
```java
96+
boolean keepMainQueueAlive = !isStreaming && task != null && !task.isFinal();
97+
queue.close(false, !keepMainQueueAlive);
98+
```
99+
100+
**Effect:**
101+
- Non-streaming non-final tasks: Close ChildQueue without notifying MainQueue
102+
- MainQueue stays open, allowing onResubscribeToTask() to tap it
103+
- Streaming or final tasks: Close ChildQueue and notify MainQueue (decrements ref count)
104+
105+
## Signal Mechanism (Deprecated)
106+
107+
### awaitQueuePollerStart() - NO LONGER USED as of 3e93dd2
108+
109+
**Original Purpose:**
110+
Agent-executor threads waited for consumer to start polling before returning, ensuring queue wouldn't close prematurely.
111+
112+
**Problem:**
113+
Created cascading delays when Vert.x workers were busy. Agent threads blocked up to 10 seconds waiting for worker threads to become available.
114+
115+
**Replacement:**
116+
Queue lifecycle entirely managed by consumer. Consumer is guaranteed to be running when it closes queue, eliminating race condition.
117+
118+
**Implementation Details:**
119+
- CountDownLatch in MainQueue, countdown in dequeueEvent() finally block
120+
- Still exists in code but no longer called
121+
- Can be removed in future cleanup
122+
123+
## Configuration
124+
125+
### Thread Pool Settings
126+
```properties
127+
# tck/src/main/resources/application.properties
128+
a2a.executor.core-pool-size=5
129+
a2a.executor.max-pool-size=15
130+
a2a.executor.keep-alive-seconds=60
131+
```
132+
133+
### Vert.x Settings
134+
```properties
135+
# Worker thread blocking timeout
136+
quarkus.vertx.max-worker-execute-time=300s # Temporary workaround, should be removed
137+
```
138+
139+
### Queue Settings
140+
```java
141+
EventQueue.DEFAULT_QUEUE_SIZE = 1000
142+
EventConsumer.QUEUE_WAIT_MILLISECONDS = 500 # Polling timeout
143+
```
144+
145+
## Best Practices
146+
147+
### Creating Queues
148+
- Use `queueManager.createOrTap()` for both create and resubscribe scenarios
149+
- MainQueue created with onClose callback for QueueManager cleanup
150+
- ChildQueues inherit MainQueue events via internalEnqueueEvent()
151+
152+
### Closing Queues
153+
- Let EventConsumer.consumeAll() handle queue closing (it knows when to close)
154+
- Background cleanup manages reference counting
155+
- Don't close queue in agent execution code
156+
157+
### Resubscription Support
158+
- Non-streaming non-final tasks keep MainQueue alive
159+
- Use `queue.close(false, false)` to close ChildQueue without notifying parent
160+
- MainQueue closes when last child notifies or immediate close requested
161+
162+
### Error Handling
163+
- Set errors via EnhancedRunnable.setError() for consumer pickup
164+
- Consumer handles errors via createAgentRunnableDoneCallback()
165+
- Background cleanup is safe to call even if queue already closed
166+
167+
## Common Patterns
168+
169+
### Standard Request Handler Pattern
170+
```java
171+
EventQueue queue = queueManager.createOrTap(taskId);
172+
EnhancedRunnable producer = registerAndExecuteAgentAsync(taskId, context, queue);
173+
try {
174+
EventConsumer consumer = new EventConsumer(queue);
175+
producer.addDoneCallback(consumer.createAgentRunnableDoneCallback());
176+
// Consume events (blocking or streaming)
177+
} finally {
178+
runningAgents.remove(taskId);
179+
trackBackgroundTask(cleanupProducer(agentFuture, taskId, queue, isStreaming));
180+
}
181+
```
182+
183+
### Cancel Pattern
184+
```java
185+
EventQueue queue = queueManager.tap(taskId); // Tap existing or create new
186+
agentExecutor.cancel(context, queue);
187+
Optional.ofNullable(runningAgents.get(taskId)).ifPresent(cf -> cf.cancel(true));
188+
EventConsumer consumer = new EventConsumer(queue);
189+
EventKind result = resultAggregator.consumeAll(consumer);
190+
```
191+
192+
### Resubscribe Pattern
193+
```java
194+
EventQueue queue = queueManager.tap(taskId);
195+
if (queue == null && !task.isFinal()) {
196+
queue = queueManager.createOrTap(taskId); // Create new for future events
197+
}
198+
EventConsumer consumer = new EventConsumer(queue);
199+
return resultAggregator.consumeAndEmit(consumer);
200+
```
201+
202+
## Troubleshooting
203+
204+
### Queue Not Closing
205+
- Check if consumer reached final event (DEBUG logs show "Dequeued event")
206+
- Verify task state is actually final (isFinal() returns true)
207+
- Check background cleanup completed (CLEANUP END log message)
208+
209+
### Events Missing
210+
- Verify queue not closed before consumer started polling
211+
- Check for EventQueueClosedException in logs
212+
- Ensure agent enqueued events before returning
213+
214+
### Resource Leaks
215+
- Monitor runningAgents.size() (should return to 0)
216+
- Check backgroundTasks.size() (tasks should complete)
217+
- Verify QueueManager active queue count
218+
- Use thread dumps to identify blocked threads
219+
220+
### Performance Issues
221+
- Check for blocking waits in agent-executor threads (should be none)
222+
- Monitor Vert.x worker thread availability
223+
- Review queue wait timeouts and polling frequency
224+
- Consider increasing thread pool size if CPU allows
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# TCK Timeout Investigation & Resolution
2+
3+
## Problem Summary
4+
TCK tests were timing out in CI environment after 5 minutes, with 11 leaked agents that never completed cleanup (183 agents registered, only 172 removed from runningAgents map).
5+
6+
## Investigation Timeline
7+
8+
### Initial Hypothesis (Incorrect)
9+
- Suspected finally blocks not executing due to Vert.x thread abandonment
10+
- Evidence: "Thread blocked for 60770ms, time limit is 60000ms" in logs
11+
- Applied fix: Increased Vert.x timeout from 60s to 300s
12+
- Result: Leak pattern persisted (183 registered, 172 removed, 11 leaked)
13+
14+
### Root Cause Discovery
15+
Analyzed CI logs and discovered the actual bottleneck:
16+
17+
**Pattern from logs:**
18+
```
19+
16:17:43 - Agent waits for queue poller to start
20+
16:17:53 - Queue poller started (10 seconds later)
21+
16:18:13 - Next agent waits for queue poller to start
22+
16:18:23 - Queue poller started (10 seconds later)
23+
```
24+
25+
**Root Cause:**
26+
- Agent-executor threads call `awaitQueuePollerStart()` which blocks up to 10 seconds
27+
- When Vert.x worker threads are all busy, consumers can't start polling immediately
28+
- Agent-executor threads (limited pool of 15) block waiting for worker threads
29+
- Cascading delays: 11 agents × 10 seconds = 110+ seconds of accumulated waste
30+
- Test timeout at 5 minutes kills process before cleanup finally blocks execute
31+
32+
**Architecture Flaw:**
33+
Agent-executor threads were synchronously waiting for Vert.x worker threads to become available, creating a resource bottleneck where valuable executor threads sat idle.
34+
35+
## Solution Implemented
36+
37+
### Commit: 3e93dd2
38+
**Title:** fix: Remove awaitQueuePollerStart to eliminate thread blocking bottleneck
39+
40+
### Key Changes
41+
42+
**1. Removed Blocking Wait**
43+
File: `server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java`
44+
45+
Before:
46+
```java
47+
agentExecutor.execute(requestContext, queue);
48+
try {
49+
queueManager.awaitQueuePollerStart(queue);
50+
} catch (InterruptedException e) {
51+
Thread.currentThread().interrupt();
52+
}
53+
```
54+
55+
After:
56+
```java
57+
agentExecutor.execute(requestContext, queue);
58+
// No longer wait for queue poller to start - the consumer (which is guaranteed
59+
// to be running on the Vert.x worker thread) will handle queue lifecycle.
60+
// This avoids blocking agent-executor threads waiting for worker threads.
61+
```
62+
63+
**2. Moved Queue Lifecycle to Consumer**
64+
65+
Before: `whenComplete()` callback closed queue immediately on errors and final states
66+
67+
After: Queue lifecycle entirely managed by `EventConsumer.consumeAll()`:
68+
- Consumer closes queue on final events (line 95 of EventConsumer.java)
69+
- Consumer handles errors via error callback mechanism
70+
- No race condition: consumer is running before it can close queue
71+
72+
**3. Architecture Documentation**
73+
Added comprehensive JavaDoc explaining new queue lifecycle:
74+
- Agent-executor thread: Executes and returns immediately
75+
- Vert.x worker (consumer): Polls queue, closes on final event
76+
- Background cleanup: Manages ChildQueue/MainQueue reference counting
77+
78+
### Benefits
79+
1. **Eliminates blocking:** Agent-executor threads return immediately
80+
2. **No cascading delays:** No more 10-second waits accumulating
81+
3. **Better resource utilization:** Threads don't block idle
82+
4. **Simpler architecture:** Consumer owns queue lifecycle (single responsibility)
83+
5. **No race conditions:** Consumer guaranteed running before closing queue
84+
85+
### Temporary Workaround Status
86+
File: `tck/src/main/resources/application.properties`
87+
88+
```properties
89+
# TEMPORARY WORKAROUND: Increase Vert.x worker thread blocking timeout
90+
# Default is 60s, increased to 300s (5 minutes) to match TCK timeout
91+
# TODO: Remove this workaround once proper fix is validated:
92+
# - Make EventQueue polling non-blocking with async/reactive patterns
93+
# - Or add timeout to consumeAndBreakOnInterrupt() to return early
94+
# See: Investigation of TCK timeout issues in CI (21 agents leaked due to thread abandonment)
95+
quarkus.vertx.max-worker-execute-time=300s
96+
```
97+
98+
This workaround prevented Vert.x from abandoning threads, but didn't fix the root cause (blocking wait).
99+
Should be removed once CI validates the awaitQueuePollerStart fix.
100+
101+
## Expected CI Results
102+
After this fix, CI should show:
103+
- ✅ All 183 agents register and all 183 clean up (no leaks)
104+
- ✅ Tests complete in 2-3 minutes (not 5+ minutes)
105+
- ✅ No "Waiting for queue poller to start" log entries with long delays
106+
- ✅ All finally blocks execute successfully
107+
- ✅ runningAgents map size returns to 0 at test completion
108+
109+
## Technical Details
110+
111+
### EventQueue Lifecycle Before Fix
112+
1. Agent-executor: Execute agent → enqueue events → **BLOCK** waiting for consumer to start
113+
2. Vert.x worker: Start consumer → first dequeue signals latch → agent-executor unblocks
114+
3. Agent-executor whenComplete: Close queue on final state
115+
4. Consumer: May still be polling when queue closes (race condition prevented by latch)
116+
117+
### EventQueue Lifecycle After Fix
118+
1. Agent-executor: Execute agent → enqueue events → **RETURN IMMEDIATELY**
119+
2. Vert.x worker: Start consumer → poll queue → process events → close on final event
120+
3. Agent-executor whenComplete: Set error flag, invoke callbacks (no queue operations)
121+
4. Background cleanup: Wait for agent complete → close ChildQueue → manage MainQueue refs
122+
123+
### Key Architectural Insight
124+
The consumer (EventConsumer.consumeAll()) is guaranteed to be running when it needs to close the queue, eliminating the need for synchronization between agent-executor threads and Vert.x worker threads.
125+
126+
## Related Files
127+
- `server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java` - Main fix
128+
- `server-common/src/main/java/io/a2a/server/events/EventConsumer.java` - Queue lifecycle owner
129+
- `server-common/src/main/java/io/a2a/server/events/EventQueue.java` - Unused awaitQueuePollerStart() method
130+
- `tck/src/main/resources/application.properties` - Temporary workaround (to be removed)
131+
- `.github/workflows/run-tck.yml` - Thread/heap dump capture for diagnostics
132+
133+
## Session Date
134+
2025-10-13
135+
136+
## Branch
137+
event-queues-take3

0 commit comments

Comments
 (0)