
Commit fa93ee7

kabir and claude committed
fix: Implement reference counting for EventQueue to prevent premature MainQueue closure
Previously, MainQueues closed immediately when any ChildQueue closed, causing race conditions where resubscription attempts would fail with TaskNotFoundError. This was particularly problematic in scenarios with multiple concurrent consumers or when clients resubscribed to active tasks.

This commit introduces a reference counting mechanism where MainQueues track active ChildQueues and only close when all children have closed, preventing premature closure while consumers are still active.

Changes:

EventQueue architecture:
- MainQueue now maintains a list of active ChildQueues
- ChildQueue.close() notifies parent via childClosing() callback
- MainQueue only closes when: immediate=true OR all children closed
- Added getActiveChildCount() for testing reference counting mechanism

Non-blocking message support:
- Non-blocking non-final tasks keep MainQueue alive for resubscription
- Added close(immediate, notifyParent) method to EventQueue
- ChildQueue can close without decrementing parent reference count
- DefaultRequestHandler detects non-blocking non-final scenarios
- Support multiple messages to same task after non-blocking sendMessage
- AUTH_REQUIRED state now continues processing in background

ResultAggregator improvements:
- consumeAndBreakOnInterrupt() supports blocking and non-blocking modes
- AUTH_REQUIRED events trigger background continuation for additional messages
- Refactored to use AsyncUtils.consumer() for consistent event processing
- EventConsumer.close() explicitly closes queue to prevent resource leaks

Robustness improvements:
- onResubscribeToTask() calls createOrTap() for non-final tasks when queue missing
- Provides recovery path for edge cases (server restart, explicit queue closure)
- Allows client resubscription even if MainQueue was unexpectedly removed
- Note: historical events unavailable, only future events delivered

Test coverage:
- Added testMainQueueReferenceCountingWithMultipleConsumers()
- Added testNonBlockingWithMultipleMessages()
- Verified TCK compliance for resubscription behavior

Fixes race condition where:
1. Client A subscribes to task → ChildQueue A created
2. Client B resubscribes → ChildQueue B created
3. Client A disconnects → ChildQueue A closes → MainQueue closed prematurely
4. Client B receives no further events (incorrect)

With reference counting:
- MainQueue stays alive while any ChildQueue exists
- All active consumers receive events until MainQueue closes
- Resubscription works reliably for active tasks
- Multiple messages can be sent to same non-blocking task

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
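
The reference counting rule described in the commit message can be illustrated with a minimal sketch. It is not the project's EventQueue code: `SketchMainQueue` and `SketchChildQueue` are hypothetical stand-ins, event delivery is omitted, and only the closing rule (close when `immediate` is true or when no children remain) and the `notifyParent` flag are modelled. The method names `childClosing`, `getActiveChildCount` and `close(immediate, notifyParent)` are taken from the message; their real signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the MainQueue described in the commit message. */
class SketchMainQueue {
    private final List<SketchChildQueue> children = new ArrayList<>();
    private boolean closed;

    /** Creates a child queue and registers it as an active reference. */
    synchronized SketchChildQueue tap() {
        if (closed) {
            throw new IllegalStateException("main queue is closed");
        }
        SketchChildQueue child = new SketchChildQueue(this);
        children.add(child);
        return child;
    }

    /** Called by a closing child; the main queue closes only if asked to close immediately or once no children remain. */
    synchronized void childClosing(SketchChildQueue child, boolean immediate) {
        children.remove(child);
        if (immediate || children.isEmpty()) {
            closed = true;
        }
    }

    synchronized int getActiveChildCount() {
        return children.size();
    }

    synchronized boolean isClosed() {
        return closed;
    }
}

/** Hypothetical, simplified stand-in for a ChildQueue. */
class SketchChildQueue {
    private final SketchMainQueue parent;
    private boolean closed;

    SketchChildQueue(SketchMainQueue parent) {
        this.parent = parent;
    }

    /** Default close: not immediate, and the parent is notified. */
    void close() {
        close(false, true);
    }

    /** With notifyParent=false the child closes but stays counted, which keeps the parent alive. */
    synchronized void close(boolean immediate, boolean notifyParent) {
        if (closed) {
            return; // closing twice must not decrement the count twice
        }
        closed = true;
        if (notifyParent) {
            parent.childClosing(this, immediate);
        }
    }
}
```

In this sketch, tapping two children and closing the first leaves the main queue open with a count of 1; closing the second closes it. That mirrors the Client A / Client B scenario in the message, where Client B should keep receiving events after Client A disconnects.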
1 parent 75b4150 commit fa93ee7

24 files changed: +905 -76 lines changed

.serena/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+/cache
-1.41 MB
Binary file not shown.

.serena/project.yml

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
+# language of the project (csharp, python, rust, java, typescript, go, cpp, or ruby)
+# * For C, use cpp
+# * For JavaScript, use typescript
+# Special requirements:
+# * csharp: Requires the presence of a .sln file in the project folder.
+language: java
+
+# whether to use the project's gitignore file to ignore files
+# Added on 2025-04-07
+ignore_all_files_in_gitignore: true
+# list of additional paths to ignore
+# same syntax as gitignore, so you can use * and **
+# Was previously called `ignored_dirs`, please update your config if you are using that.
+# Added (renamed) on 2025-04-07
+ignored_paths: []
+
+# whether the project is in read-only mode
+# If set to true, all editing tools will be disabled and attempts to use them will result in an error
+# Added on 2025-04-18
+read_only: false
+
+# list of tool names to exclude. We recommend not excluding any tools, see the readme for more details.
+# Below is the complete list of tools for convenience.
+# To make sure you have the latest list of tools, and to view their descriptions,
+# execute `uv run scripts/print_tool_overview.py`.
+#
+# * `activate_project`: Activates a project by name.
+# * `check_onboarding_performed`: Checks whether project onboarding was already performed.
+# * `create_text_file`: Creates/overwrites a file in the project directory.
+# * `delete_lines`: Deletes a range of lines within a file.
+# * `delete_memory`: Deletes a memory from Serena's project-specific memory store.
+# * `execute_shell_command`: Executes a shell command.
+# * `find_referencing_code_snippets`: Finds code snippets in which the symbol at the given location is referenced.
+# * `find_referencing_symbols`: Finds symbols that reference the symbol at the given location (optionally filtered by type).
+# * `find_symbol`: Performs a global (or local) search for symbols with/containing a given name/substring (optionally filtered by type).
+# * `get_current_config`: Prints the current configuration of the agent, including the active and available projects, tools, contexts, and modes.
+# * `get_symbols_overview`: Gets an overview of the top-level symbols defined in a given file.
+# * `initial_instructions`: Gets the initial instructions for the current project.
+#   Should only be used in settings where the system prompt cannot be set,
+#   e.g. in clients you have no control over, like Claude Desktop.
+# * `insert_after_symbol`: Inserts content after the end of the definition of a given symbol.
+# * `insert_at_line`: Inserts content at a given line in a file.
+# * `insert_before_symbol`: Inserts content before the beginning of the definition of a given symbol.
+# * `list_dir`: Lists files and directories in the given directory (optionally with recursion).
+# * `list_memories`: Lists memories in Serena's project-specific memory store.
+# * `onboarding`: Performs onboarding (identifying the project structure and essential tasks, e.g. for testing or building).
+# * `prepare_for_new_conversation`: Provides instructions for preparing for a new conversation (in order to continue with the necessary context).
+# * `read_file`: Reads a file within the project directory.
+# * `read_memory`: Reads the memory with the given name from Serena's project-specific memory store.
+# * `remove_project`: Removes a project from the Serena configuration.
+# * `replace_lines`: Replaces a range of lines within a file with new content.
+# * `replace_symbol_body`: Replaces the full definition of a symbol.
+# * `restart_language_server`: Restarts the language server, may be necessary when edits not through Serena happen.
+# * `search_for_pattern`: Performs a search for a pattern in the project.
+# * `summarize_changes`: Provides instructions for summarizing the changes made to the codebase.
+# * `switch_modes`: Activates modes by providing a list of their names
+# * `think_about_collected_information`: Thinking tool for pondering the completeness of collected information.
+# * `think_about_task_adherence`: Thinking tool for determining whether the agent is still on track with the current task.
+# * `think_about_whether_you_are_done`: Thinking tool for determining whether the task is truly completed.
+# * `write_memory`: Writes a named memory (for future reference) to Serena's project-specific memory store.
+excluded_tools: []
+
+# initial prompt for the project. It will always be given to the LLM upon activating the project
+# (contrary to the memories, which are loaded on demand).
+initial_prompt: ""
+
+project_name: "a2a-java"

extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java

Lines changed: 5 additions & 0 deletions
@@ -94,6 +94,11 @@ public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
                 .hook(new ReplicationHook(taskId));
     }
 
+    @Override
+    public int getActiveChildQueueCount(String taskId) {
+        return delegate.getActiveChildQueueCount(taskId);
+    }
+
     private class ReplicatingEventQueueFactory implements EventQueueFactory {
         @Override
         public EventQueue.EventQueueBuilder builder(String taskId) {

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 6 additions & 1 deletion
@@ -6,6 +6,7 @@
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -17,6 +18,7 @@
 
 import io.a2a.server.events.EventQueue;
 import io.a2a.server.events.EventQueueClosedException;
+import io.a2a.server.events.EventQueueTestHelper;
 import io.a2a.spec.Event;
 import io.a2a.spec.StreamingEventKind;
 import io.a2a.spec.TaskState;
@@ -147,8 +149,11 @@ void testBasicQueueManagerFunctionality() throws InterruptedException {
         EventQueue queue = queueManager.createOrTap(taskId);
         assertNotNull(queue);
 
+        // createOrTap now returns ChildQueue, get returns MainQueue
         EventQueue retrievedQueue = queueManager.get(taskId);
-        assertEquals(queue, retrievedQueue);
+        assertNotNull(retrievedQueue);
+        // queue should be a ChildQueue (cannot be tapped)
+        assertThrows(IllegalStateException.class, () -> EventQueueTestHelper.tapQueue(queue));
 
         EventQueue tappedQueue = queueManager.tap(taskId);
         assertNotNull(tappedQueue);
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+package io.a2a.server.events;
+
+/**
+ * Utils to access package private methods in the io.a2a.server.events package
+ */
+public class EventQueueTestHelper {
+    public static EventQueue tapQueue(EventQueue queue) {
+        return queue.tap();
+    }
+}

reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java

Lines changed: 8 additions & 0 deletions
@@ -105,6 +105,14 @@ public Response getStreamingSubscribedCount() {
         return Response.ok(String.valueOf(streamingSubscribedCount.get()), TEXT_PLAIN).build();
     }
 
+    @GET
+    @Path("/queue/childCount/{taskId}")
+    @Produces(TEXT_PLAIN)
+    public Response getChildQueueCount(@PathParam("taskId") String taskId) {
+        int count = testUtilsBean.getChildQueueCount(taskId);
+        return Response.ok(String.valueOf(count), TEXT_PLAIN).build();
+    }
+
     @DELETE
     @Path("/task/{taskId}/config/{configId}")
     public Response deleteTaskPushNotificationConfig(@PathParam("taskId") String taskId, @PathParam("configId") String configId) {

reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java

Lines changed: 8 additions & 0 deletions
@@ -139,6 +139,14 @@ public void getStreamingSubscribedCount(RoutingContext rc) {
                 .end(String.valueOf(streamingSubscribedCount.get()));
     }
 
+    @Route(path = "/test/queue/childCount/:taskId", methods = {Route.HttpMethod.GET}, produces = {TEXT_PLAIN})
+    public void getChildQueueCount(@Param String taskId, RoutingContext rc) {
+        int count = testUtilsBean.getChildQueueCount(taskId);
+        rc.response()
+                .setStatusCode(200)
+                .end(String.valueOf(count));
+    }
+
     @Route(path = "/test/task/:taskId/config/:configId", methods = {Route.HttpMethod.DELETE}, type = Route.HandlerType.BLOCKING)
     public void deleteTaskPushNotificationConfig(@Param String taskId, @Param String configId, RoutingContext rc) {
         try {
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient
+quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient

reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java

Lines changed: 8 additions & 0 deletions
@@ -141,6 +141,14 @@ public void getStreamingSubscribedCount(RoutingContext rc) {
                 .end(String.valueOf(streamingSubscribedCount.get()));
     }
 
+    @Route(path = "/test/queue/childCount/:taskId", methods = {Route.HttpMethod.GET}, produces = {TEXT_PLAIN})
+    public void getChildQueueCount(@Param String taskId, RoutingContext rc) {
+        int count = testUtilsBean.getChildQueueCount(taskId);
+        rc.response()
+                .setStatusCode(200)
+                .end(String.valueOf(count));
+    }
+
     @Route(path = "/test/task/:taskId/config/:configId", methods = {Route.HttpMethod.DELETE}, type = Route.HandlerType.BLOCKING)
     public void deleteTaskPushNotificationConfig(@Param String taskId, @Param String configId, RoutingContext rc) {
         try {
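
The `childCount` test endpoints added in this commit return the active child queue count as plain text. A test that disconnects a client may need to wait for that count to reach an expected value, since the count may not update the instant the client goes away. The following is a minimal sketch of such a wait loop, not code from this commit: `ChildCountAwaiter` and `awaitCount` are hypothetical names, and the count source is abstracted as an `IntSupplier` so that an HTTP call to the endpoint (or a direct call to a queue manager) can be plugged in.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical test helper: polls a count source until it reports the expected value. */
final class ChildCountAwaiter {
    private ChildCountAwaiter() {
    }

    /**
     * Returns true as soon as currentCount yields expected, or false if the
     * timeout elapses (or the thread is interrupted) before that happens.
     */
    static boolean awaitCount(IntSupplier currentCount, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (currentCount.getAsInt() == expected) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10); // short pause between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A test could pass a supplier that issues a GET to the `childCount` route and parses the response body as an integer, then assert that `awaitCount` returns true within a few seconds.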
