1515from getstream .video .rtc .pb .stream .video .sfu .models .models_pb2 import TrackType
1616
1717from ..edge import sfu_events
18- from ..edge .events import AudioReceivedEvent , TrackAddedEvent , TrackRemovedEvent , CallEndedEvent
18+ from ..edge .events import (
19+ AudioReceivedEvent ,
20+ TrackAddedEvent ,
21+ TrackRemovedEvent ,
22+ CallEndedEvent ,
23+ )
1924from ..edge .types import Connection , Participant , PcmData , User , OutputAudioTrack
2025from ..events .manager import EventManager
2126from ..llm import events as llm_events
2732)
2833from ..llm .llm import LLM
2934from ..llm .realtime import Realtime
30- from ..logging_utils import CallContextToken , clear_call_context , set_call_context
3135from ..mcp import MCPBaseServer , MCPManager
3236from ..processors .base_processor import Processor , ProcessorType , filter_processors
3337from ..stt .events import STTTranscriptEvent , STTErrorEvent
3438from ..stt .stt import STT
3539from ..tts .tts import TTS
3640from ..tts .events import TTSAudioEvent
3741from ..turn_detection import TurnDetector , TurnStartedEvent , TurnEndedEvent
42+ from ..utils .logging import (
43+ CallContextToken ,
44+ clear_call_context ,
45+ set_call_context ,
46+ configure_default_logging ,
47+ )
3848from ..utils .video_forwarder import VideoForwarder
3949from ..utils .video_utils import ensure_even_dimensions
4050from ..vad import VAD
@@ -65,6 +75,16 @@ def _log_task_exception(task: asyncio.Task):
6575 logger .exception ("Error in background task" )
6676
6777
78+ class _AgentLoggerAdapter (logging .LoggerAdapter ):
79+ """
80+ A logger adapter to include the agent_id to the logs
81+ """
82+
83+ def process (self , msg : str , kwargs ):
84+ if self .extra :
85+ return "[Agent: %s] | %s" % (self .extra ["agent_id" ], msg ), kwargs
86+ return super (_AgentLoggerAdapter , self ).process (msg , kwargs )
87+
6888# TODO: move me
6989@dataclass
7090class AgentOptions :
@@ -132,22 +152,29 @@ def __init__(
132152 # MCP servers for external tool and resource access
133153 mcp_servers : Optional [List [MCPBaseServer ]] = None ,
134154 options : Optional [AgentOptions ] = None ,
155+ tracer : Tracer = trace .get_tracer ("agents" ),
156+ # Configure the default logging for the sdk here. Pass None to leave the config intact.
157+ log_level : Optional [int ] = logging .INFO ,
135158 ):
159+ if log_level is not None :
160+ configure_default_logging (level = log_level )
136161 if options is None :
137162 options = default_agent_options ()
138163 else :
139164 options = default_agent_options ().update (options )
140165 self .options = options
166+
141167 self .instructions = instructions
142168 self .edge = edge
143169 self .agent_user = agent_user
144170 self ._agent_user_initialized = False
145171
146172 # only needed in case we spin threads
173+ self .tracer = tracer
147174 self ._root_span : Optional [Span ] = None
148175 self ._root_ctx : Optional [Context ] = None
149176
150- self .logger = logging . getLogger ( f"Agent[ { self .agent_user .id } ]" )
177+ self .logger = _AgentLoggerAdapter ( logger , { "agent_id" : self .agent_user .id })
151178
152179 self .events = EventManager ()
153180 self .events .register_events_from_module (getstream .models , "call." )
@@ -183,7 +210,7 @@ def __init__(
183210 # Merge plugin events BEFORE subscribing to any events
184211 for plugin in [stt , tts , turn_detection , vad , llm , edge ]:
185212 if plugin and hasattr (plugin , "events" ):
186- self .logger .info (f"Registered plugin { plugin } " )
213+ self .logger .debug (f"Register events from plugin { plugin } " )
187214 self .events .merge (plugin .events )
188215
189216 self .llm ._attach_agent (self )
@@ -242,8 +269,8 @@ async def simple_response(
242269 """
243270 Overwrite simple_response if you want to change how the Agent class calls the LLM
244271 """
245- logger .info ("asking LLM to reply to %s" , text )
246- with self .span ("simple_response" ) as span :
272+ self . logger .info ('🤖 Asking LLM to reply to " %s"' , text )
273+ with self .tracer . start_as_current_span ("simple_response" ) as span :
247274 response = await self .llm .simple_response (
248275 text = text , processors = self .processors , participant = participant
249276 )
@@ -504,6 +531,13 @@ async def finish(self):
504531 Subscribes to the edge transport's `call_ended` event and awaits it. If
505532 no connection is active, returns immediately.
506533 """
534+ # If connection is None or already closed, return immediately
535+ if not self ._connection :
536+ self .logger .info (
537+ "🔚 Agent connection is already closed, finishing immediately"
538+ )
539+ return
540+
507541
508542 with self .span ("agent.finish" ):
509543 # If connection is None or already closed, return immediately
@@ -631,7 +665,7 @@ async def create_user(self) -> None:
631665 return None
632666
633667 def _on_vad_audio (self , event : VADAudioEvent ):
634- self .logger .info (f"Vad audio event { self ._truncate_for_logging (event )} " )
668+ self .logger .debug (f"Vad audio event { self ._truncate_for_logging (event )} " )
635669
636670 def _on_rtc_reconnect (self ):
637671 # update the code to listen?
@@ -677,7 +711,7 @@ async def _on_agent_say(self, event: events.AgentSayEvent):
677711 )
678712 )
679713
680- self .logger .info (f"Agent said: { event .text } " )
714+ self .logger .info (f"🤖 Agent said: { event .text } " )
681715 else :
682716 self .logger .warning ("No TTS available, cannot synthesize speech" )
683717
@@ -764,14 +798,18 @@ async def on_track(event: TrackAddedEvent):
764798 # If track is already being processed, just switch to it
765799 if track_id in self ._active_video_tracks :
766800 track_type_name = TrackType .Name (track_type )
767- self .logger .info (f"🎥 Track re-added: { track_type_name } ({ track_id } ), switching to it" )
768-
801+ self .logger .info (
802+ f"🎥 Track re-added: { track_type_name } ({ track_id } ), switching to it"
803+ )
804+
769805 if self .realtime_mode and isinstance (self .llm , Realtime ):
770806 # Get the existing forwarder and switch to this track
771807 _ , _ , forwarder = self ._active_video_tracks [track_id ]
772808 track = self .edge .add_track_subscriber (track_id )
773809 if track and forwarder :
774- await self .llm ._watch_video_track (track , shared_forwarder = forwarder )
810+ await self .llm ._watch_video_track (
811+ track , shared_forwarder = forwarder
812+ )
775813 self ._current_video_track_id = track_id
776814 return
777815
@@ -796,10 +834,16 @@ async def on_track_removed(event: TrackRemovedEvent):
796834
797835 # Clean up track metadata
798836 self ._active_video_tracks .pop (track_id , None )
799-
837+
800838 # If this was the active track, switch to any other available track
801- if track_id == self ._current_video_track_id and self .realtime_mode and isinstance (self .llm , Realtime ):
802- self .logger .info ("🎥 Active video track removed, switching to next available" )
839+ if (
840+ track_id == self ._current_video_track_id
841+ and self .realtime_mode
842+ and isinstance (self .llm , Realtime )
843+ ):
844+ self .logger .info (
845+ "🎥 Active video track removed, switching to next available"
846+ )
803847 self ._current_video_track_id = None
804848 await self ._switch_to_next_available_track ()
805849
@@ -839,16 +883,23 @@ async def _switch_to_next_available_track(self) -> None:
839883 self .logger .info ("🎥 No video tracks available" )
840884 self ._current_video_track_id = None
841885 return
842-
886+
843887 # Just pick the first available video track
844- for track_id , (track_type , participant , forwarder ) in self ._active_video_tracks .items ():
888+ for track_id , (
889+ track_type ,
890+ participant ,
891+ forwarder ,
892+ ) in self ._active_video_tracks .items ():
845893 # Only consider video tracks (camera or screenshare)
846- if track_type not in (TrackType .TRACK_TYPE_VIDEO , TrackType .TRACK_TYPE_SCREEN_SHARE ):
894+ if track_type not in (
895+ TrackType .TRACK_TYPE_VIDEO ,
896+ TrackType .TRACK_TYPE_SCREEN_SHARE ,
897+ ):
847898 continue
848-
899+
849900 track_type_name = TrackType .Name (track_type )
850901 self .logger .info (f"🎥 Switching to track: { track_type_name } ({ track_id } )" )
851-
902+
852903 # Get the track and forwarder
853904 track = self .edge .add_track_subscriber (track_id )
854905 if track and forwarder and isinstance (self .llm , Realtime ):
@@ -858,16 +909,19 @@ async def _switch_to_next_available_track(self) -> None:
858909 return
859910 else :
860911 self .logger .error (f"Failed to switch to track { track_id } " )
861-
912+
862913 self .logger .warning ("🎥 No suitable video tracks found" )
863914
864915 async def _process_track (self , track_id : str , track_type : int , participant ):
865916 raw_forwarder = None
866917 processed_forwarder = None
867-
918+
868919 try :
869920 # we only process video tracks (camera video or screenshare)
870- if track_type not in (TrackType .TRACK_TYPE_VIDEO , TrackType .TRACK_TYPE_SCREEN_SHARE ):
921+ if track_type not in (
922+ TrackType .TRACK_TYPE_VIDEO ,
923+ TrackType .TRACK_TYPE_SCREEN_SHARE ,
924+ ):
871925 return
872926
873927 # subscribe to the video track
@@ -877,14 +931,16 @@ async def _process_track(self, track_id: str, track_type: int, participant):
877931 return
878932
879933 # Wrap screenshare tracks to ensure even dimensions for H.264 encoding
880- if track_type == TrackType .TRACK_TYPE_SCREEN_SHARE :
934+ if track_type == TrackType .TRACK_TYPE_SCREEN_SHARE :
935+
881936 class _EvenDimensionsTrack (VideoStreamTrack ):
882- def __init__ (self , src ):
937+ def __init__ (self , src ):
883938 super ().__init__ ()
884939 self .src = src
885- async def recv (self ):
940+
941+ async def recv (self ):
886942 return ensure_even_dimensions (await self .src .recv ())
887-
943+
888944 track = _EvenDimensionsTrack (track ) # type: ignore[arg-type]
889945
890946 # Create a SHARED VideoForwarder for the RAW incoming track
@@ -896,17 +952,21 @@ async def recv(self):
896952 name = f"raw_video_forwarder_{ track_id } " ,
897953 )
898954 await raw_forwarder .start ()
899- self .logger .info ("🎥 Created raw VideoForwarder for track %s" , track_id )
955+ self .logger .debug ("🎥 Created raw VideoForwarder for track %s" , track_id )
900956
901957 # Track forwarders for cleanup
902958 self ._video_forwarders .append (raw_forwarder )
903959
904960 # Store track metadata
905- self ._active_video_tracks [track_id ] = (track_type , participant , raw_forwarder )
961+ self ._active_video_tracks [track_id ] = (
962+ track_type ,
963+ participant ,
964+ raw_forwarder ,
965+ )
906966
907967 # If Realtime provider supports video, switch to this new track
908968 track_type_name = TrackType .Name (track_type )
909-
969+
910970 if self .realtime_mode :
911971 if self ._video_track :
912972 # We have a video publisher (e.g., YOLO processor)
@@ -931,7 +991,9 @@ async def recv(self):
931991 self ._current_video_track_id = track_id
932992 else :
933993 # No video publisher, send raw frames - switch to this new track
934- self .logger .info (f"🎥 Switching to { track_type_name } track: { track_id } " )
994+ self .logger .info (
995+ f"🎥 Switching to { track_type_name } track: { track_id } "
996+ )
935997 if isinstance (self .llm , Realtime ):
936998 await self .llm ._watch_video_track (
937999 track , shared_forwarder = raw_forwarder
@@ -978,7 +1040,9 @@ async def recv(self):
9781040
9791041 for processor in self .image_processors :
9801042 try :
981- await processor .process_image (img , participant .user_id )
1043+ await processor .process_image (
1044+ img , participant .user_id
1045+ )
9821046 except Exception as e :
9831047 self .logger .error (
9841048 f"Error in image processor { type (processor ).__name__ } : { e } "
@@ -999,26 +1063,28 @@ async def recv(self):
9991063 except asyncio .CancelledError :
10001064 # Task was cancelled (e.g., track removed)
10011065 # Clean up forwarders that were created for this track
1002- self .logger .debug (f"🎥 Cleaning up forwarders for cancelled track { track_id } " )
1003-
1066+ self .logger .debug (
1067+ f"🎥 Cleaning up forwarders for cancelled track { track_id } "
1068+ )
1069+
10041070 # Stop and remove the raw forwarder if it was created
1005- if raw_forwarder is not None and hasattr (self , ' _video_forwarders' ):
1071+ if raw_forwarder is not None and hasattr (self , " _video_forwarders" ):
10061072 if raw_forwarder in self ._video_forwarders :
10071073 try :
10081074 await raw_forwarder .stop ()
10091075 self ._video_forwarders .remove (raw_forwarder )
10101076 except Exception as e :
10111077 self .logger .error (f"Error stopping raw forwarder: { e } " )
1012-
1078+
10131079 # Stop and remove processed forwarder if it was created
1014- if processed_forwarder is not None and hasattr (self , ' _video_forwarders' ):
1080+ if processed_forwarder is not None and hasattr (self , " _video_forwarders" ):
10151081 if processed_forwarder in self ._video_forwarders :
10161082 try :
10171083 await processed_forwarder .stop ()
10181084 self ._video_forwarders .remove (processed_forwarder )
10191085 except Exception as e :
10201086 self .logger .error (f"Error stopping processed forwarder: { e } " )
1021-
1087+
10221088 return
10231089
10241090 async def _on_turn_event (self , event : TurnStartedEvent | TurnEndedEvent ) -> None :
0 commit comments