Skip to content

Commit 8903e3d

Browse files
authored
fix: Implement background cleanup and producer continuation for client disconnect (#284)
Backports key fixes from a2a-python PRs #440 and #472 to resolve event queue lifecycle issues that prevented task resubscription after client disconnect.
- Add background task tracking to DefaultRequestHandler with CompletableFuture
- Implement asynchronous cleanup when clients disconnect during streaming
- Add EventQueue graceful vs immediate close behavior
Fixes #283 🦕
1 parent ce15a42 commit 8903e3d

File tree

5 files changed

+530
-13
lines changed

5 files changed

+530
-13
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,31 @@ public void taskDone() {
100100

101101
public abstract void close();
102102

103+
/**
 * Closes this queue, letting the caller choose between a graceful and an immediate close.
 *
 * <p>The implementations shown in this change forward the flag to {@code doClose(boolean)},
 * either directly or via the parent queue.
 *
 * @param immediate {@code true} to discard events still pending in the queue;
 *                  {@code false} to leave pending events in place
 */
public abstract void close(boolean immediate);
104+
105+
/**
 * Reports whether this queue has been closed.
 *
 * @return the current value of the {@code closed} flag, which {@code doClose(boolean)} sets to {@code true}
 */
public boolean isClosed() {
106+
// NOTE(review): this read is not inside the synchronized block that doClose uses for the write;
// confirm that `closed` is declared volatile (its declaration is not visible here).
return closed;
107+
}
108+
103109
/**
 * Closes this queue gracefully; shorthand for {@code doClose(false)}.
 */
public void doClose() {
110+
doClose(false);
111+
}
112+
113+
/**
 * Marks this queue as closed and, for an immediate close, discards its pending events.
 *
 * <p>The closed check and the flag update happen together while holding this object's monitor,
 * so only the first call proceeds; every later call returns before touching the queue. As a
 * consequence, calling this with {@code immediate == true} on a queue that was already closed
 * does not clear it.
 *
 * @param immediate {@code true} to clear the underlying queue after marking it closed;
 *                  {@code false} to leave queued events untouched
 */
public void doClose(boolean immediate) {
104114
synchronized (this) {
105115
if (closed) {
106116
return;
107117
}
108-
LOGGER.debug("Closing {}", this);
118+
LOGGER.debug("Closing {} (immediate={})", this, immediate);
109119
closed = true;
110120
}
111-
// Although the Python implementation drains the queue on closing,
112-
// here it makes events go missing
113-
// TODO do we actually need to drain it? If we do, we need some mechanism to determine that noone is
114-
// polling any longer and drain it asynchronously once it is all done. That could perhaps be done
115-
// via an EnhancedRunnable.DoneCallback.
116-
//queue.drainTo(new ArrayList<>());
121+
122+
// The clear below runs after the monitor has been released.
if (immediate) {
123+
// Immediate close: clear pending events
124+
queue.clear();
125+
LOGGER.debug("Cleared queue for immediate close: {}", this);
126+
}
127+
// For graceful close, let the queue drain naturally through normal consumption
117128
}
118129

119130
static class MainQueue extends EventQueue {
@@ -151,8 +162,13 @@ void signalQueuePollerStarted() {
151162

152163
/**
 * Closes this queue and its children gracefully; after this change it simply
 * delegates to {@code close(false)}.
 */
@Override
153164
public void close() {
154-
doClose();
155-
children.forEach(EventQueue::doClose);
165+
close(false);
166+
}
167+
168+
/**
 * Closes this queue first and then every queue in {@code children}, passing the same flag to each.
 *
 * @param immediate forwarded unchanged to {@code doClose(boolean)} on this queue and on each child
 */
@Override
169+
public void close(boolean immediate) {
170+
doClose(immediate);
171+
// Children are closed via doClose rather than close — presumably because a child's close()
// delegates back to its parent; verify against the child queue class.
children.forEach(child -> child.doClose(immediate));
156172
}
157173
}
158174

@@ -191,5 +207,10 @@ void signalQueuePollerStarted() {
191207
public void close() {
192208
// Closing is handled entirely by the parent queue.
parent.close();
193209
}
210+
211+
/**
 * Closes this queue by delegating to the parent queue with the same flag.
 *
 * @param immediate forwarded unchanged to {@code parent.close(boolean)}
 */
@Override
212+
public void close(boolean immediate) {
213+
parent.close(immediate);
214+
}
194215
}
195216
}

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.Flow;
1616
import java.util.concurrent.atomic.AtomicReference;
1717
import java.util.function.Supplier;
18+
import java.util.Set;
19+
import java.util.concurrent.TimeUnit;
1820

1921
import io.a2a.server.ServerCallContext;
2022
import io.a2a.server.agentexecution.AgentExecutor;
@@ -67,6 +69,7 @@ public class DefaultRequestHandler implements RequestHandler {
6769
private final Supplier<RequestContext.Builder> requestContextBuilder;
6870

6971
private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<>();
72+
private final Set<CompletableFuture<Void>> backgroundTasks = ConcurrentHashMap.newKeySet();
7073

7174
private final Executor executor;
7275

@@ -190,8 +193,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
190193

191194
} finally {
192195
if (interrupted) {
193-
// TODO Make this async
194-
cleanupProducer(taskId);
196+
CompletableFuture<Void> cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId), executor);
197+
trackBackgroundTask(cleanupTask);
195198
} else {
196199
cleanupProducer(taskId);
197200
}
@@ -212,9 +215,9 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
212215
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null);
213216

