Skip to content

Commit a65e87b

Browse files
committed
one sided audio
1 parent 64f7439 commit a65e87b

File tree

14 files changed

+1402
-1636
lines changed

14 files changed

+1402
-1636
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[project]
2+
name = "livekit-recording"
3+
version = "0.1.0"
4+
description = "Recording and transcription utilities for LiveKit agents"
5+
requires-python = ">=3.13"
6+
7+
dependencies = [
    # audio_storage.py imports `livekit.rtc`, which ships in the `livekit`
    # distribution rather than in `livekit-api`, so it must be declared here.
    # NOTE(review): version bound chosen to match the 1.x line of livekit-api;
    # confirm against the version the agents actually run with.
    "livekit>=1.0,<2",
    "livekit-api>=1.0.7,<2",
    "boto3~=1.35",
    "loguru~=0.7",
]
12+
13+
[build-system]
14+
requires = ["hatchling"]
15+
build-backend = "hatchling.build"
16+
17+
[tool.hatch.build.targets.wheel]
18+
packages = ["src/livekit_recording"]
19+
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""LiveKit Recording - Recording and transcription utilities for LiveKit agents."""
2+
3+
from livekit_recording.audio_storage import (
4+
AudioFileInfo,
5+
AudioRecorderProtocol,
6+
LocalAudioRecorder,
7+
)
8+
from livekit_recording.egress import (
9+
EgressConfig,
10+
EgressFileInfo,
11+
EgressManager,
12+
S3AudioRecorder,
13+
create_default_egress_manager,
14+
create_default_s3_recorder,
15+
)
16+
from livekit_recording.settings import (
17+
AWSSettings,
18+
LiveKitSettings,
19+
S3Settings,
20+
Settings,
21+
StorageMode,
22+
StorageSettings,
23+
)
24+
from livekit_recording.transcript import (
25+
S3Uploader,
26+
S3UploaderProtocol,
27+
TranscriptData,
28+
TranscriptEntry,
29+
TranscriptHandler,
30+
)
31+
32+
__all__ = [
33+
"AWSSettings",
34+
"AudioFileInfo",
35+
"AudioRecorderProtocol",
36+
"EgressConfig",
37+
"EgressFileInfo",
38+
"EgressManager",
39+
"LiveKitSettings",
40+
"LocalAudioRecorder",
41+
"S3AudioRecorder",
42+
"S3Settings",
43+
"S3Uploader",
44+
"S3UploaderProtocol",
45+
"Settings",
46+
"StorageMode",
47+
"StorageSettings",
48+
"TranscriptData",
49+
"TranscriptEntry",
50+
"TranscriptHandler",
51+
"create_default_egress_manager",
52+
"create_default_s3_recorder",
53+
]
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
"""Audio storage protocol and implementations for recording audio to various backends."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import contextlib
7+
import wave
8+
from dataclasses import dataclass
9+
from datetime import UTC, datetime
10+
from pathlib import Path
11+
from typing import TYPE_CHECKING, Protocol
12+
13+
from livekit import rtc
14+
from loguru import logger
15+
16+
if TYPE_CHECKING:
17+
pass
18+
19+
20+
@dataclass
class AudioFileInfo:
    """Metadata describing one finished audio recording.

    Attributes:
        filename: Name of the recorded file.
        location: Where the recording is stored.
        duration: Length of the recording, in nanoseconds.
        size: Size of the recording, in bytes.
    """

    filename: str
    location: str
    duration: int  # nanoseconds
    size: int  # bytes
28+
29+
30+
class AudioRecorderProtocol(Protocol):
    """Structural interface shared by every audio recorder backend.

    A recorder captures the audio of a LiveKit room and persists it to some
    backend (local files, S3, etc.).
    """

    async def start_recording(
        self, room_name: str, session_id: str | None = None
    ) -> str | None:
        """Begin recording the audio of a room.

        Args:
            room_name: Name of the LiveKit room to record.
            session_id: Unique session identifier used to match audio and
                transcript files.

        Returns:
            The recording ID when recording started, or None on failure.
        """
        ...

    async def stop_recording(self) -> AudioFileInfo | None:
        """End the active recording and finalize its output.

        Returns:
            AudioFileInfo describing the stored file, or None on error.
        """
        ...

    async def close(self) -> None:
        """Release any resources held by the recorder."""
        ...
