Skip to content

Commit 17db545

Browse files
committed
update file handler
1 parent dbbbd91 commit 17db545

File tree

7 files changed

+487
-27
lines changed

7 files changed

+487
-27
lines changed

packages/livekit-recording/src/livekit_recording/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
StorageSettings,
2323
)
2424
from livekit_recording.transcript import (
25+
LocalTranscriptStorage,
2526
S3Uploader,
2627
S3UploaderProtocol,
2728
TranscriptData,
2829
TranscriptEntry,
2930
TranscriptHandler,
31+
TranscriptStorageProtocol,
3032
)
3133

3234
__all__ = [
@@ -38,6 +40,7 @@
3840
"EgressManager",
3941
"LiveKitSettings",
4042
"LocalAudioRecorder",
43+
"LocalTranscriptStorage",
4144
"S3AudioRecorder",
4245
"S3Settings",
4346
"S3Uploader",
@@ -48,6 +51,7 @@
4851
"TranscriptData",
4952
"TranscriptEntry",
5053
"TranscriptHandler",
54+
"TranscriptStorageProtocol",
5155
"create_default_egress_manager",
5256
"create_default_s3_recorder",
5357
]

packages/livekit-recording/src/livekit_recording/audio_storage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def on_local_track_published(
193193
# Recording is active and not stopping, subscribe immediately
194194
_task = asyncio.create_task(
195195
self._subscribe_to_track(track, "agent")
196-
) # noqa: RUF006
196+
)
197197
elif not self._stop_event:
198198
# Recording not started yet, queue the track
199199
self._pending_agent_tracks.append(track)

packages/livekit-recording/src/livekit_recording/settings.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from livekit import rtc
1515

1616
from livekit_recording.audio_storage import AudioRecorderProtocol
17+
from livekit_recording.transcript import TranscriptStorageProtocol
1718

1819

1920
class StorageMode(str, Enum):
@@ -276,3 +277,48 @@ def create_audio_recorder(
276277

277278
else:
278279
raise ValueError(f"Unknown storage mode: {self.storage.mode}")
280+
281+
def create_transcript_storage(
    self,
    bucket: str | None = None,
    prefix: str | None = None,
) -> TranscriptStorageProtocol:
    """Build the transcript storage backend selected by ``self.storage.mode``.

    Args:
        bucket: S3 bucket name. When falsy, ``self.s3.bucket`` is used instead.
        prefix: S3 key prefix. When ``None``, ``self.s3.prefix`` is used instead;
            an explicit empty string is passed through unchanged.

    Returns:
        A ``LocalTranscriptStorage`` in local mode, or an ``S3Uploader`` in S3 mode.

    Raises:
        ValueError: If S3 mode is active and no bucket can be determined, or if
            the storage mode is neither local nor S3.
    """
    # Deferred import: kept inside the method to avoid a circular import.
    from livekit_recording.transcript import LocalTranscriptStorage, S3Uploader

    if self.storage.mode == StorageMode.LOCAL:
        logger.info(
            f"Creating LocalTranscriptStorage with output_dir={self.storage.local_output_dir}"
        )
        return LocalTranscriptStorage(output_dir=self.storage.local_output_dir)

    # Anything that is not LOCAL must be S3; reject every other mode up front.
    if self.storage.mode != StorageMode.S3:
        raise ValueError(f"Unknown storage mode: {self.storage.mode}")

    # Explicit arguments win; the configured S3 settings are the fallback.
    s3_bucket = bucket or self.s3.bucket
    s3_prefix = self.s3.prefix if prefix is None else prefix

    if not s3_bucket:
        raise ValueError(
            "S3 bucket is required for S3 storage mode. "
            "Set STORAGE_MODE=local or provide bucket parameter."
        )

    logger.info(
        f"Creating S3Uploader with bucket={s3_bucket}, prefix={s3_prefix}"
    )
    return S3Uploader.from_settings(self, bucket=s3_bucket, prefix=s3_prefix)

packages/livekit-recording/src/livekit_recording/transcript.py

Lines changed: 103 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
"""Transcript handler for capturing and storing STT output to S3."""
1+
"""Transcript handler for capturing and storing STT output to S3 or local files."""
22

33
from __future__ import annotations
44

55
import json
66
from dataclasses import dataclass, field
77
from datetime import UTC, datetime
8+
from pathlib import Path
89
from typing import TYPE_CHECKING, Protocol
910

1011
import boto3
@@ -52,10 +53,27 @@ def to_dict(self) -> dict:
5253
}
5354

5455

55-
class TranscriptStorageProtocol(Protocol):
    """Structural interface for anything that can persist a transcript.

    Any object exposing a compatible ``save_transcript`` method satisfies this
    protocol, whatever backend it writes to (local files, S3, etc.).
    """

    def save_transcript(self, transcript: TranscriptData, key: str) -> bool:
        """Persist ``transcript`` under ``key``.

        Args:
            transcript: The transcript data to save.
            key: The storage key/path for the transcript.

        Returns:
            True if the save succeeded, False otherwise.
        """
        ...