214217
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId.get(), mss.requestContext, queue);
218+
EventConsumer consumer = new EventConsumer(queue);
215219

216220
try {
217-
EventConsumer consumer = new EventConsumer(queue);
218221

219222
// This callback must be added before we start consuming. Otherwise,
220223
// any errors thrown by the producerRunnable are not picked up by the consumer
@@ -258,7 +261,8 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
258261

259262
return convertingProcessor(eventPublisher, event -> (StreamingEventKind) event);
260263
} finally {
261-
cleanupProducer(taskId.get());
264+
CompletableFuture<Void> cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId.get()), executor);
265+
trackBackgroundTask(cleanupTask);
262266
}
263267
}
264268

@@ -396,6 +400,24 @@ public void run() {
396400
return runnable;
397401
}
398402

403+
/**
 * Registers a background task in {@code backgroundTasks} and attaches a completion callback
 * that logs the outcome and removes the task from the set again.
 *
 * <p>A {@link java.util.concurrent.CancellationException} is logged at debug level; any other
 * failure is logged at error level. Successful completion is not logged.
 *
 * @param task the future to track until it completes
 */
private void trackBackgroundTask(CompletableFuture<Void> task) {
404+
backgroundTasks.add(task);
405+
406+
task.whenComplete((result, throwable) -> {
407+
try {
408+
if (throwable != null) {
409+
if (throwable instanceof java.util.concurrent.CancellationException) {
410+
LOGGER.debug("Background task cancelled: {}", task);
411+
} else {
412+
LOGGER.error("Background task failed", throwable);
413+
}
414+
}
415+
} finally {
416+
// Removal sits in finally so the task is untracked even if the logging above throws.
backgroundTasks.remove(task);
417+
}
418+
});
419+
}
420+
399421
private void cleanupProducer(String taskId) {
400422
// TODO the Python implementation waits for the producerRunnable
401423
runningAgents.get(taskId)

server-common/src/test/java/io/a2a/server/events/EventQueueTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.junit.jupiter.api.Assertions.assertNull;
44
import static org.junit.jupiter.api.Assertions.assertSame;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
56

67
import java.util.List;
78

@@ -113,4 +114,75 @@ public void testEnqueueDifferentEventTypes() throws Exception {
113114
assertSame(event, dequeuedEvent);
114115
}
115116
}
117+
118+
/**
119+
* Test close behavior sets flag and handles graceful close.
120+
* Backported from Python test: test_close_sets_flag_and_handles_internal_queue_old_python
121+
*/
122+
@Test
123+
public void testCloseGracefulSetsFlag() throws Exception {
124+
// Enqueue one event so the queue is non-empty when it is closed.
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
125+
eventQueue.enqueueEvent(event);
126+
127+
eventQueue.close(false); // Graceful close
128+
// Only the closed flag is checked; the test does not verify that the enqueued event is still available.
assertTrue(eventQueue.isClosed());
129+
}
130+
131+
/**
132+
* Test immediate close behavior.
133+
* Backported from Python test behavior
134+
*/
135+
@Test
135+
public void testCloseImmediateClearsQueue() throws Exception {
136+
// Enqueue one event so there is something for the immediate close to discard.
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
137+
eventQueue.enqueueEvent(event);
138+
139+
eventQueue.close(true); // Immediate close
140+
assertTrue(eventQueue.isClosed());
141+
142+
// After immediate close, queue should be cleared
143+
// Attempting to dequeue should return null or throw exception
144+
// Both outcomes pass: the only failing case is dequeueEvent returning a non-null event.
try {
145+
Event dequeuedEvent = eventQueue.dequeueEvent(-1);
146+
// If we get here, the event should be null (queue was cleared)
147+
assertNull(dequeuedEvent);
148+
} catch (EventQueueClosedException e) {
149+
// This is also acceptable - queue is closed
150+
}
151+
}
153+
154+
/**
155+
* Test that close is idempotent.
156+
* Backported from Python test: test_close_idempotent
157+
*/
158+
@Test
158+
public void testCloseIdempotent() throws Exception {
159+
// First pass: repeated no-argument close on the shared queue.
eventQueue.close();
160+
assertTrue(eventQueue.isClosed());
161+
162+
// Calling close again should not cause issues
163+
eventQueue.close();
164+
assertTrue(eventQueue.isClosed());
165+
166+
// Test with immediate close as well
167+
// A fresh queue is created because the shared one is already closed at this point.
EventQueue eventQueue2 = EventQueue.create();
168+
eventQueue2.close(true);
169+
assertTrue(eventQueue2.isClosed());
170+
171+
eventQueue2.close(true);
172+
assertTrue(eventQueue2.isClosed());
173+
}
175+
176+
/**
177+
* Test that child queues are closed when parent closes.
178+
*/
179+
@Test
179+
public void testCloseChildQueues() throws Exception {
180+
// tap() supplies the child queue whose closed state is checked below.
EventQueue childQueue = eventQueue.tap();
181+
// NOTE(review): assertNotNull(childQueue) would state this check more directly.
assertTrue(childQueue != null);
182+
183+
// Close only the parent; the child is never closed explicitly by this test.
eventQueue.close();
184+
assertTrue(eventQueue.isClosed());
185+
assertTrue(childQueue.isClosed());
186+
}
116188
}

0 commit comments

Comments
 (0)