62+
63+
64+
class LocalAudioRecorder:
65+
"""Records audio from LiveKit room to local WAV files.
66+
67+
This implementation subscribes to audio tracks in the room and
68+
saves them to the local filesystem.
69+
"""
70+
71+
# Audio configuration
72+
SAMPLE_RATE = 48000
73+
NUM_CHANNELS = 1
74+
SAMPLE_WIDTH = 2 # 16-bit audio
75+
76+
def __init__(
77+
self,
78+
output_dir: str | Path = "temp",
79+
room: rtc.Room | None = None,
80+
):
81+
"""Initialize the local audio recorder.
82+
83+
Args:
84+
output_dir: Directory to save audio files (default: temp/)
85+
room: LiveKit room to record from (can be set later via set_room)
86+
"""
87+
self.output_dir = Path(output_dir)
88+
self.output_dir.mkdir(parents=True, exist_ok=True)
89+
self._room = room
90+
self._recording_id: str | None = None
91+
self._session_id: str | None = None
92+
self._room_name: str | None = None
93+
self._audio_frames: list[bytes] = []
94+
self._recording_task: asyncio.Task | None = None
95+
self._stop_event: asyncio.Event | None = None
96+
self._start_time: datetime | None = None
97+
self._output_path: Path | None = None
98+
self._audio_streams: list[rtc.AudioStream] = []
99+
100+
logger.debug(f"LocalAudioRecorder initialized with output_dir={output_dir}")
101+
102+
def set_room(self, room: rtc.Room) -> None:
103+
"""Set the LiveKit room to record from.
104+
105+
Args:
106+
room: LiveKit room instance
107+
"""
108+
self._room = room
109+
110+
@property
111+
def recording_id(self) -> str | None:
112+
"""Get the current recording ID if recording is active."""
113+
return self._recording_id
114+
115+
async def start_recording(
116+
self, room_name: str, session_id: str | None = None
117+
) -> str | None:
118+
"""Start recording audio from all tracks in the room.
119+
120+
Args:
121+
room_name: Name of the LiveKit room
122+
session_id: Unique session identifier for the filename
123+
124+
Returns:
125+
Recording ID if started successfully, None on failure
126+
"""
127+
if self._recording_id:
128+
logger.warning(
129+
f"Recording already active with ID {self._recording_id}, skipping start"
130+
)
131+
return self._recording_id
132+
133+
if self._room is None:
134+
logger.error("No room set for LocalAudioRecorder")
135+
return None
136+
137+
try:
138+
self._room_name = room_name
139+
self._session_id = session_id or datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
140+
self._recording_id = f"LOCAL_{self._session_id}"
141+
self._audio_frames = []
142+
self._start_time = datetime.now(UTC)
143+
self._stop_event = asyncio.Event()
144+
145+
# Build output path
146+
filename = f"{room_name}-{self._session_id}.wav"
147+
self._output_path = self.output_dir / "audio" / filename
148+
self._output_path.parent.mkdir(parents=True, exist_ok=True)
149+
150+
logger.info(
151+
f"Starting local audio recording for room {room_name}, "
152+
f"recording_id={self._recording_id}, "
153+
f"output_path={self._output_path}"
154+
)
155+
156+
# Start background task to capture audio
157+
self._recording_task = asyncio.create_task(self._capture_audio())
158+
159+
return self._recording_id
160+
161+
except Exception as e:
162+
logger.error(f"Failed to start local audio recording: {e}")
163+
self._recording_id = None
164+
return None
165+
166+
async def _capture_audio(self) -> None:
167+
"""Background task to capture audio from all tracks."""
168+
if self._room is None or self._stop_event is None:
169+
return
170+
171+
try:
172+
# Subscribe to existing audio tracks
173+
for participant in self._room.remote_participants.values():
174+
for publication in participant.track_publications.values():
175+
if (
176+
publication.track
177+
and publication.kind == rtc.TrackKind.KIND_AUDIO
178+
):
179+
await self._subscribe_to_track(publication.track)
180+
181+
# Set up listener for new tracks
182+
@self._room.on("track_subscribed")
183+
def on_track_subscribed(
184+
track: rtc.Track,
185+
publication: rtc.RemoteTrackPublication,
186+
participant: rtc.RemoteParticipant,
187+
):
188+
if track.kind == rtc.TrackKind.KIND_AUDIO:
189+
_task = asyncio.create_task(self._subscribe_to_track(track)) # noqa: RUF006
190+
191+
# Wait for stop signal
192+
await self._stop_event.wait()
193+
194+
except asyncio.CancelledError:
195+
logger.debug("Audio capture task cancelled")
196+
except Exception as e:
197+
logger.error(f"Error in audio capture: {e}")
198+
199+
async def _subscribe_to_track(self, track: rtc.Track) -> None:
200+
"""Subscribe to an audio track and capture frames."""
201+
if self._stop_event is None:
202+
return
203+
204+
try:
205+
audio_stream = rtc.AudioStream(
206+
track,
207+
sample_rate=self.SAMPLE_RATE,
208+
num_channels=self.NUM_CHANNELS,
209+
)
210+
self._audio_streams.append(audio_stream)
211+
212+
logger.debug(f"Subscribed to audio track: {track.sid}")
213+
214+
async for frame_event in audio_stream:
215+
if self._stop_event.is_set():
216+
break
217+
# Capture raw audio data
218+
frame = frame_event.frame
219+
self._audio_frames.append(bytes(frame.data))
220+
221+
except asyncio.CancelledError:
222+
pass
223+
except Exception as e:
224+
logger.error(f"Error capturing audio from track: {e}")
225+
226+
async def stop_recording(self) -> AudioFileInfo | None:
227+
"""Stop recording and save to WAV file.
228+
229+
Returns:
230+
AudioFileInfo with file details if successful, None on error
231+
"""
232+
if not self._recording_id:
233+
logger.debug("No active recording to stop")
234+
return None
235+
236+
recording_id = self._recording_id
237+
try:
238+
logger.info(f"Stopping local audio recording, recording_id={recording_id}")
239+
240+
# Signal stop to capture task
241+
if self._stop_event:
242+
self._stop_event.set()
243+
244+
# Wait for capture task to finish
245+
if self._recording_task:
246+
self._recording_task.cancel()
247+
with contextlib.suppress(asyncio.CancelledError):
248+
await self._recording_task
249+
250+
# Close audio streams
251+
for stream in self._audio_streams:
252+
await stream.aclose()
253+
self._audio_streams = []
254+
255+
# Write WAV file
256+
if self._output_path and self._audio_frames:
257+
file_info = await self._write_wav_file()
258+
logger.info(
259+
f"Local audio recording saved: "
260+
f"location={file_info.location}, "
261+
f"duration={file_info.duration}ns, "
262+
f"size={file_info.size} bytes"
263+
)
264+
return file_info
265+
else:
266+
logger.warning(
267+
f"No audio frames captured for recording_id={recording_id}"
268+
)
269+
return None
270+
271+
except Exception as e:
272+
logger.error(f"Failed to stop local audio recording: {e}")
273+
return None
274+
finally:
275+
self._recording_id = None
276+
self._recording_task = None
277+
self._stop_event = None
278+
self._audio_frames = []
279+
280+
async def _write_wav_file(self) -> AudioFileInfo:
281+
"""Write captured audio frames to a WAV file."""
282+
if not self._output_path:
283+
raise ValueError("No output path set")
284+
285+
# Calculate duration
286+
total_samples = sum(
287+
len(frame) // self.SAMPLE_WIDTH for frame in self._audio_frames
288+
)
289+
duration_seconds = total_samples / self.SAMPLE_RATE
290+
duration_ns = int(duration_seconds * 1_000_000_000)
291+
292+
# Write WAV file
293+
with wave.open(str(self._output_path), "wb") as wav_file:
294+
wav_file.setnchannels(self.NUM_CHANNELS)
295+
wav_file.setsampwidth(self.SAMPLE_WIDTH)
296+
wav_file.setframerate(self.SAMPLE_RATE)
297+
298+
for frame_data in self._audio_frames:
299+
wav_file.writeframes(frame_data)
300+
301+
# Get file size
302+
file_size = self._output_path.stat().st_size
303+
304+
return AudioFileInfo(
305+
filename=self._output_path.name,
306+
location=str(self._output_path.absolute()),
307+
duration=duration_ns,
308+
size=file_size,
309+
)
310+
311+
async def close(self) -> None:
312+
"""Clean up resources."""
313+
if self._recording_id:
314+
await self.stop_recording()
315+
316+
for stream in self._audio_streams:
317+
await stream.aclose()
318+
self._audio_streams = []

0 commit comments

Comments
 (0)