1+ import asyncio
2+
13from dotenv import load_dotenv
24from livekit import rtc
35from livekit .agents import (
@@ -71,25 +73,39 @@ async def my_agent(ctx: JobContext):
7173 }
7274
7375 room_name = ctx .room .name
76+ logger .info (f"=== Agent session handler called for room: { room_name } ===" )
7477
7578 # Initialize egress manager for dual-channel audio recording
76- egress_config = EgressConfig (
77- s3_bucket = S3_BUCKET ,
78- s3_prefix = S3_PREFIX ,
79- )
80- egress_manager = EgressManager (egress_config )
79+ egress_manager = None
80+ try :
81+ logger .info ("Initializing egress manager..." )
82+ egress_config = EgressConfig (
83+ s3_bucket = S3_BUCKET ,
84+ s3_prefix = S3_PREFIX ,
85+ )
86+ egress_manager = EgressManager (egress_config )
87+ logger .info ("Egress manager initialized successfully" )
88+ except Exception as e :
89+ logger .error (f"Failed to initialize egress manager: { e } " )
8190
8291 # Initialize transcript handler for saving STT output
83- s3_uploader = S3Uploader (
84- bucket = S3_BUCKET ,
85- prefix = S3_PREFIX ,
86- )
87- transcript_handler = TranscriptHandler (
88- room_name = room_name ,
89- s3_uploader = s3_uploader ,
90- )
92+ transcript_handler = None
93+ try :
94+ logger .info ("Initializing transcript handler..." )
95+ s3_uploader = S3Uploader (
96+ bucket = S3_BUCKET ,
97+ prefix = S3_PREFIX ,
98+ )
99+ transcript_handler = TranscriptHandler (
100+ room_name = room_name ,
101+ s3_uploader = s3_uploader ,
102+ )
103+ logger .info ("Transcript handler initialized successfully" )
104+ except Exception as e :
105+ logger .error (f"Failed to initialize transcript handler: { e } " )
91106
92107 # Set up a voice AI pipeline using OpenAI, Cartesia, AssemblyAI, and the LiveKit turn detector
108+ logger .info ("Creating AgentSession..." )
93109 session = AgentSession (
94110 # Speech-to-text (STT) is your agent's ears, turning the user's speech into text that the LLM can understand
95111 # See all available models at https://docs.livekit.io/agents/models/stt/
@@ -110,11 +126,16 @@ async def my_agent(ctx: JobContext):
110126 # See more at https://docs.livekit.io/agents/build/audio/#preemptive-generation
111127 preemptive_generation = True ,
112128 )
129+ logger .info ("AgentSession created successfully" )
113130
114131 # Subscribe to conversation events to capture transcripts
132+ logger .info ("Registering event handlers..." )
133+
115134 @session .on ("conversation_item_added" )
116135 def on_conversation_item_added (event : ConversationItemAddedEvent ):
117136 """Capture user and agent transcripts from conversation events."""
137+ if transcript_handler is None :
138+ return
118139 item = event .item
119140 text = item .text_content
120141 if not text :
@@ -132,14 +153,24 @@ async def on_session_close(_event):
132153 logger .info (f"Session closing for room { room_name } , saving transcript..." )
133154
134155 # Upload transcript to S3
135- success = await transcript_handler .finalize_and_upload ()
136- if success :
137- logger .info (f"Transcript saved for room { room_name } " )
138- else :
139- logger .error (f"Failed to save transcript for room { room_name } " )
156+ if transcript_handler is not None :
157+ try :
158+ success = await transcript_handler .finalize_and_upload ()
159+ if success :
160+ logger .info (f"Transcript saved for room { room_name } " )
161+ else :
162+ logger .error (f"Failed to save transcript for room { room_name } " )
163+ except Exception as e :
164+ logger .error (f"Error saving transcript: { e } " )
140165
141166 # Clean up egress manager resources
142- await egress_manager .close ()
167+ if egress_manager is not None :
168+ try :
169+ await egress_manager .close ()
170+ except Exception as e :
171+ logger .error (f"Error closing egress manager: { e } " )
172+
173+ logger .info ("Event handlers registered" )
143174
144175 # To use a realtime model instead of a voice pipeline, use the following session setup instead.
145176 # (Note: This is for the OpenAI Realtime API. For other providers, see https://docs.livekit.io/agents/models/realtime/)
@@ -159,17 +190,8 @@ async def on_session_close(_event):
159190 # # Start the avatar and wait for it to join
160191 # await avatar.start(session, room=ctx.room)
161192
162- # Start dual-channel audio recording via egress
163- egress_id = await egress_manager .start_dual_channel_recording (room_name )
164- if egress_id :
165- logger .info (f"Started dual-channel recording for room { room_name } " )
166- else :
167- logger .warning (
168- f"Failed to start egress recording for room { room_name } , "
169- "continuing without recording"
170- )
171-
172193 # Start the session, which initializes the voice pipeline and warms up the models
194+ logger .info ("Starting session..." )
173195 await session .start (
174196 agent = Assistant (),
175197 room = ctx .room ,
@@ -181,9 +203,39 @@ async def on_session_close(_event):
181203 ),
182204 ),
183205 )
206+ logger .info ("Session started successfully" )
184207
185208 # Join the room and connect to the user
209+ logger .info ("Connecting to room..." )
186210 await ctx .connect ()
211+ logger .info ("Connected to room successfully" )
212+
213+ # Greet the user
214+ await session .say ("Hello, how can I assist you?" )
215+
216+ # Start dual-channel audio recording via egress (non-blocking, after room is active)
async def start_egress_background():
    """Start egress recording in background so it doesn't block the agent.

    Reads ``egress_manager``, ``room_name`` and ``logger`` from the
    enclosing scope. When ``egress_manager`` is ``None`` the recording is
    skipped with a warning. Otherwise
    ``egress_manager.start_dual_channel_recording(room_name)`` is awaited
    and the outcome is logged: a truthy result is reported as a started
    recording, a falsy result as a non-fatal failure.

    Returns:
        None. Any ``Exception`` raised by the start call is logged and
        swallowed so a recording failure never propagates to the caller.
    """
    if egress_manager is None:
        logger.warning("Egress manager not initialized, skipping recording")
        return

    logger.info("Starting egress recording in background...")
    try:
        # Keep the try body to the one call that can actually fail.
        egress_id = await egress_manager.start_dual_channel_recording(room_name)
    except Exception as e:
        # Best-effort by design: log with the traceback (logger.exception)
        # instead of only the message, then carry on without recording.
        logger.exception(f"Error starting egress recording: {e}")
        return

    if egress_id:
        logger.info(f"Started dual-channel recording for room {room_name}")
    else:
        logger.warning(
            f"Failed to start egress recording for room {room_name}, "
            "continuing without recording"
        )
234+
235+ # Run egress start in background task so it doesn't block
236+ _egress_task = asyncio .create_task (start_egress_background ()) # noqa: RUF006
237+
238+ logger .info (f"=== Agent setup complete for room: { room_name } ===" )
187239
188240
189241if __name__ == "__main__" :
0 commit comments