@@ -566,6 +566,35 @@ def connection_made(self, transport: asyncio.Transport) -> None:
566
566
self .transport = transport
567
567
self .proxy .target_transport = transport
568
568
569
+ def _ensure_output_processor (self ) -> None :
570
+ if self .proxy .context_tracking is None :
571
+ # No context tracking, no need to process pipeline
572
+ return
573
+
574
+ if self .sse_processor is not None :
575
+ # Already initialized, no need to reinitialize
576
+ return
577
+
578
+ # this is a hotfix - we shortcut before selecting the output pipeline for FIM
579
+ # because our FIM output pipeline is actually empty as of now. We should fix this
580
+ # but don't have any immediate need.
581
+ is_fim = self .proxy .context_tracking .metadata .get ("is_fim" , False )
582
+ if is_fim :
583
+ return
584
+
585
+ logger .debug ("Tracking context for pipeline processing" )
586
+ self .sse_processor = SSEProcessor ()
587
+ is_fim = self .proxy .context_tracking .metadata .get ("is_fim" , False )
588
+ if is_fim :
589
+ out_pipeline_processor = self .proxy .pipeline_factory .create_fim_output_pipeline ()
590
+ else :
591
+ out_pipeline_processor = self .proxy .pipeline_factory .create_output_pipeline ()
592
+
593
+ self .output_pipeline_instance = OutputPipelineInstance (
594
+ pipeline_steps = out_pipeline_processor .pipeline_steps ,
595
+ input_context = self .proxy .context_tracking ,
596
+ )
597
+
569
598
async def _process_stream (self ):
570
599
try :
571
600
@@ -633,14 +662,7 @@ def _proxy_transport_write(self, data: bytes):
633
662
634
663
def data_received (self , data : bytes ) -> None :
635
664
"""Handle data received from target"""
636
- if self .proxy .context_tracking is not None and self .sse_processor is None :
637
- logger .debug ("Tracking context for pipeline processing" )
638
- self .sse_processor = SSEProcessor ()
639
- out_pipeline_processor = self .proxy .pipeline_factory .create_output_pipeline ()
640
- self .output_pipeline_instance = OutputPipelineInstance (
641
- pipeline_steps = out_pipeline_processor .pipeline_steps ,
642
- input_context = self .proxy .context_tracking ,
643
- )
665
+ self ._ensure_output_processor ()
644
666
645
667
if self .proxy .transport and not self .proxy .transport .is_closing ():
646
668
if not self .sse_processor :
0 commit comments