22from fastapi .responses import StreamingResponse
33import json
44import logging
5+ import time
56from datetime import datetime
67from types import SimpleNamespace
78from uuid import uuid4
@@ -137,6 +138,9 @@ async def _get_instruction_suggestions_for_completion(
137138tracer = get_tracer (__name__ )
138139
139140
141+ logger = logging .getLogger (__name__ )
142+
143+
140144class CompletionService :
141145
142146 def __init__ (self ):
@@ -1552,11 +1556,21 @@ async def _create_completion_stream_traced(
15521556 build_id : str = None ,
15531557 ):
15541558 try :
1559+ t0 = time .monotonic ()
1560+ rid = str (report_id )[:8 ]
1561+
1562+ def _log (label ):
1563+ elapsed = (time .monotonic () - t0 ) * 1000
1564+ logger .info (f"[stream:{ rid } ] { label } +{ elapsed :.0f} ms" )
1565+
1566+ _log ("stream_start" )
1567+
15551568 # Validate report exists (same as regular create_completion)
15561569 result = await db .execute (select (Report ).filter (Report .id == report_id ))
15571570 report = result .scalar_one_or_none ()
15581571 if not report :
15591572 raise HTTPException (status_code = 404 , detail = "Report not found" )
1573+ _log ("report_fetched" )
15601574
15611575 # Validate widget if provided
15621576 if completion_data .prompt .widget_id :
@@ -1577,6 +1591,7 @@ async def _create_completion_stream_traced(
15771591 step = None
15781592
15791593 span .add_event ("validation_done" )
1594+ _log ("validation_done" )
15801595
15811596 # Get default model
15821597 if completion_data .prompt and completion_data .prompt .model_id :
@@ -1589,11 +1604,13 @@ async def _create_completion_stream_traced(
15891604 status_code = 400 ,
15901605 detail = "No default LLM model configured. Please go to Settings > LLM and set a default model."
15911606 )
1607+ _log ("model_resolved" )
15921608
15931609 small_model = await self .llm_service .get_default_model (db , organization , current_user , is_small = True )
15941610 # Fallback: if no small model configured, use the main model
15951611 if not small_model :
15961612 small_model = model
1613+ _log ("small_model_resolved" )
15971614
15981615 span .set_attribute ("llm.model_id" , model .model_id )
15991616
@@ -1646,19 +1663,22 @@ async def _create_completion_stream_traced(
16461663 status_code = 500 ,
16471664 detail = f"Failed to save completions: { str (e )} "
16481665 )
1666+ _log ("db_commit_flush" )
16491667
16501668 span .set_attribute ("completion.head_id" , str (completion .id ))
16511669 span .set_attribute ("completion.system_id" , str (system_completion .id ))
16521670 span .add_event ("completions_saved" )
16531671
16541672 # Mark image files with this completion_id (so they show attached to this message)
16551673 await self ._mark_images_with_completion (db , report .id , str (completion .id ))
1674+ _log ("images_marked" )
16561675
16571676 # Store mentions associated with the user completion (best-effort, non-blocking)
16581677 try :
16591678 await self .mention_service .create_completion_mentions (db , completion )
16601679 except Exception as e :
1661- logging .error (f"Failed to create mentions for completion { completion .id } : { e } " )
1680+ logger .error (f"Failed to create mentions for completion { completion .id } : { e } " )
1681+ _log ("mentions_created" )
16621682
16631683 # Audit log
16641684 try :
@@ -1673,31 +1693,43 @@ async def _create_completion_stream_traced(
16731693 )
16741694 except Exception :
16751695 pass
1696+ _log ("audit_logged" )
16761697
16771698 org_settings = await organization .get_settings (db )
1699+ _log ("org_settings_fetched" )
16781700 resolved_build_id = await self ._resolve_build_id (db , organization , build_id )
1701+ _log ("build_id_resolved" )
16791702
16801703 # Create event queue for streaming
16811704 event_queue = CompletionEventQueue ()
16821705
16831706 async def run_agent_with_streaming ():
16841707 """Run agent in background and stream events."""
1708+ at0 = time .monotonic ()
1709+
1710+ def _alog (label ):
1711+ elapsed = (time .monotonic () - at0 ) * 1000
1712+ logger .info (f"[stream:{ rid } :agent] { label } +{ elapsed :.0f} ms" )
1713+
16851714 with tracer .start_as_current_span ("completion.stream_agent_execution" ) as agent_span :
16861715 agent_span .set_attribute ("report.id" , str (report .id ))
16871716 agent_span .set_attribute ("completion.system_id" , str (system_completion .id ))
16881717 agent_span .set_attribute ("llm.model_id" , model .model_id )
16891718 async_session = create_async_session_factory ()
16901719 async with async_session () as session :
16911720 try :
1721+ _alog ("session_opened" )
1722+
16921723 # Re-fetch all database-dependent objects using the new session
16931724 report_obj = await session .get (Report , report .id )
16941725 completion_obj = await session .get (Completion , completion .id )
16951726 system_completion_obj = await session .get (Completion , system_completion .id )
16961727 widget_obj = await session .get (Widget , widget .id ) if widget else None
16971728 step_obj = await session .get (Step , step .id ) if step else None
1729+ _alog ("objects_refetched" )
16981730
16991731 if not all ([report_obj , completion_obj , system_completion_obj ]):
1700- logging .error ("Failed to fetch necessary objects for streaming agent." )
1732+ logger .error ("Failed to fetch necessary objects for streaming agent." )
17011733 error_event = SSEEvent (
17021734 event = "completion.error" ,
17031735 completion_id = str (system_completion .id ),
@@ -1712,10 +1744,12 @@ async def run_agent_with_streaming():
17121744 ds_clients = await self .data_source_service .construct_clients (session , data_source , current_user )
17131745 clients .update (ds_clients )
17141746 clients_span .set_attribute ("data_sources.count" , len (report_obj .data_sources ))
1747+ _alog (f"clients_constructed count={ len (report_obj .data_sources )} " )
17151748
17161749 # Pre-load files relationship in async context to avoid greenlet error in AgentV2.__init__
17171750 # (AgentV2.__init__ is synchronous, so lazy-loading files there would fail)
17181751 _ = report_obj .files
1752+ _alog ("files_preloaded" )
17191753
17201754 # Create agent with event queue
17211755 agent = AgentV2 (
@@ -1735,6 +1769,7 @@ async def run_agent_with_streaming():
17351769 clients = clients ,
17361770 build_id = resolved_build_id ,
17371771 )
1772+ _alog ("agent_initialized" )
17381773
17391774 # Emit telemetry: stream started
17401775 try :
@@ -1754,9 +1789,11 @@ async def run_agent_with_streaming():
17541789
17551790 # Run agent execution
17561791 agent_span .add_event ("agent_execution_started" )
1792+ _alog ("agent_execution_start" )
17571793 with tracer .start_as_current_span ("completion.agent_execution" ):
17581794 await agent .main_execution ()
17591795 agent_span .add_event ("agent_execution_finished" )
1796+ _alog ("agent_execution_done" )
17601797
17611798 # Send completion finished event
17621799 finished_event = SSEEvent (
@@ -1765,6 +1802,7 @@ async def run_agent_with_streaming():
17651802 data = {"status" : "success" }
17661803 )
17671804 await event_queue .put (finished_event )
1805+ _alog ("queue_finished" )
17681806
17691807 # Emit telemetry: stream completed
17701808 try :
@@ -1783,7 +1821,8 @@ async def run_agent_with_streaming():
17831821 except Exception as e :
17841822 agent_span .set_status (StatusCode .ERROR , str (e ))
17851823 agent_span .record_exception (e )
1786- logging .error (f"Agent streaming execution failed: { e } " )
1824+ _alog (f"agent_execution_error error={ type (e ).__name__ } : { e } " )
1825+ logger .error (f"Agent streaming execution failed: { e } " )
17871826 # Send error event
17881827 error_event = SSEEvent (
17891828 event = "completion.error" ,
@@ -1826,6 +1865,7 @@ async def run_agent_with_streaming():
18261865
18271866 # Start agent execution in background
18281867 asyncio .create_task (run_agent_with_streaming ())
1868+ _log ("task_spawned" )
18291869
18301870 # Stream events
18311871 async def completion_stream_generator ():
@@ -1873,11 +1913,11 @@ async def completion_stream_generator():
18731913
18741914 except HTTPException as he :
18751915 # Log the error and re-raise HTTP exceptions
1876- logging .error (f"HTTP Exception in create_completion_stream: { str (he )} " )
1916+ logger .error (f"HTTP Exception in create_completion_stream: { str (he )} " )
18771917 raise he
18781918 except Exception as e :
18791919 # Log and convert unexpected errors to HTTP exceptions
1880- logging .error (f"Unexpected error in create_completion_stream: { str (e )} " )
1920+ logger .error (f"Unexpected error in create_completion_stream: { str (e )} " )
18811921 raise HTTPException (
18821922 status_code = 500 ,
18831923 detail = f"Unexpected error: { str (e )} "
0 commit comments