Commit 7eb3385

v1
1 parent 17f59a7 commit 7eb3385

9 files changed

+1134
-5
lines changed

.cursor/livekit-egress.mdc

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
11
---
2+
description: recording audio and transcripts on livekit using Egress APIs
23
alwaysApply: false
34
---
45
# LiveKit Egress - Python Audio Recording Guide

README.md

Lines changed: 40 additions & 0 deletions
@@ -11,6 +11,8 @@ The starter project includes:
1111
- A simple voice AI assistant, ready for extension and customization
1212
- A voice AI pipeline with [models](https://docs.livekit.io/agents/models) from OpenAI, Cartesia, and AssemblyAI served through LiveKit Cloud
1313
- Easily integrate your preferred [LLM](https://docs.livekit.io/agents/models/llm/), [STT](https://docs.livekit.io/agents/models/stt/), and [TTS](https://docs.livekit.io/agents/models/tts/) instead, or swap to a realtime model like the [OpenAI Realtime API](https://docs.livekit.io/agents/models/realtime/openai)
14+
- **Dual-channel audio recording** to S3 via [LiveKit Egress](https://docs.livekit.io/home/egress/overview/) (agent on one channel, user on the other)
15+
- **Real-time transcript capture** saved to S3 as JSON
1416
- Eval suite based on the LiveKit Agents [testing & evaluation framework](https://docs.livekit.io/agents/build/testing/)
1517
- [LiveKit Turn Detector](https://docs.livekit.io/agents/build/turns/turn-detector/) for contextually-aware speaker detection, with multilingual support
1618
- [Background voice cancellation](https://docs.livekit.io/home/cloud/noise-cancellation/)
@@ -19,6 +21,41 @@ The starter project includes:
1921

2022
This starter app is compatible with any [custom web/mobile frontend](https://docs.livekit.io/agents/start/frontend/) or [SIP-based telephony](https://docs.livekit.io/agents/start/telephony/).
2123

24+
## Recording & Transcription
25+
26+
This project includes built-in support for:
27+
28+
- **Dual-channel audio recording** via LiveKit Egress (agent on one channel, user on the other)
29+
- **Real-time transcript capture** from STT output, saved as JSON
30+
31+
### S3 Output Structure
32+
33+
Recordings and transcripts are saved to S3:
34+
35+
```
36+
s3://audivi-audio-recordings/livekit-demos/
37+
├── audio/{room_name}-{time}.ogg # Dual-channel OGG audio
38+
├── audio/{room_name}-{time}.ogg.json # Egress manifest
39+
└── transcripts/{room_name}-{timestamp}.json # Conversation transcript
40+
```
41+
42+
### AWS Configuration
43+
44+
Add these environment variables to your `.env.local`:
45+
46+
```bash
47+
AWS_ACCESS_KEY_ID=your_access_key
48+
AWS_SECRET_ACCESS_KEY=your_secret_key
49+
AWS_REGION=us-east-1
50+
```
51+
52+
To change the S3 bucket or prefix, modify the constants in `src/agent.py`:
53+
54+
```python
55+
S3_BUCKET = "audivi-audio-recordings"
56+
S3_PREFIX = "livekit-demos"
57+
```
58+
2259
## Coding agents and MCP
2360

2461
This project is designed to work with coding agents like [Cursor](https://www.cursor.com/) and [Claude Code](https://www.anthropic.com/claude-code).
@@ -61,6 +98,9 @@ Sign up for [LiveKit Cloud](https://cloud.livekit.io/) then set up the environme
6198
- `LIVEKIT_URL`
6299
- `LIVEKIT_API_KEY`
63100
- `LIVEKIT_API_SECRET`
101+
- `AWS_ACCESS_KEY_ID` (for recording/transcripts)
102+
- `AWS_SECRET_ACCESS_KEY` (for recording/transcripts)
103+
- `AWS_REGION` (for recording/transcripts, defaults to `us-east-1`)
64104

65105
You can load the LiveKit environment automatically using the [LiveKit CLI](https://docs.livekit.io/home/cli/cli-setup):
66106
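The README now lists three AWS variables next to the existing LiveKit ones. Since `EgressConfig` falls back to an empty string when a variable is unset, a missing credential would likely only show up when the egress request fails. A minimal sketch of an up-front check follows; `REQUIRED_VARS` and `missing_env_vars` are hypothetical names used for illustration and are not part of this commit.

```python
import os
from typing import Mapping, Optional

# Variables named in the README. AWS_REGION is left out because the
# commit gives it a default of us-east-1.
REQUIRED_VARS = (
    "LIVEKIT_URL",
    "LIVEKIT_API_KEY",
    "LIVEKIT_API_SECRET",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_env_vars(env: Optional[Mapping[str, str]] = None) -> list:
    """Return the required variable names that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_env_vars()` right after `load_dotenv(".env.local")` and logging the result would be one way to surface a misconfigured environment before a room is joined.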

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,8 @@ requires-python = ">=3.9"
1111
dependencies = [
1212
"livekit-agents[silero,turn-detector]~=1.3",
1313
"livekit-plugins-noise-cancellation~=0.2",
14+
"boto3~=1.35",
15+
"loguru~=0.7",
1416
"python-dotenv",
1517
]
1618

src/agent.py

Lines changed: 67 additions & 3 deletions
@@ -1,11 +1,10 @@
1-
import logging
2-
31
from dotenv import load_dotenv
42
from livekit import rtc
53
from livekit.agents import (
64
Agent,
75
AgentServer,
86
AgentSession,
7+
ConversationItemAddedEvent,
98
JobContext,
109
JobProcess,
1110
cli,
@@ -14,11 +13,17 @@
1413
)
1514
from livekit.plugins import noise_cancellation, silero
1615
from livekit.plugins.turn_detector.multilingual import MultilingualModel
16+
from loguru import logger
1717

18-
logger = logging.getLogger("agent")
18+
from egress_manager import EgressConfig, EgressManager
19+
from transcript_handler import S3Uploader, TranscriptHandler
1920

2021
load_dotenv(".env.local")
2122

23+
# S3 bucket configuration for recordings and transcripts
24+
S3_BUCKET = "audivi-audio-recordings"
25+
S3_PREFIX = "livekit-demos"
26+
2227

2328
class Assistant(Agent):
2429
def __init__(self) -> None:
@@ -65,6 +70,25 @@ async def my_agent(ctx: JobContext):
6570
"room": ctx.room.name,
6671
}
6772

73+
room_name = ctx.room.name
74+
75+
# Initialize egress manager for dual-channel audio recording
76+
egress_config = EgressConfig(
77+
s3_bucket=S3_BUCKET,
78+
s3_prefix=S3_PREFIX,
79+
)
80+
egress_manager = EgressManager(egress_config)
81+
82+
# Initialize transcript handler for saving STT output
83+
s3_uploader = S3Uploader(
84+
bucket=S3_BUCKET,
85+
prefix=S3_PREFIX,
86+
)
87+
transcript_handler = TranscriptHandler(
88+
room_name=room_name,
89+
s3_uploader=s3_uploader,
90+
)
91+
6892
# Set up a voice AI pipeline using OpenAI, Cartesia, AssemblyAI, and the LiveKit turn detector
6993
session = AgentSession(
7094
# Speech-to-text (STT) is your agent's ears, turning the user's speech into text that the LLM can understand
@@ -87,6 +111,36 @@ async def my_agent(ctx: JobContext):
87111
preemptive_generation=True,
88112
)
89113

114+
# Subscribe to conversation events to capture transcripts
115+
@session.on("conversation_item_added")
116+
def on_conversation_item_added(event: ConversationItemAddedEvent):
117+
"""Capture user and agent transcripts from conversation events."""
118+
item = event.item
119+
text = item.text_content
120+
if not text:
121+
return
122+
123+
if item.role == "user":
124+
transcript_handler.add_user_transcript(text, is_final=True)
125+
elif item.role == "assistant":
126+
transcript_handler.add_agent_transcript(text, is_final=True)
127+
128+
# Handle session close to finalize and upload transcript
129+
@session.on("close")
130+
async def on_session_close(_event):
131+
"""Finalize transcript and clean up egress when session ends."""
132+
logger.info(f"Session closing for room {room_name}, saving transcript...")
133+
134+
# Upload transcript to S3
135+
success = await transcript_handler.finalize_and_upload()
136+
if success:
137+
logger.info(f"Transcript saved for room {room_name}")
138+
else:
139+
logger.error(f"Failed to save transcript for room {room_name}")
140+
141+
# Clean up egress manager resources
142+
await egress_manager.close()
143+
90144
# To use a realtime model instead of a voice pipeline, use the following session setup instead.
91145
# (Note: This is for the OpenAI Realtime API. For other providers, see https://docs.livekit.io/agents/models/realtime/)
92146
# 1. Install livekit-agents[openai]
@@ -105,6 +159,16 @@ async def my_agent(ctx: JobContext):
105159
# # Start the avatar and wait for it to join
106160
# await avatar.start(session, room=ctx.room)
107161

162+
# Start dual-channel audio recording via egress
163+
egress_id = await egress_manager.start_dual_channel_recording(room_name)
164+
if egress_id:
165+
logger.info(f"Started dual-channel recording for room {room_name}")
166+
else:
167+
logger.warning(
168+
f"Failed to start egress recording for room {room_name}, "
169+
"continuing without recording"
170+
)
171+
108172
# Start the session, which initializes the voice pipeline and warms up the models
109173
await session.start(
110174
agent=Assistant(),
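
One point worth checking in the `close` handler in this file: it is declared `async def` and registered through `session.on(...)`. Assuming the emitter calls handlers synchronously, a coroutine handler would be either rejected at registration or never awaited. A minimal sketch of the usual workaround is to register a synchronous callback that schedules the async cleanup as a task. `TinyEmitter`, `make_close_handler`, and the stand-in `cleanup` coroutine are hypothetical names written for this illustration, not LiveKit APIs.

```python
import asyncio
from typing import Awaitable, Callable


class TinyEmitter:
    """Hypothetical stand-in for an emitter that calls handlers synchronously."""

    def __init__(self) -> None:
        self._handlers = {}

    def on(self, event: str, handler: Callable[[object], None]) -> None:
        self._handlers.setdefault(event, []).append(handler)

    def emit(self, event: str, payload: object = None) -> None:
        for handler in self._handlers.get(event, []):
            handler(payload)


def make_close_handler(cleanup: Callable[[], Awaitable[None]], tasks: set):
    """Build a sync handler that schedules `cleanup` on the running event loop."""

    def on_close(_event: object) -> None:
        task = asyncio.ensure_future(cleanup())
        tasks.add(task)  # hold a reference so the task is not garbage collected
        task.add_done_callback(tasks.discard)

    return on_close


async def demo() -> list:
    log = []
    tasks = set()

    async def cleanup() -> None:
        await asyncio.sleep(0)  # stands in for the transcript upload
        log.append("transcript uploaded")

    emitter = TinyEmitter()
    emitter.on("close", make_close_handler(cleanup, tasks))
    emitter.emit("close")
    await asyncio.gather(*list(tasks))  # wait for the scheduled cleanup to finish
    return log
```

In the agent, the same shape would mean a plain `def` handler for `close` that schedules a coroutine doing `finalize_and_upload()` followed by `egress_manager.close()`.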

src/egress_manager.py

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
1+
"""Egress manager for recording dual-channel audio to S3."""
2+
3+
from __future__ import annotations  # keeps the `str | None` hints below importable on Python 3.9

import os
4+
5+
from livekit import api
6+
from livekit.protocol import egress as egress_proto
7+
from loguru import logger
8+
9+
10+
class EgressConfig:
11+
"""Configuration for egress recordings."""
12+
13+
def __init__(
14+
self,
15+
s3_bucket: str,
16+
s3_prefix: str = "",
17+
aws_access_key: str | None = None,
18+
aws_secret_key: str | None = None,
19+
aws_region: str | None = None,
20+
livekit_url: str | None = None,
21+
livekit_api_key: str | None = None,
22+
livekit_api_secret: str | None = None,
23+
):
24+
"""Initialize egress configuration.
25+
26+
Args:
27+
s3_bucket: S3 bucket name for recordings
28+
s3_prefix: Prefix/path within the bucket
29+
aws_access_key: AWS access key (defaults to env var)
30+
aws_secret_key: AWS secret key (defaults to env var)
31+
aws_region: AWS region (defaults to env var or us-east-1)
32+
livekit_url: LiveKit server URL (defaults to env var)
33+
livekit_api_key: LiveKit API key (defaults to env var)
34+
livekit_api_secret: LiveKit API secret (defaults to env var)
35+
"""
36+
self.s3_bucket = s3_bucket
37+
self.s3_prefix = s3_prefix.rstrip("/")
38+
39+
# AWS credentials
40+
self.aws_access_key = aws_access_key or os.environ.get("AWS_ACCESS_KEY_ID", "")
41+
self.aws_secret_key = aws_secret_key or os.environ.get(
42+
"AWS_SECRET_ACCESS_KEY", ""
43+
)
44+
self.aws_region = aws_region or os.environ.get("AWS_REGION", "us-east-1")
45+
46+
# LiveKit credentials
47+
self.livekit_url = livekit_url or os.environ.get("LIVEKIT_URL", "")
48+
self.livekit_api_key = livekit_api_key or os.environ.get("LIVEKIT_API_KEY", "")
49+
self.livekit_api_secret = livekit_api_secret or os.environ.get(
50+
"LIVEKIT_API_SECRET", ""
51+
)
52+
53+
54+
class EgressManager:
55+
"""Manages LiveKit egress for dual-channel audio recording to S3."""
56+
57+
def __init__(self, config: EgressConfig):
58+
"""Initialize the egress manager.
59+
60+
Args:
61+
config: Egress configuration
62+
"""
63+
self.config = config
64+
self._api: api.LiveKitAPI | None = None
65+
self._egress_id: str | None = None
66+
67+
@property
68+
def livekit_api(self) -> api.LiveKitAPI:
69+
"""Lazily initialize LiveKit API client."""
70+
if self._api is None:
71+
self._api = api.LiveKitAPI(
72+
url=self.config.livekit_url,
73+
api_key=self.config.livekit_api_key,
74+
api_secret=self.config.livekit_api_secret,
75+
)
76+
return self._api
77+
78+
@property
79+
def egress_id(self) -> str | None:
80+
"""Get the current egress ID if recording is active."""
81+
return self._egress_id
82+
83+
def _create_s3_upload(self) -> egress_proto.S3Upload:
84+
"""Create S3 upload configuration."""
85+
return egress_proto.S3Upload(
86+
access_key=self.config.aws_access_key,
87+
secret=self.config.aws_secret_key,
88+
bucket=self.config.s3_bucket,
89+
region=self.config.aws_region,
90+
)
91+
92+
async def start_dual_channel_recording(self, room_name: str) -> str | None:
93+
"""Start dual-channel audio recording for a room.
94+
95+
The agent's audio will be on one channel, and all other participants
96+
(users) will be on the other channel.
97+
98+
Args:
99+
room_name: Name of the LiveKit room to record
100+
101+
Returns:
102+
Egress ID if started successfully, None on failure
103+
"""
104+
if self._egress_id:
105+
logger.warning(
106+
f"Egress already active with ID {self._egress_id}, skipping start"
107+
)
108+
return self._egress_id
109+
110+
try:
111+
s3_upload = self._create_s3_upload()
112+
113+
# Build the filepath with prefix
114+
filepath_prefix = (
115+
f"{self.config.s3_prefix}/audio" if self.config.s3_prefix else "audio"
116+
)
117+
filepath = f"{filepath_prefix}/{{room_name}}-{{time}}.ogg"
118+
119+
file_output = egress_proto.EncodedFileOutput(
120+
filepath=filepath,
121+
s3=s3_upload,
122+
)
123+
124+
# Start room composite egress with dual-channel audio
125+
# DUAL_CHANNEL_AGENT puts agent audio on one channel, all other participants on the other
126+
info = await self.livekit_api.egress.start_room_composite_egress(
127+
egress_proto.RoomCompositeEgressRequest(
128+
room_name=room_name,
129+
audio_only=True,
130+
audio_mixing=egress_proto.AudioMixing.DUAL_CHANNEL_AGENT,
131+
file_outputs=[file_output],
132+
)
133+
)
134+
135+
self._egress_id = info.egress_id
136+
logger.info(
137+
f"Started dual-channel egress recording for room {room_name}, "
138+
f"egress_id={self._egress_id}"
139+
)
140+
return self._egress_id
141+
142+
except Exception as e:
143+
logger.error(f"Failed to start egress recording: {e}")
144+
return None
145+
146+
async def stop_recording(self) -> bool:
147+
"""Stop the active egress recording.
148+
149+
Returns:
150+
True if stopped successfully or no active recording, False on error
151+
"""
152+
if not self._egress_id:
153+
logger.debug("No active egress to stop")
154+
return True
155+
156+
try:
157+
await self.livekit_api.egress.stop_egress(
158+
egress_proto.StopEgressRequest(egress_id=self._egress_id)
159+
)
160+
logger.info(f"Stopped egress recording, egress_id={self._egress_id}")
161+
self._egress_id = None
162+
return True
163+
except Exception as e:
164+
logger.error(f"Failed to stop egress recording: {e}")
165+
return False
166+
167+
async def close(self) -> None:
168+
"""Clean up resources."""
169+
if self._api:
170+
await self._api.aclose()
171+
self._api = None
172+
173+
174+
def create_default_egress_manager() -> EgressManager:
175+
"""Create an egress manager with default configuration for the target S3 bucket.
176+
177+
Returns:
178+
Configured EgressManager instance
179+
"""
180+
config = EgressConfig(
181+
s3_bucket="audivi-audio-recordings",
182+
s3_prefix="livekit-demos",
183+
)
184+
return EgressManager(config)
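
The doubled braces in `filepath` are easy to misread. Inside an f-string, `{{room_name}}` and `{{time}}` are escapes that produce the literal text `{room_name}` and `{time}`, presumably left for the egress service to substitute when it writes the file. Only `filepath_prefix` is interpolated in Python. A small sketch of that behaviour, using a hypothetical standalone function `build_filepath` that mirrors the logic in `start_dual_channel_recording`:

```python
def build_filepath(s3_prefix: str) -> str:
    """Return the egress filepath template for a given S3 prefix (hypothetical helper)."""
    prefix = s3_prefix.rstrip("/")  # same normalisation as EgressConfig
    filepath_prefix = f"{prefix}/audio" if prefix else "audio"
    # Doubled braces are f-string escapes, so the placeholders survive as literal text.
    return f"{filepath_prefix}/{{room_name}}-{{time}}.ogg"
```

With the prefix from this commit, the function returns `livekit-demos/audio/{room_name}-{time}.ogg`, which matches the layout shown in the README.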
