Skip to content

Commit 64f7439

Browse files
committed
adds wait for completion for audio upload
1 parent bd8dbcb commit 64f7439

File tree

3 files changed

+183
-22
lines changed

3 files changed

+183
-22
lines changed

src/agent.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,19 +160,22 @@ def on_session_close(_event):
160160
logger.info(f"Session closing for room {room_name}, saving transcript...")
161161

162162
async def cleanup():
163-
# Stop egress recording - this triggers S3 upload of the audio file
163+
# Stop egress recording and wait for S3 upload to complete
164164
if egress_manager is not None:
165165
try:
166-
logger.info("Stopping egress recording...")
167-
stopped = await egress_manager.stop_recording()
168-
if stopped:
166+
logger.info("Stopping egress recording and waiting for upload...")
167+
file_info = await egress_manager.stop_recording()
168+
if file_info:
169169
logger.info(
170-
f"Egress recording stopped for room {room_name}, "
171-
f"audio uploaded to s3://{S3_BUCKET}/{S3_PREFIX}/"
170+
f"Audio recording uploaded for room {room_name}: "
171+
f"location={file_info.location}, "
172+
f"filename={file_info.filename}, "
173+
f"size={file_info.size} bytes"
172174
)
173175
else:
174176
logger.warning(
175-
f"Failed to stop egress recording for room {room_name}"
177+
f"No audio file info returned for room {room_name} "
178+
"(recording may have failed or no audio was captured)"
176179
)
177180
except Exception as e:
178181
logger.error(f"Error stopping egress recording: {e}")

src/egress_manager.py

Lines changed: 138 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
11
"""Egress manager for recording dual-channel audio to S3."""
22

3+
import asyncio
34
import os
5+
from dataclasses import dataclass
46

57
from livekit import api
68
from livekit.protocol import egress as egress_proto
79
from loguru import logger
810

911

12+
@dataclass
13+
class EgressFileInfo:
14+
"""Information about an uploaded egress file."""
15+
16+
filename: str
17+
location: str
18+
duration: int
19+
size: int
20+
21+
1022
class EgressConfig:
1123
"""Configuration for egress recordings."""
1224

@@ -63,6 +75,7 @@ def __init__(self, config: EgressConfig):
6375
self.config = config
6476
self._api: api.LiveKitAPI | None = None
6577
self._egress_id: str | None = None
78+
self._expected_filepath: str | None = None
6679

