Skip to content

Commit e25328b

Browse files
authored
fix: Implement reference counting for EventQueue to prevent premature MainQueue closure (#333)
Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest and prepare for improvements to the replicated QueueManager. Additional fixes resolve TCK timeout issues discovered during testing. ## Changes **EventQueue Reference Counting:** - MainQueue tracks active ChildQueues with reference counting - Prevents premature closure when ChildQueues are still consuming events - Fixes KafkaReplicationIntegrationTest failures - Prepares infrastructure for replicated QueueManager improvements **Fix ForkJoinPool Saturation (TCK fixes):** - Inject @internal Executor (15 threads) into all transport handlers - Changed CompletableFuture.runAsync() to use injected executor instead of ForkJoinPool.commonPool() - Prevents streaming subscription timeouts under concurrent load on CI (3 threads) - Affects: RestHandler, GrpcHandler, JSONRPCHandler **Improved Queue Lifecycle:** - Removed awaitQueuePollerStart() to eliminate thread blocking bottleneck - EventConsumer now manages queue closing on terminal events - Background cleanup to avoid blocking request threads - Fixed race condition when clients disconnect during streaming - Better terminal event detection in ResultAggregator **Tests & Documentation:** - Updated all transport handler tests to pass executor parameter - Added executor configuration section to README.md Fixes #248 (it became necessary as part of the work)
1 parent 75b4150 commit e25328b

File tree

34 files changed

+1383
-254
lines changed

34 files changed

+1383
-254
lines changed

.github/workflows/run-tck.yml

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,88 @@ jobs:
9797
done
9898
9999
- name: Run TCK
100+
id: run-tck
101+
timeout-minutes: 5
100102
run: |
101103
./run_tck.py --sut-url ${{ env.SUT_JSONRPC_URL }} --category all --transports jsonrpc,grpc,rest --compliance-report report.json
102104
working-directory: tck/a2a-tck
105+
- name: Capture Thread Dump
106+
if: failure()
107+
run: |
108+
# Find the actual Quarkus JVM (child of Maven process), not the Maven parent
109+
# Look for the dev.jar process which is the actual application
110+
QUARKUS_PID=$(pgrep -f "a2a-tck-server-dev.jar" || echo "")
111+
if [ -n "$QUARKUS_PID" ]; then
112+
echo "📊 Capturing thread dump for Quarkus JVM PID $QUARKUS_PID"
113+
jstack $QUARKUS_PID > tck/target/thread-dump.txt || echo "Failed to capture thread dump"
114+
if [ -f tck/target/thread-dump.txt ]; then
115+
echo "✅ Thread dump captured ($(wc -l < tck/target/thread-dump.txt) lines)"
116+
fi
117+
else
118+
echo "⚠️ No Quarkus JVM process found for thread dump"
119+
echo "Available Java processes:"
120+
ps aux | grep java || true
121+
fi
122+
- name: Capture Heap Dump
123+
if: failure()
124+
run: |
125+
# Find the actual Quarkus JVM (child of Maven process), not the Maven parent
126+
QUARKUS_PID=$(pgrep -f "a2a-tck-server-dev.jar" || echo "")
127+
if [ -n "$QUARKUS_PID" ]; then
128+
echo "📊 Capturing heap dump for Quarkus JVM PID $QUARKUS_PID"
129+
jmap -dump:live,format=b,file=tck/target/heap-dump.hprof $QUARKUS_PID || echo "Failed to capture heap dump"
130+
if [ -f tck/target/heap-dump.hprof ]; then
131+
SIZE=$(du -h tck/target/heap-dump.hprof | cut -f1)
132+
echo "✅ Heap dump captured ($SIZE)"
133+
# Compress to reduce artifact size
134+
gzip tck/target/heap-dump.hprof
135+
COMPRESSED_SIZE=$(du -h tck/target/heap-dump.hprof.gz | cut -f1)
136+
echo "✅ Compressed heap dump ($COMPRESSED_SIZE)"
137+
fi
138+
else
139+
echo "⚠️ No Quarkus JVM process found for heap dump"
140+
echo "Available Java processes:"
141+
ps aux | grep java || true
142+
fi
143+
- name: Stop Quarkus Server
144+
if: always()
145+
run: |
146+
# Find and kill the Quarkus process to ensure logs are flushed
147+
pkill -f "quarkus:dev" || true
148+
sleep 2
149+
- name: Verify TCK Log
150+
if: failure()
151+
run: |
152+
echo "Checking for log file..."
153+
if [ -f tck/target/tck-test.log ]; then
154+
echo "✅ Log file exists ($(wc -l < tck/target/tck-test.log) lines)"
155+
ls -lh tck/target/tck-test.log
156+
else
157+
echo "❌ Log file not found at tck/target/tck-test.log"
158+
echo "Contents of tck/target/:"
159+
ls -la tck/target/ || echo "tck/target/ does not exist"
160+
fi
161+
- name: Upload TCK Log
162+
if: failure()
163+
uses: actions/upload-artifact@v4
164+
with:
165+
name: tck-test-log-java-${{ matrix.java-version }}
166+
path: tck/target/tck-test.log
167+
retention-days: 2
168+
if-no-files-found: warn
169+
- name: Upload Thread Dump
170+
if: failure()
171+
uses: actions/upload-artifact@v4
172+
with:
173+
name: thread-dump-java-${{ matrix.java-version }}
174+
path: tck/target/thread-dump.txt
175+
retention-days: 2
176+
if-no-files-found: warn
177+
- name: Upload Heap Dump
178+
if: failure()
179+
uses: actions/upload-artifact@v4
180+
with:
181+
name: heap-dump-java-${{ matrix.java-version }}
182+
path: tck/target/heap-dump.hprof.gz
183+
retention-days: 2
184+
if-no-files-found: warn
-1.41 MB
Binary file not shown.

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,38 @@ public class WeatherAgentExecutorProducer {
232232
}
233233
```
234234

235+
### 4. Configure Executor Settings (Optional)
236+
237+
The A2A Java SDK uses a dedicated executor for handling asynchronous operations like streaming subscriptions. By default, this executor is configured with a core pool size of 5 threads and a maximum pool size of 50 threads, optimized for I/O-bound operations.
238+
239+
You can customize the executor settings in your `application.properties`:
240+
241+
```properties
242+
# Core thread pool size for the @Internal executor (default: 5)
243+
a2a.executor.core-pool-size=5
244+
245+
# Maximum thread pool size (default: 50)
246+
a2a.executor.max-pool-size=50
247+
248+
# Thread keep-alive time in seconds (default: 60)
249+
a2a.executor.keep-alive-seconds=60
250+
```
251+
252+
**Why this matters:**
253+
- **Streaming Performance**: The executor handles streaming subscriptions. Too few threads can cause timeouts under concurrent load.
254+
- **Resource Management**: The dedicated executor prevents streaming operations from competing with the ForkJoinPool used by other async tasks.
255+
- **Concurrency**: In production environments with high concurrent streaming requests, increase the pool sizes accordingly.
256+
257+
**Default Configuration:**
258+
```properties
259+
# These are the defaults - no need to set unless you want different values
260+
a2a.executor.core-pool-size=5
261+
a2a.executor.max-pool-size=50
262+
a2a.executor.keep-alive-seconds=60
263+
```
264+
265+
**Note:** The reference server implementations automatically configure this executor. If you're creating a custom server integration, ensure you provide an `@Internal Executor` bean for optimal streaming performance.
266+
235267
## A2A Client
236268

237269
The A2A Java SDK provides a Java client implementation of the [Agent2Agent (A2A) Protocol](https://google-a2a.github.io/A2A), allowing communication with A2A servers. The Java client implementation supports the following transports:

extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
9494
.hook(new ReplicationHook(taskId));
9595
}
9696

97+
@Override
98+
public int getActiveChildQueueCount(String taskId) {
99+
return delegate.getActiveChildQueueCount(taskId);
100+
}
101+
97102
private class ReplicatingEventQueueFactory implements EventQueueFactory {
98103
@Override
99104
public EventQueue.EventQueueBuilder builder(String taskId) {

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.junit.jupiter.api.Assertions.assertNotEquals;
77
import static org.junit.jupiter.api.Assertions.assertNotNull;
88
import static org.junit.jupiter.api.Assertions.assertNull;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
910
import static org.junit.jupiter.api.Assertions.assertTrue;
1011
import static org.junit.jupiter.api.Assertions.fail;
1112

@@ -17,6 +18,7 @@
1718

1819
import io.a2a.server.events.EventQueue;
1920
import io.a2a.server.events.EventQueueClosedException;
21+
import io.a2a.server.events.EventQueueTestHelper;
2022
import io.a2a.spec.Event;
2123
import io.a2a.spec.StreamingEventKind;
2224
import io.a2a.spec.TaskState;
@@ -147,8 +149,11 @@ void testBasicQueueManagerFunctionality() throws InterruptedException {
147149
EventQueue queue = queueManager.createOrTap(taskId);
148150
assertNotNull(queue);
149151

152+
// createOrTap now returns ChildQueue, get returns MainQueue
150153
EventQueue retrievedQueue = queueManager.get(taskId);
151-
assertEquals(queue, retrievedQueue);
154+
assertNotNull(retrievedQueue);
155+
// queue should be a ChildQueue (cannot be tapped)
156+
assertThrows(IllegalStateException.class, () -> EventQueueTestHelper.tapQueue(queue));
152157

153158
EventQueue tappedQueue = queueManager.tap(taskId);
154159
assertNotNull(tappedQueue);
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.a2a.server.events;
2+
3+
/**
4+
* Utils to access package private methods in the io.a2a.server.events package
5+
*/
6+
public class EventQueueTestHelper {
7+
public static EventQueue tapQueue(EventQueue queue) {
8+
return queue.tap();
9+
}
10+
}

reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import jakarta.enterprise.inject.Instance;
44
import jakarta.inject.Inject;
55

6+
import java.util.concurrent.Executor;
7+
68
import io.a2a.server.PublicAgentCard;
79
import io.a2a.server.requesthandlers.RequestHandler;
10+
import io.a2a.server.util.async.Internal;
811
import io.a2a.spec.AgentCard;
912
import io.a2a.transport.grpc.handler.CallContextFactory;
1013
import io.a2a.transport.grpc.handler.GrpcHandler;
@@ -20,14 +23,17 @@ public class QuarkusGrpcHandler extends GrpcHandler {
2023
private final AgentCard agentCard;
2124
private final RequestHandler requestHandler;
2225
private final Instance<CallContextFactory> callContextFactoryInstance;
26+
private final Executor executor;
2327

2428
@Inject
2529
public QuarkusGrpcHandler(@PublicAgentCard AgentCard agentCard,
2630
RequestHandler requestHandler,
27-
Instance<CallContextFactory> callContextFactoryInstance) {
31+
Instance<CallContextFactory> callContextFactoryInstance,
32+
@Internal Executor executor) {
2833
this.agentCard = agentCard;
2934
this.requestHandler = requestHandler;
3035
this.callContextFactoryInstance = callContextFactoryInstance;
36+
this.executor = executor;
3137
}
3238

3339
@Override
@@ -44,4 +50,9 @@ protected AgentCard getAgentCard() {
4450
protected CallContextFactory getCallContextFactory() {
4551
return callContextFactoryInstance.isUnsatisfied() ? null : callContextFactoryInstance.get();
4652
}
53+
54+
@Override
55+
protected Executor getExecutor() {
56+
return executor;
57+
}
4758
}

reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ public Response getStreamingSubscribedCount() {
105105
return Response.ok(String.valueOf(streamingSubscribedCount.get()), TEXT_PLAIN).build();
106106
}
107107

108+
@GET
109+
@Path("/queue/childCount/{taskId}")
110+
@Produces(TEXT_PLAIN)
111+
public Response getChildQueueCount(@PathParam("taskId") String taskId) {
112+
int count = testUtilsBean.getChildQueueCount(taskId);
113+
return Response.ok(String.valueOf(count), TEXT_PLAIN).build();
114+
}
115+
108116
@DELETE
109117
@Path("/task/{taskId}/config/{configId}")
110118
public Response deleteTaskPushNotificationConfig(@PathParam("taskId") String taskId, @PathParam("configId") String configId) {

reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public void getStreamingSubscribedCount(RoutingContext rc) {
139139
.end(String.valueOf(streamingSubscribedCount.get()));
140140
}
141141

142+
@Route(path = "/test/queue/childCount/:taskId", methods = {Route.HttpMethod.GET}, produces = {TEXT_PLAIN})
143+
public void getChildQueueCount(@Param String taskId, RoutingContext rc) {
144+
int count = testUtilsBean.getChildQueueCount(taskId);
145+
rc.response()
146+
.setStatusCode(200)
147+
.end(String.valueOf(count));
148+
}
149+
142150
@Route(path = "/test/task/:taskId/config/:configId", methods = {Route.HttpMethod.DELETE}, type = Route.HandlerType.BLOCKING)
143151
public void deleteTaskPushNotificationConfig(@Param String taskId, @Param String configId, RoutingContext rc) {
144152
try {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient
1+
quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient

0 commit comments

Comments
 (0)