@@ -231,11 +231,13 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
231231 @ Override
232232 public Flow .Publisher <StreamingEventKind > onMessageSendStream (
233233 MessageSendParams params , ServerCallContext context ) throws JSONRPCError {
234- LOGGER .debug ("onMessageSendStream - task: {}; context {}" , params .message ().getTaskId (), params .message ().getContextId ());
234+ LOGGER .debug ("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}" ,
235+ params .message ().getTaskId (), params .message ().getContextId (), runningAgents .size (), backgroundTasks .size ());
235236 MessageSendSetup mss = initMessageSend (params , context );
236237
237238 AtomicReference <String > taskId = new AtomicReference <>(mss .requestContext .getTaskId ());
238239 EventQueue queue = queueManager .createOrTap (taskId .get ());
240+ LOGGER .debug ("Created/tapped queue for task {}: {}" , taskId .get (), queue );
239241 ResultAggregator resultAggregator = new ResultAggregator (mss .taskManager , null );
240242
241243 EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync (taskId .get (), mss .requestContext , queue );
@@ -288,59 +290,73 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
288290 Flow .Publisher <StreamingEventKind > finalPublisher = convertingProcessor (eventPublisher , event -> (StreamingEventKind ) event );
289291
290292 // Wrap publisher to detect client disconnect and continue background consumption
291- return subscriber -> finalPublisher .subscribe (new Flow .Subscriber <StreamingEventKind >() {
292- private Flow .Subscription subscription ;
293-
294- @ Override
295- public void onSubscribe (Flow .Subscription subscription ) {
296- this .subscription = subscription ;
297- // Wrap subscription to detect cancellation
298- subscriber .onSubscribe (new Flow .Subscription () {
299- @ Override
300- public void request (long n ) {
301- subscription .request (n );
302- }
293+ return subscriber -> {
294+ LOGGER .debug ("Creating subscription wrapper for task {}" , taskId .get ());
295+ finalPublisher .subscribe (new Flow .Subscriber <StreamingEventKind >() {
296+ private Flow .Subscription subscription ;
297+
298+ @ Override
299+ public void onSubscribe (Flow .Subscription subscription ) {
300+ LOGGER .debug ("onSubscribe called for task {}" , taskId .get ());
301+ this .subscription = subscription ;
302+ // Wrap subscription to detect cancellation
303+ subscriber .onSubscribe (new Flow .Subscription () {
304+ @ Override
305+ public void request (long n ) {
306+ LOGGER .debug ("Subscription.request({}) for task {}" , n , taskId .get ());
307+ subscription .request (n );
308+ }
303309
304- @ Override
305- public void cancel () {
306- LOGGER .debug ("Client cancelled subscription for task {}, starting background consumption" , taskId .get ());
307- startBackgroundConsumption ();
308- subscription .cancel ();
309- }
310- });
311- }
310+ @ Override
311+ public void cancel () {
312+ LOGGER .debug ("Client cancelled subscription for task {}, starting background consumption" , taskId .get ());
313+ startBackgroundConsumption ();
314+ subscription .cancel ();
315+ }
316+ });
317+ }
312318
313- @ Override
314- public void onNext (StreamingEventKind item ) {
315- subscriber .onNext (item );
316- }
319+ @ Override
320+ public void onNext (StreamingEventKind item ) {
321+ LOGGER .debug ("onNext: {} for task {}" , item .getClass ().getSimpleName (), taskId .get ());
322+ subscriber .onNext (item );
323+ }
317324
318- @ Override
319- public void onError (Throwable throwable ) {
320- subscriber .onError (throwable );
321- }
325+ @ Override
326+ public void onError (Throwable throwable ) {
327+ LOGGER .error ("onError for task {}" , taskId .get (), throwable );
328+ subscriber .onError (throwable );
329+ }
322330
323- @ Override
324- public void onComplete () {
325- subscriber .onComplete ();
326- }
331+ @ Override
332+ public void onComplete () {
333+ LOGGER .debug ("onComplete for task {}" , taskId .get ());
334+ subscriber .onComplete ();
335+ }
327336
328- private void startBackgroundConsumption () {
329- if (backgroundConsumeStarted .compareAndSet (false , true )) {
330- // Client disconnected: continue consuming and persisting events in background
331- CompletableFuture <Void > bgTask = CompletableFuture .runAsync (() -> {
332- try {
333- resultAggregator .consumeAll (consumer );
334- LOGGER .debug ("Background consumption completed for task {}" , taskId .get ());
335- } catch (Exception e ) {
336- LOGGER .error ("Error during background consumption for task {}" , taskId .get (), e );
337- }
338- }, executor );
339- trackBackgroundTask (bgTask );
337+ private void startBackgroundConsumption () {
338+ if (backgroundConsumeStarted .compareAndSet (false , true )) {
339+ LOGGER .debug ("Starting background consumption for task {}" , taskId .get ());
340+ // Client disconnected: continue consuming and persisting events in background
341+ CompletableFuture <Void > bgTask = CompletableFuture .runAsync (() -> {
342+ try {
343+ LOGGER .debug ("Background consumption thread started for task {}" , taskId .get ());
344+ resultAggregator .consumeAll (consumer );
345+ LOGGER .debug ("Background consumption completed for task {}" , taskId .get ());
346+ } catch (Exception e ) {
347+ LOGGER .error ("Error during background consumption for task {}" , taskId .get (), e );
348+ }
349+ }, executor );
350+ trackBackgroundTask (bgTask );
351+ } else {
352+ LOGGER .debug ("Background consumption already started for task {}" , taskId .get ());
353+ }
340354 }
341- }
342- }) ;
355+ });
356+ };
343357 } finally {
358+ LOGGER .debug ("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {}" ,
359+ taskId .get (), runningAgents .size (), backgroundTasks .size ());
344360 trackBackgroundTask (cleanupProducer (taskId .get (), queue , true ));
345361 }
346362 }
@@ -462,10 +478,14 @@ private boolean shouldAddPushInfo(MessageSendParams params) {
462478 }
463479
464480 private EnhancedRunnable registerAndExecuteAgentAsync (String taskId , RequestContext requestContext , EventQueue queue ) {
481+ LOGGER .debug ("Registering agent execution for task {}, runningAgents.size() before: {}" , taskId , runningAgents .size ());
482+
465483 EnhancedRunnable runnable = new EnhancedRunnable () {
466484 @ Override
467485 public void run () {
486+ LOGGER .debug ("Agent execution starting for task {}" , taskId );
468487 agentExecutor .execute (requestContext , queue );
488+ LOGGER .debug ("Agent execution completed for task {}" , taskId );
469489 try {
470490 queueManager .awaitQueuePollerStart (queue );
471491 } catch (InterruptedException e ) {
@@ -477,13 +497,15 @@ public void run() {
477497 CompletableFuture <Void > cf = CompletableFuture .runAsync (runnable , executor )
478498 .whenComplete ((v , err ) -> {
479499 if (err != null ) {
500+ LOGGER .error ("Agent execution failed for task {}" , taskId , err );
480501 runnable .setError (err );
481502 // Close queue on error
482503 queue .close ();
483504 } else {
484505 // Only close queue if task is in a final state
485506 Task task = taskStore .get (taskId );
486507 if (task != null && task .getStatus ().state ().isFinal ()) {
508+ LOGGER .debug ("Task {} in final state {}, closing queue" , taskId , task .getStatus ().state ());
487509 queue .close ();
488510 } else {
489511 LOGGER .debug ("Task {} not in final state or not yet created, keeping queue open" , taskId );
@@ -492,11 +514,13 @@ public void run() {
492514 runnable .invokeDoneCallbacks ();
493515 });
494516 runningAgents .put (taskId , cf );
517+ LOGGER .debug ("Registered agent for task {}, runningAgents.size() after: {}" , taskId , runningAgents .size ());
495518 return runnable ;
496519 }
497520
498521 private void trackBackgroundTask (CompletableFuture <Void > task ) {
499522 backgroundTasks .add (task );
523+ LOGGER .debug ("Tracking background task (total: {}): {}" , backgroundTasks .size (), task );
500524
501525 task .whenComplete ((result , throwable ) -> {
502526 try {
@@ -509,19 +533,25 @@ private void trackBackgroundTask(CompletableFuture<Void> task) {
509533 }
510534 } finally {
511535 backgroundTasks .remove (task );
536+ LOGGER .debug ("Removed background task (remaining: {}): {}" , backgroundTasks .size (), task );
512537 }
513538 });
514539 }
515540
516541 private CompletableFuture <Void > cleanupProducer (String taskId , EventQueue queue , boolean isStreaming ) {
517- LOGGER .debug ("Starting cleanup for task {} (streaming={})" , taskId , isStreaming );
542+ LOGGER .debug ("Starting cleanup for task {} (streaming={}, queue={}, runningAgents.size={})" ,
543+ taskId , isStreaming , queue , runningAgents .size ());
518544 CompletableFuture <Void > agentFuture = runningAgents .get (taskId );
519545 if (agentFuture == null ) {
520- LOGGER .debug ("No running agent found for task {}" , taskId );
546+ LOGGER .debug ("No running agent found for task {}, cleanup complete " , taskId );
521547 return CompletableFuture .completedFuture (null ); // Return completed future
522548 }
523549 return agentFuture .whenComplete ((v , t ) -> {
524- LOGGER .debug ("Agent completed for task {}" , taskId );
550+ if (t != null ) {
551+ LOGGER .debug ("Agent completed with error for task {}" , taskId , t );
552+ } else {
553+ LOGGER .debug ("Agent completed successfully for task {}" , taskId );
554+ }
525555
526556 // Determine if we should keep the MainQueue alive
527557 // For non-streaming, non-blocking requests with non-final tasks, we close the ChildQueue
@@ -531,17 +561,21 @@ private CompletableFuture<Void> cleanupProducer(String taskId, EventQueue queue,
531561 Task task = taskStore .get (taskId );
532562 if (task != null && !task .getStatus ().state ().isFinal ()) {
533563 keepMainQueueAlive = true ;
534- LOGGER .debug ("Non-streaming call with non-final task {}, closing ChildQueue but keeping MainQueue alive for resubscription" , taskId );
564+ LOGGER .debug ("Non-streaming call with non-final task {} (state={}), closing ChildQueue but keeping MainQueue alive for resubscription" ,
565+ taskId , task .getStatus ().state ());
535566 }
536567 }
537568
538- LOGGER .debug ("{} call, closing queue for task {}" , isStreaming ? "Streaming" : "Non-streaming" , taskId );
569+ LOGGER .debug ("{} call, closing queue for task {} (immediate=false, notifyParent={})" ,
570+ isStreaming ? "Streaming" : "Non-streaming" , taskId , !keepMainQueueAlive );
539571
540572 // Close the ChildQueue, optionally keeping MainQueue alive
541573 queue .close (false , !keepMainQueueAlive );
542574
543575 // Always remove from running agents
544- runningAgents .remove (taskId , agentFuture );
576+ boolean removed = runningAgents .remove (taskId , agentFuture );
577+ LOGGER .debug ("Removed agent for task {} from runningAgents: {}, runningAgents.size() after: {}" ,
578+ taskId , removed , runningAgents .size ());
545579 });
546580 }
547581
0 commit comments