@@ -407,7 +407,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
407407 // Store push notification config for newly created tasks (mirrors streaming logic)
408408 // Only for NEW tasks - existing tasks are handled by initMessageSend()
409409 if (mss .task () == null && kind instanceof Task createdTask && shouldAddPushInfo (params )) {
410- LOGGER .debug ("Storing push notification config for new task {}" , createdTask .id ());
410+ LOGGER .debug ("Storing push notification config for new task {} (original taskId from params: {})" ,
411+ createdTask .id (), params .message ().taskId ());
411412 pushConfigStore .setInfo (createdTask .id (), params .configuration ().pushNotificationConfig ());
412413 }
413414
@@ -508,6 +509,18 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
508509 @ SuppressWarnings ("NullAway" )
509510 EventQueue queue = queueManager .createOrTap (taskId .get ());
510511 LOGGER .debug ("Created/tapped queue for task {}: {}" , taskId .get (), queue );
512+
513+ // Store push notification config SYNCHRONOUSLY for new tasks before agent starts
514+ // This ensures config is available when MainEventBusProcessor sends push notifications
515+ // For existing tasks, config was already stored in initMessageSend()
516+ if (mss .task () == null && shouldAddPushInfo (params )) {
517+ // Satisfy Nullaway
518+ Objects .requireNonNull (taskId .get (), "taskId was null" );
519+ LOGGER .debug ("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})" ,
520+ taskId .get (), params .message ().taskId ());
521+ pushConfigStore .setInfo (taskId .get (), params .configuration ().pushNotificationConfig ());
522+ }
523+
511524 ResultAggregator resultAggregator = new ResultAggregator (mss .taskManager , null , executor );
512525
513526 EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync (queueTaskId , mss .requestContext , queue );
@@ -536,15 +549,6 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
536549 } catch (TaskQueueExistsException e ) {
537550 // TODO Log
538551 }
539- if (pushConfigStore != null &&
540- params .configuration () != null &&
541- params .configuration ().pushNotificationConfig () != null ) {
542-
543- pushConfigStore .setInfo (
544- createdTask .id (),
545- params .configuration ().pushNotificationConfig ());
546- }
547-
548552 }
549553 return true ;
550554 }));
@@ -815,13 +819,10 @@ private CompletableFuture<Void> cleanupProducer(@Nullable CompletableFuture<Void
815819 }
816820
817821 if (isStreaming ) {
818- // For streaming: DON'T close the ChildQueue here
819- // The ChildQueue must stay open so MainEventBusProcessor can distribute events to it
820- // and the subscriber can consume them. EventConsumer will close the queue when:
821- // 1. A final event is detected, or
822- // 2. The subscriber cancels, or
823- // 3. EventConsumer completes naturally
824- LOGGER .debug ("Streaming call, NOT closing ChildQueue in cleanup for task {} (EventConsumer will close it)" , taskId );
822+ // For streaming: Queue lifecycle managed by EventConsumer
823+ // EventConsumer closes queue when it detects final event (or QueueClosedEvent from replication)
824+ // For fire-and-forget tasks, MainQueue stays open per architectural principle
825+ LOGGER .debug ("Streaming call for task {} - queue lifecycle managed by EventConsumer" , taskId );
825826 } else {
826827 // For non-streaming: close the ChildQueue and notify the parent MainQueue
827828 // The parent will close itself when all children are closed (childClosing logic)
0 commit comments