6780
@property
6881
def livekit_api(self) -> api.LiveKitAPI:
@@ -120,6 +133,11 @@ async def start_dual_channel_recording(
120133
else:
121134
filepath = f"{filepath_prefix}/{{room_name}}-{{time}}.ogg"
122135

136+
# Store expected filepath for logging
137+
self._expected_filepath = filepath
138+
expected_s3_path = f"s3://{self.config.s3_bucket}/{filepath}"
139+
logger.info(f"Expected audio file path: {expected_s3_path}")
140+
123141
file_output = egress_proto.EncodedFileOutput(
124142
filepath=filepath,
125143
s3=s3_upload,
@@ -138,35 +156,145 @@ async def start_dual_channel_recording(
138156

139157
self._egress_id = info.egress_id
140158
logger.info(
141-
f"Started dual-channel egress recording for room {room_name}, "
142-
f"egress_id={self._egress_id}"
159+
f"Started egress recording for room {room_name}, "
160+
f"egress_id={self._egress_id}, "
161+
f"expected_file={expected_s3_path}"
143162
)
144163
return self._egress_id
145164

146165
except Exception as e:
147166
logger.error(f"Failed to start egress recording: {e}")
148167
return None
149168

150-
async def stop_recording(self) -> bool:
151-
"""Stop the active egress recording.
169+
async def stop_recording(self) -> EgressFileInfo | None:
170+
"""Stop the active egress recording and wait for upload to complete.
152171
153172
Returns:
154-
True if stopped successfully or no active recording, False on error
173+
EgressFileInfo with file details if successful, None on error
155174
"""
156175
if not self._egress_id:
157176
logger.debug("No active egress to stop")
158-
return True
177+
return None
159178

179+
egress_id = self._egress_id
160180
try:
181+
logger.info(f"Stopping egress recording, egress_id={egress_id}...")
161182
await self.livekit_api.egress.stop_egress(
162-
egress_proto.StopEgressRequest(egress_id=self._egress_id)
183+
egress_proto.StopEgressRequest(egress_id=egress_id)
184+
)
185+
logger.info(
186+
f"Stop request sent for egress_id={egress_id}, waiting for upload to complete..."
163187
)
164-
logger.info(f"Stopped egress recording, egress_id={self._egress_id}")
188+
189+
# Wait for the egress to complete and get file info
190+
file_info = await self._wait_for_completion(egress_id)
165191
self._egress_id = None
166-
return True
192+
self._expected_filepath = None
193+
return file_info
194+
167195
except Exception as e:
168196
logger.error(f"Failed to stop egress recording: {e}")
169-
return False
197+
return None
198+
199+
async def _wait_for_completion(
200+
self, egress_id: str, timeout: float = 60.0, poll_interval: float = 1.0
201+
) -> EgressFileInfo | None:
202+
"""Wait for egress to complete and return file info.
203+
204+
Args:
205+
egress_id: The egress ID to wait for
206+
timeout: Maximum time to wait in seconds
207+
poll_interval: Time between status checks in seconds
208+
209+
Returns:
210+
EgressFileInfo with file details if successful, None on error/timeout
211+
"""
212+
start_time = asyncio.get_event_loop().time()
213+
last_status = None
214+
215+
while True:
216+
elapsed = asyncio.get_event_loop().time() - start_time
217+
if elapsed > timeout:
218+
logger.error(
219+
f"Timeout waiting for egress completion after {timeout}s, "
220+
f"egress_id={egress_id}, last_status={last_status}"
221+
)
222+
return None
223+
224+
try:
225+
# List egress by ID to get current status
226+
response = await self.livekit_api.egress.list_egress(
227+
egress_proto.ListEgressRequest(egress_id=egress_id)
228+
)
229+
230+
if not response.items:
231+
logger.warning(f"Egress {egress_id} not found in list response")
232+
await asyncio.sleep(poll_interval)
233+
continue
234+
235+
egress_info = response.items[0]
236+
status = egress_info.status
237+
last_status = egress_proto.EgressStatus.Name(status)
238+
239+
logger.debug(
240+
f"Egress status: {last_status}, egress_id={egress_id}, "
241+
f"elapsed={elapsed:.1f}s"
242+
)
243+
244+
# Check terminal states
245+
if status == egress_proto.EgressStatus.EGRESS_COMPLETE:
246+
# Extract file info from results
247+
if egress_info.file_results:
248+
file_result = egress_info.file_results[0]
249+
file_info = EgressFileInfo(
250+
filename=file_result.filename,
251+
location=file_result.location,
252+
duration=file_result.duration,
253+
size=file_result.size,
254+
)
255+
logger.info(
256+
f"Egress completed successfully! "
257+
f"File uploaded to: {file_info.location}, "
258+
f"filename={file_info.filename}, "
259+
f"duration={file_info.duration}ns, "
260+
f"size={file_info.size} bytes"
261+
)
262+
return file_info
263+
else:
264+
logger.warning(
265+
f"Egress completed but no file_results found, "
266+
f"egress_id={egress_id}"
267+
)
268+
return None
269+
270+
elif status == egress_proto.EgressStatus.EGRESS_FAILED:
271+
error_msg = egress_info.error or "Unknown error"
272+
logger.error(f"Egress failed: {error_msg}, egress_id={egress_id}")
273+
return None
274+
275+
elif status == egress_proto.EgressStatus.EGRESS_ABORTED:
276+
logger.warning(f"Egress was aborted, egress_id={egress_id}")
277+
return None
278+
279+
elif status == egress_proto.EgressStatus.EGRESS_LIMIT_REACHED:
280+
logger.warning(f"Egress limit reached, egress_id={egress_id}")
281+
# Still try to get file info as partial recording may exist
282+
if egress_info.file_results:
283+
file_result = egress_info.file_results[0]
284+
return EgressFileInfo(
285+
filename=file_result.filename,
286+
location=file_result.location,
287+
duration=file_result.duration,
288+
size=file_result.size,
289+
)
290+
return None
291+
292+
# Still in progress, wait and poll again
293+
await asyncio.sleep(poll_interval)
294+
295+
except Exception as e:
296+
logger.error(f"Error polling egress status: {e}")
297+
await asyncio.sleep(poll_interval)
170298

171299
async def close(self) -> None:
172300
"""Clean up resources."""

tests/test_egress_manager.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@
33
from unittest.mock import AsyncMock, MagicMock, patch
44

55
import pytest
6+
from livekit.protocol import egress as egress_proto
67

7-
from egress_manager import EgressConfig, EgressManager, create_default_egress_manager
8+
from egress_manager import (
9+
EgressConfig,
10+
EgressFileInfo,
11+
EgressManager,
12+
create_default_egress_manager,
13+
)
814

915

1016
class TestEgressConfig:
@@ -153,16 +159,35 @@ async def test_start_dual_channel_recording_failure(self):
153159

154160
@pytest.mark.asyncio
155161
async def test_stop_recording_success(self):
156-
"""Test successful recording stop."""
162+
"""Test successful recording stop with file info."""
157163
config = EgressConfig(
158164
s3_bucket="test-bucket",
159165
)
160166
manager = EgressManager(config)
161167
manager._egress_id = "EG_TEST123456"
162168

169+
# Mock file result
170+
mock_file_result = MagicMock()
171+
mock_file_result.filename = "test-room-20251212-120000.ogg"
172+
mock_file_result.location = (
173+
"s3://test-bucket/audio/test-room-20251212-120000.ogg"
174+
)
175+
mock_file_result.duration = 60000000000 # 60 seconds in nanoseconds
176+
mock_file_result.size = 1024000
177+
178+
# Mock egress info with COMPLETE status
179+
mock_egress_info = MagicMock()
180+
mock_egress_info.status = egress_proto.EgressStatus.EGRESS_COMPLETE
181+
mock_egress_info.file_results = [mock_file_result]
182+
183+
# Mock list response
184+
mock_list_response = MagicMock()
185+
mock_list_response.items = [mock_egress_info]
186+
163187
# Mock the LiveKit API
164188
mock_egress_service = AsyncMock()
165189
mock_egress_service.stop_egress = AsyncMock()
190+
mock_egress_service.list_egress = AsyncMock(return_value=mock_list_response)
166191

167192
mock_api = MagicMock()
168193
mock_api.egress = mock_egress_service
@@ -171,7 +196,12 @@ async def test_stop_recording_success(self):
171196

172197
result = await manager.stop_recording()
173198

174-
assert result is True
199+
assert result is not None
200+
assert isinstance(result, EgressFileInfo)
201+
assert result.filename == "test-room-20251212-120000.ogg"
202+
assert result.location == "s3://test-bucket/audio/test-room-20251212-120000.ogg"
203+
assert result.duration == 60000000000
204+
assert result.size == 1024000
175205
assert manager.egress_id is None
176206
mock_egress_service.stop_egress.assert_called_once()
177207

@@ -185,7 +215,7 @@ async def test_stop_recording_no_active_egress(self):
185215

186216
result = await manager.stop_recording()
187217

188-
assert result is True
218+
assert result is None # No file info when no egress was active
189219

190220
@pytest.mark.asyncio
191221
async def test_stop_recording_failure(self):
@@ -209,7 +239,7 @@ async def test_stop_recording_failure(self):
209239

210240
result = await manager.stop_recording()
211241

212-
assert result is False
242+
assert result is None # Returns None on failure
213243

214244
@pytest.mark.asyncio
215245
async def test_close(self):

0 commit comments

Comments
 (0)