Skip to content

Commit be32b35

Browse files
committed
Fix resource leak
1 parent b7a1d05 commit be32b35

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
218218
pushNotificationCallback.run();
219219
} finally {
220220
if (interruptedOrNonBlocking) {
221-
CompletableFuture<Void> cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId, queue, false), executor);
222-
trackBackgroundTask(cleanupTask);
221+
trackBackgroundTask(cleanupProducer(taskId, queue, false));
223222
} else {
224223
cleanupProducer(taskId, queue, false);
225224
}
@@ -342,8 +341,7 @@ private void startBackgroundConsumption() {
342341
}
343342
});
344343
} finally {
345-
CompletableFuture<Void> cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId.get(), queue, true), executor);
346-
trackBackgroundTask(cleanupTask);
344+
trackBackgroundTask(cleanupProducer(taskId.get(), queue, true));
347345
}
348346
}
349347

@@ -480,8 +478,16 @@ public void run() {
480478
.whenComplete((v, err) -> {
481479
if (err != null) {
482480
runnable.setError(err);
483-
// Close queue on agent execution error
481+
// Close queue on error
484482
queue.close();
483+
} else {
484+
// Only close queue if task is in a final state
485+
Task task = taskStore.get(taskId);
486+
if (task != null && task.getStatus().state().isFinal()) {
487+
queue.close();
488+
} else {
489+
LOGGER.debug("Task {} not in final state or not yet created, keeping queue open", taskId);
490+
}
485491
}
486492
runnable.invokeDoneCallbacks();
487493
});
@@ -507,14 +513,14 @@ private void trackBackgroundTask(CompletableFuture<Void> task) {
507513
});
508514
}
509515

510-
private void cleanupProducer(String taskId, EventQueue queue, boolean isStreaming) {
516+
private CompletableFuture<Void> cleanupProducer(String taskId, EventQueue queue, boolean isStreaming) {
511517
LOGGER.debug("Starting cleanup for task {} (streaming={})", taskId, isStreaming);
512518
CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
513519
if (agentFuture == null) {
514520
LOGGER.debug("No running agent found for task {}", taskId);
515-
return;
521+
return CompletableFuture.completedFuture(null); // Return completed future
516522
}
517-
agentFuture.whenComplete((v, t) -> {
523+
return agentFuture.whenComplete((v, t) -> {
518524
LOGGER.debug("Agent completed for task {}", taskId);
519525

520526
// Determine if we should keep the MainQueue alive

0 commit comments

Comments
 (0)