@@ -304,7 +304,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
304304 // Store push notification config for newly created tasks (mirrors streaming logic)
305305 // Only for NEW tasks - existing tasks are handled by initMessageSend()
306306 if (mss .task () == null && kind instanceof Task createdTask && shouldAddPushInfo (params )) {
307- LOGGER .debug ("Storing push notification config for new task {}" , createdTask .getId ());
307+ LOGGER .debug ("Storing push notification config for new task {} (original taskId from params: {})" ,
308+ createdTask .getId (), params .message ().getTaskId ());
308309 pushConfigStore .setInfo (createdTask .getId (), params .configuration ().pushNotificationConfig ());
309310 }
310311
@@ -397,6 +398,16 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
397398 AtomicReference <String > taskId = new AtomicReference <>(mss .requestContext .getTaskId ());
398399 EventQueue queue = queueManager .createOrTap (taskId .get ());
399400 LOGGER .debug ("Created/tapped queue for task {}: {}" , taskId .get (), queue );
401+
402+ // Store push notification config SYNCHRONOUSLY for new tasks before agent starts
403+ // This ensures config is available when MainEventBusProcessor sends push notifications
404+ // For existing tasks, config was already stored in initMessageSend()
405+ if (mss .task () == null && shouldAddPushInfo (params )) {
406+ LOGGER .debug ("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})" ,
407+ taskId .get (), params .message ().getTaskId ());
408+ pushConfigStore .setInfo (taskId .get (), params .configuration ().pushNotificationConfig ());
409+ }
410+
400411 ResultAggregator resultAggregator = new ResultAggregator (mss .taskManager , null , executor );
401412
402413 EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync (taskId .get (), mss .requestContext , queue );
@@ -425,14 +436,9 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
425436 } catch (TaskQueueExistsException e ) {
426437 // TODO Log
427438 }
428- if (pushConfigStore != null &&
429- params .configuration () != null &&
430- params .configuration ().pushNotificationConfig () != null ) {
431439
432- pushConfigStore .setInfo (
433- createdTask .getId (),
434- params .configuration ().pushNotificationConfig ());
435- }
440+ // Push notification config already stored synchronously at start of onMessageSendStream
441+ // for new tasks, or in initMessageSend for existing tasks. No need to store again here.
436442
437443 }
438444 // Push notifications now sent by MainEventBusProcessor after persistence
@@ -710,13 +716,10 @@ private CompletableFuture<Void> cleanupProducer(CompletableFuture<Void> agentFut
710716 }
711717
712718 if (isStreaming ) {
713- // For streaming: DON'T close the ChildQueue here
714- // The ChildQueue must stay open so MainEventBusProcessor can distribute events to it
715- // and the subscriber can consume them. EventConsumer will close the queue when:
716- // 1. A final event is detected, or
717- // 2. The subscriber cancels, or
718- // 3. EventConsumer completes naturally
719- LOGGER .debug ("Streaming call, NOT closing ChildQueue in cleanup for task {} (EventConsumer will close it)" , taskId );
719+ // For streaming: Queue lifecycle managed by EventConsumer
720+ // EventConsumer closes queue when it detects final event (or QueueClosedEvent from replication)
721+ // For fire-and-forget tasks, MainQueue stays open per architectural principle
722+ LOGGER .debug ("Streaming call for task {} - queue lifecycle managed by EventConsumer" , taskId );
720723 } else {
721724 // For non-streaming: close the ChildQueue and notify the parent MainQueue
722725 // The parent will close itself when all children are closed (childClosing logic)
0 commit comments