@@ -583,6 +583,9 @@ class PipelineRun:
583583 _device_id : str | None = None
584584 """Optional device id set during run start."""
585585
586+ _satellite_id : str | None = None
587+ """Optional satellite id set during run start."""
588+
586589 _conversation_data : PipelineConversationData | None = None
587590 """Data tied to the conversation ID."""
588591
@@ -636,16 +639,21 @@ def process_event(self, event: PipelineEvent) -> None:
636639 return
637640 pipeline_data .pipeline_debug [self .pipeline .id ][self .id ].events .append (event )
638641
639- def start (self , conversation_id : str , device_id : str | None ) -> None :
642+ def start (
643+ self , conversation_id : str , device_id : str | None , satellite_id : str | None
644+ ) -> None :
640645 """Emit run start event."""
641646 self ._device_id = device_id
647+ self ._satellite_id = satellite_id
642648 self ._start_debug_recording_thread ()
643649
644650 data : dict [str , Any ] = {
645651 "pipeline" : self .pipeline .id ,
646652 "language" : self .language ,
647653 "conversation_id" : conversation_id ,
648654 }
655+ if satellite_id is not None :
656+ data ["satellite_id" ] = satellite_id
649657 if self .runner_data is not None :
650658 data ["runner_data" ] = self .runner_data
651659 if self .tts_stream :
@@ -1057,7 +1065,6 @@ async def recognize_intent(
10571065 self ,
10581066 intent_input : str ,
10591067 conversation_id : str ,
1060- device_id : str | None ,
10611068 conversation_extra_system_prompt : str | None ,
10621069 ) -> str :
10631070 """Run intent recognition portion of pipeline. Returns text to speak."""
@@ -1088,7 +1095,8 @@ async def recognize_intent(
10881095 "language" : input_language ,
10891096 "intent_input" : intent_input ,
10901097 "conversation_id" : conversation_id ,
1091- "device_id" : device_id ,
1098+ "device_id" : self ._device_id ,
1099+ "satellite_id" : self ._satellite_id ,
10921100 "prefer_local_intents" : self .pipeline .prefer_local_intents ,
10931101 },
10941102 )
@@ -1099,7 +1107,8 @@ async def recognize_intent(
10991107 text = intent_input ,
11001108 context = self .context ,
11011109 conversation_id = conversation_id ,
1102- device_id = device_id ,
1110+ device_id = self ._device_id ,
1111+ satellite_id = self ._satellite_id ,
11031112 language = input_language ,
11041113 agent_id = self .intent_agent .id ,
11051114 extra_system_prompt = conversation_extra_system_prompt ,
@@ -1269,6 +1278,7 @@ async def tts_input_stream_generator() -> AsyncGenerator[str]:
12691278 text = user_input .text ,
12701279 conversation_id = user_input .conversation_id ,
12711280 device_id = user_input .device_id ,
1281+ satellite_id = user_input .satellite_id ,
12721282 context = user_input .context ,
12731283 language = user_input .language ,
12741284 agent_id = user_input .agent_id ,
@@ -1567,10 +1577,15 @@ class PipelineInput:
15671577 device_id : str | None = None
15681578 """Identifier of the device that is processing the input/output of the pipeline."""
15691579
1580+ satellite_id : str | None = None
1581+ """Identifier of the satellite that is processing the input/output of the pipeline."""
1582+
15701583 async def execute (self ) -> None :
15711584 """Run pipeline."""
15721585 self .run .start (
1573- conversation_id = self .session .conversation_id , device_id = self .device_id
1586+ conversation_id = self .session .conversation_id ,
1587+ device_id = self .device_id ,
1588+ satellite_id = self .satellite_id ,
15741589 )
15751590 current_stage : PipelineStage | None = self .run .start_stage
15761591 stt_audio_buffer : list [EnhancedAudioChunk ] = []
@@ -1656,7 +1671,6 @@ async def buffer_then_audio_stream() -> AsyncGenerator[
16561671 tts_input = await self .run .recognize_intent (
16571672 intent_input ,
16581673 self .session .conversation_id ,
1659- self .device_id ,
16601674 self .conversation_extra_system_prompt ,
16611675 )
16621676 if tts_input .strip ():
0 commit comments