@@ -108,7 +108,6 @@ public class DefaultRequestHandler implements RequestHandler {
108108 private final Supplier <RequestContext .Builder > requestContextBuilder ;
109109
110110 private final ConcurrentMap <String , CompletableFuture <Void >> runningAgents = new ConcurrentHashMap <>();
111- private final Set <CompletableFuture <Void >> backgroundTasks = ConcurrentHashMap .newKeySet ();
112111
113112 private final Executor executor ;
114113
@@ -374,9 +373,9 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
374373 CompletableFuture <Void > agentFuture = runningAgents .remove (taskId );
375374 LOGGER .debug ("Removed agent for task {} from runningAgents in finally block, size after: {}" , taskId , runningAgents .size ());
376375
377- // Track cleanup as background task to avoid blocking Vert.x threads
376+ // Cleanup as background task to avoid blocking Vert.x threads
378377 // Pass the consumption future to ensure cleanup waits for background consumption to complete
379- trackBackgroundTask ( cleanupProducer (agentFuture , etai != null ? etai .consumptionFuture () : null , taskId , queue , false ) );
378+ cleanupProducer (agentFuture , etai != null ? etai .consumptionFuture () : null , taskId , queue , false );
380379 }
381380
382381 LOGGER .debug ("Returning: {}" , kind );
@@ -386,8 +385,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
386385 @ Override
387386 public Flow .Publisher <StreamingEventKind > onMessageSendStream (
388387 MessageSendParams params , ServerCallContext context ) throws JSONRPCError {
389- LOGGER .debug ("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {} " ,
390- params .message ().getTaskId (), params .message ().getContextId (), runningAgents .size (), backgroundTasks . size () );
388+ LOGGER .debug ("onMessageSendStream START - task: {}; context: {}; runningAgents: {}" ,
389+ params .message ().getTaskId (), params .message ().getContextId (), runningAgents .size ());
391390 MessageSendSetup mss = initMessageSend (params , context );
392391
393392 AtomicReference <String > taskId = new AtomicReference <>(mss .requestContext .getTaskId ());
@@ -398,12 +397,9 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
398397 EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync (taskId .get (), mss .requestContext , queue );
399398
400399 // Move consumer creation and callback registration outside try block
401- // so consumer is available for background consumption on client disconnect
402400 EventConsumer consumer = new EventConsumer (queue );
403401 producerRunnable .addDoneCallback (consumer .createAgentRunnableDoneCallback ());
404402
405- AtomicBoolean backgroundConsumeStarted = new AtomicBoolean (false );
406-
407403 try {
408404 Flow .Publisher <EventQueueItem > results = resultAggregator .consumeAndEmit (consumer );
409405
@@ -444,7 +440,8 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
444440
445441 Flow .Publisher <StreamingEventKind > finalPublisher = convertingProcessor (eventPublisher , event -> (StreamingEventKind ) event );
446442
447- // Wrap publisher to detect client disconnect and continue background consumption
443+ // Wrap publisher to detect client disconnect and immediately close ChildQueue
444+ // This prevents ChildQueue backpressure from blocking MainEventBusProcessor
448445 return subscriber -> {
449446 LOGGER .debug ("Creating subscription wrapper for task {}" , taskId .get ());
450447 finalPublisher .subscribe (new Flow .Subscriber <StreamingEventKind >() {
@@ -464,8 +461,10 @@ public void request(long n) {
464461
465462 @ Override
466463 public void cancel () {
467- LOGGER .debug ("Client cancelled subscription for task {}, starting background consumption" , taskId .get ());
468- startBackgroundConsumption ();
464+ LOGGER .debug ("Client cancelled subscription for task {}, closing ChildQueue immediately" , taskId .get ());
465+ // Close ChildQueue immediately to prevent backpressure
466+ // (clears queue and releases semaphore permits)
467+ queue .close (true ); // immediate=true
469468 subscription .cancel ();
470469 }
471470 });
@@ -490,45 +489,26 @@ public void onComplete() {
490489 subscriber .onComplete ();
491490 } catch (IllegalStateException e ) {
492491 // Client already disconnected and response closed - this is expected
493- // for streaming responses where client disconnect triggers background
494- // consumption. Log and ignore.
492+ // for streaming responses where client disconnect closes ChildQueue.
493+ // Log and ignore.
495494 if (e .getMessage () != null && e .getMessage ().contains ("Response has already been written" )) {
496495 LOGGER .debug ("Client disconnected before onComplete, response already closed for task {}" , taskId .get ());
497496 } else {
498497 throw e ;
499498 }
500499 }
501500 }
502-
503- private void startBackgroundConsumption () {
504- if (backgroundConsumeStarted .compareAndSet (false , true )) {
505- LOGGER .debug ("Starting background consumption for task {}" , taskId .get ());
506- // Client disconnected: continue consuming and persisting events in background
507- CompletableFuture <Void > bgTask = CompletableFuture .runAsync (() -> {
508- try {
509- LOGGER .debug ("Background consumption thread started for task {}" , taskId .get ());
510- resultAggregator .consumeAll (consumer );
511- LOGGER .debug ("Background consumption completed for task {}" , taskId .get ());
512- } catch (Exception e ) {
513- LOGGER .error ("Error during background consumption for task {}" , taskId .get (), e );
514- }
515- }, executor );
516- trackBackgroundTask (bgTask );
517- } else {
518- LOGGER .debug ("Background consumption already started for task {}" , taskId .get ());
519- }
520- }
521501 });
522502 };
523503 } finally {
524- LOGGER .debug ("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {} " ,
525- taskId .get (), runningAgents .size (), backgroundTasks . size () );
504+ LOGGER .debug ("onMessageSendStream FINALLY - task: {}; runningAgents: {}" ,
505+ taskId .get (), runningAgents .size ());
526506
527507 // Remove agent from map immediately to prevent accumulation
528508 CompletableFuture <Void > agentFuture = runningAgents .remove (taskId .get ());
529509 LOGGER .debug ("Removed agent for task {} from runningAgents in finally block, size after: {}" , taskId .get (), runningAgents .size ());
530510
531- trackBackgroundTask ( cleanupProducer (agentFuture , null , taskId .get (), queue , true ) );
511+ cleanupProducer (agentFuture , null , taskId .get (), queue , true );
532512 }
533513 }
534514
@@ -695,47 +675,6 @@ public void run() {
695675 return runnable ;
696676 }
697677
698- private void trackBackgroundTask (CompletableFuture <Void > task ) {
699- backgroundTasks .add (task );
700- LOGGER .debug ("Tracking background task (total: {}): {}" , backgroundTasks .size (), task );
701-
702- task .whenComplete ((result , throwable ) -> {
703- try {
704- if (throwable != null ) {
705- // Unwrap CompletionException to check for CancellationException
706- Throwable cause = throwable ;
707- if (throwable instanceof java .util .concurrent .CompletionException && throwable .getCause () != null ) {
708- cause = throwable .getCause ();
709- }
710-
711- if (cause instanceof java .util .concurrent .CancellationException ) {
712- LOGGER .debug ("Background task cancelled: {}" , task );
713- } else {
714- LOGGER .error ("Background task failed" , throwable );
715- }
716- }
717- } finally {
718- backgroundTasks .remove (task );
719- LOGGER .debug ("Removed background task (remaining: {}): {}" , backgroundTasks .size (), task );
720- }
721- });
722- }
723-
724- /**
725- * Wait for all background tasks to complete.
726- * Useful for testing to ensure cleanup completes before assertions.
727- *
728- * @return CompletableFuture that completes when all background tasks finish
729- */
730- public CompletableFuture <Void > waitForBackgroundTasks () {
731- CompletableFuture <?>[] tasks = backgroundTasks .toArray (new CompletableFuture [0 ]);
732- if (tasks .length == 0 ) {
733- return CompletableFuture .completedFuture (null );
734- }
735- LOGGER .debug ("Waiting for {} background tasks to complete" , tasks .length );
736- return CompletableFuture .allOf (tasks );
737- }
738-
739678 private CompletableFuture <Void > cleanupProducer (CompletableFuture <Void > agentFuture , CompletableFuture <Void > consumptionFuture , String taskId , EventQueue queue , boolean isStreaming ) {
740679 LOGGER .debug ("Starting cleanup for task {} (streaming={})" , taskId , isStreaming );
741680 logThreadStats ("CLEANUP START" );
@@ -837,7 +776,6 @@ private void logThreadStats(String label) {
837776 LOGGER .debug ("=== THREAD STATS: {} ===" , label );
838777 LOGGER .debug ("Active threads: {}" , activeThreads );
839778 LOGGER .debug ("Running agents: {}" , runningAgents .size ());
840- LOGGER .debug ("Background tasks: {}" , backgroundTasks .size ());
841779 LOGGER .debug ("Queue manager active queues: {}" , queueManager .getClass ().getSimpleName ());
842780
843781 // List running agents
@@ -848,13 +786,6 @@ private void logThreadStats(String label) {
848786 );
849787 }
850788
851- // List background tasks
852- if (!backgroundTasks .isEmpty ()) {
853- LOGGER .debug ("Background tasks:" );
854- backgroundTasks .forEach (task ->
855- LOGGER .debug (" - {}: {}" , task , task .isDone () ? "DONE" : "RUNNING" )
856- );
857- }
858789 LOGGER .debug ("=== END THREAD STATS ===" );
859790 }
860791
0 commit comments