# Old name retained so existing imports of S3UploaderProtocol keep working.
S3UploaderProtocol = TranscriptStorageProtocol
5977

6078

6179
class S3Uploader:
@@ -136,28 +154,92 @@ def upload_transcript(self, transcript: TranscriptData, key: str) -> bool:
136154
logger.error(f"Failed to upload transcript to S3: {e}")
137155
return False
138156

157+
def save_transcript(self, transcript: TranscriptData, key: str) -> bool:
    """Store the transcript in S3 by delegating to ``upload_transcript``.

    This method exists so the uploader satisfies the generic transcript
    storage interface; it adds no behavior of its own.

    Args:
        transcript: The transcript data to save.
        key: The S3 object key (path within the bucket).

    Returns:
        Whatever ``upload_transcript`` returns: True on success, False otherwise.
    """
    uploaded = self.upload_transcript(transcript, key)
    return uploaded
168+
169+
170+
class LocalTranscriptStorage:
    """Handles saving transcripts to the local filesystem.

    Each transcript is written as an indented, UTF-8 encoded JSON file located
    at ``output_dir / key``.
    """

    def __init__(self, output_dir: str | Path = "temp"):
        """Initialize the local transcript storage.

        Args:
            output_dir: Directory to save transcript files (default: temp/)
        """
        self.output_dir = Path(output_dir)
        logger.debug(f"LocalTranscriptStorage initialized with output_dir={output_dir}")

    def save_transcript(self, transcript: TranscriptData, key: str) -> bool:
        """Save transcript JSON to the local filesystem.

        Args:
            transcript: The transcript data to save
            key: The file path relative to output_dir (e.g., transcripts/room-session.json).
                Absolute paths and paths containing ``..`` are rejected.

        Returns:
            True if save succeeded, False otherwise (invalid key, serialization
            failure, or filesystem error)
        """
        relative_key = Path(key)
        # An anchored (absolute / drive-qualified) key makes the "/" join discard
        # output_dir entirely, and ".." components climb out of it. The key is
        # built from the room name, so refuse both rather than write elsewhere.
        if relative_key.anchor or ".." in relative_key.parts:
            logger.error(f"Refusing to save transcript outside output_dir: {key}")
            return False

        output_path = self.output_dir / relative_key

        try:
            # Serialize first so a non-serializable transcript leaves no
            # directories behind.
            json_content = json.dumps(transcript.to_dict(), indent=2)

            # Create parent directories if needed
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(json_content, encoding="utf-8")

            logger.info(f"Saved transcript to {output_path.absolute()}")
            return True
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError come from json.dumps (unserializable or
            # circular data); the documented contract is to report failure
            # via the return value.
            logger.error(f"Failed to save transcript to local file: {e}")
            return False

    # Alias for backward compatibility
    def upload_transcript(self, transcript: TranscriptData, key: str) -> bool:
        """Alias for save_transcript for backward compatibility."""
        return self.save_transcript(transcript, key)
213+
139214

140215
class TranscriptHandler:
141216
"""Handles capturing and storing conversation transcripts."""
142217

143218
def __init__(
144219
self,
145220
room_name: str,
146-
s3_uploader: S3UploaderProtocol | None = None,
221+
storage: TranscriptStorageProtocol | None = None,
147222
session_id: str | None = None,
223+
*,
224+
# Backward compatibility alias
225+
s3_uploader: TranscriptStorageProtocol | None = None,
148226
):
149227
"""Initialize the transcript handler.
150228
151229
Args:
152230
room_name: Name of the LiveKit room
153-
s3_uploader: S3 uploader instance for storing transcripts
231+
storage: Storage instance for saving transcripts (local or S3)
154232
session_id: Unique session identifier for matching audio/transcript files
233+
s3_uploader: Deprecated alias for storage parameter (backward compatibility)
155234
"""
156235
self.transcript = TranscriptData(
157236
room_name=room_name,
158237
session_start=datetime.now(UTC).isoformat(),
159238
)
160-
self.s3_uploader = s3_uploader
239+
# Support both 'storage' and deprecated 's3_uploader' parameter
240+
self.storage = storage or s3_uploader
241+
# Keep s3_uploader as alias for backward compatibility
242+
self.s3_uploader = self.storage
161243
# Use provided session_id or generate one
162244
self.session_id = session_id or datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
163245

@@ -199,22 +281,30 @@ def add_agent_transcript(self, text: str, is_final: bool = True) -> None:
199281
self.transcript.entries.append(entry)
200282
logger.debug(f"Agent transcript: {text}")
201283

202-
async def finalize_and_upload(self) -> bool:
203-
"""Finalize the transcript and upload to S3.
284+
async def finalize_and_save(self) -> bool:
285+
"""Finalize the transcript and save to storage (local or S3).
204286
205287
Returns:
206-
True if upload succeeded or no uploader configured, False on failure
288+
True if save succeeded or no storage configured, False on failure
207289
"""
208290
self.transcript.session_end = datetime.now(UTC).isoformat()
209291

210-
if not self.s3_uploader:
211-
logger.warning("No S3 uploader configured, transcript not saved")
292+
if not self.storage:
293+
logger.warning("No transcript storage configured, transcript not saved")
212294
return True
213295

214296
# Use session_id for filename to match audio recording
215297
key = f"transcripts/{self.transcript.room_name}-{self.session_id}.json"
216298

217-
return self.s3_uploader.upload_transcript(self.transcript, key)
299+
return self.storage.save_transcript(self.transcript, key)
300+
301+
async def finalize_and_upload(self) -> bool:
    """Backward-compatible name for ``finalize_and_save``.

    Returns:
        The result of ``finalize_and_save``: True if the save succeeded or no
        storage is configured, False on failure.
    """
    outcome = await self.finalize_and_save()
    return outcome
218308

219309
def get_transcript_text(self) -> str:
220310
"""Get the transcript as plain text.

src/agent.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from livekit.plugins.turn_detector.multilingual import MultilingualModel
1919
from livekit_recording import (
2020
AudioRecorderProtocol,
21-
S3Uploader,
2221
Settings,
2322
StorageMode,
2423
TranscriptHandler,
@@ -106,19 +105,21 @@ async def my_agent(ctx: JobContext):
106105
except Exception as e:
107106
logger.error(f"Failed to initialize audio recorder: {e}")
108107

109-
# Initialize transcript handler for saving STT output
108+
# Initialize transcript handler for saving STT output (local or S3 based on storage mode)
110109
transcript_handler = None
111110
try:
112111
logger.info("Initializing transcript handler...")
113-
s3_uploader = S3Uploader.from_settings(
114-
settings, bucket=S3_BUCKET, prefix=S3_PREFIX
112+
transcript_storage = settings.create_transcript_storage(
113+
bucket=S3_BUCKET, prefix=S3_PREFIX
115114
)
116115
transcript_handler = TranscriptHandler(
117116
room_name=room_name,
118-
s3_uploader=s3_uploader,
117+
storage=transcript_storage,
119118
session_id=session_id,
120119
)
121-
logger.info("Transcript handler initialized successfully")
120+
logger.info(
121+
f"Transcript handler initialized successfully (mode={settings.storage.mode.value})"
122+
)
122123
except Exception as e:
123124
logger.error(f"Failed to initialize transcript handler: {e}")
124125

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"room_name": "playground-ggC8-V5BN",
3+
"session_start": "2025-12-12T21:10:22.555334+00:00",
4+
"session_end": "2025-12-12T21:11:46.970180+00:00",
5+
"entries": [
6+
{
7+
"timestamp": "2025-12-12T21:10:25.345108+00:00",
8+
"speaker": "agent",
9+
"text": "Hello, how can I assist you?",
10+
"is_final": true
11+
},
12+
{
13+
"timestamp": "2025-12-12T21:10:29.422298+00:00",
14+
"speaker": "user",
15+
"text": "Yes.",
16+
"is_final": true
17+
},
18+
{
19+
"timestamp": "2025-12-12T21:10:36.456330+00:00",
20+
"speaker": "user",
21+
"text": "Are there any planets that Rotate in the opposite direction than the sun.",
22+
"is_final": true
23+
},
24+
{
25+
"timestamp": "2025-12-12T21:10:56.089449+00:00",
26+
"speaker": "agent",
27+
"text": "Yes, Venus and Uranus rotate in the opposite direction to the Sun. This is called retrograde rotation. Venus spins clockwise on its axis, while the Sun spins counterclockwise when viewed from above their north poles. Uranus is unique because it rotates on its side with an axial tilt of about 98 degrees, which makes its rotation appear retrograde as well. Interesting, right?",
28+
"is_final": true
29+
},
30+
{
31+
"timestamp": "2025-12-12T21:11:09.955625+00:00",
32+
"speaker": "user",
33+
"text": "How long did the roman empire last",
34+
"is_final": true
35+
},
36+
{
37+
"timestamp": "2025-12-12T21:11:30.215535+00:00",
38+
"speaker": "agent",
39+
"text": "The Roman Empire lasted about 500 years if you start from 27 BC when Augustus became the first emperor, until 476 AD when the Western Roman Empire fell. If you include the Eastern Roman Empire, also called the Byzantine Empire, it lasted until 1453 AD, so that\u2019s almost 1500 years in total. Quite a long run!",
40+
"is_final": true
41+
}
42+
]
43+
}

0 commit comments

Comments
 (0)