
Commit 522b34f

feat: add AVSynchronizer and examples (#324)
1 parent 42fb199 commit 522b34f

5 files changed: +736 -0 lines changed

examples/video-stream/README.md

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
# Video and Audio Synchronization Examples

This example demonstrates how to synchronize video and audio streams using the `AVSynchronizer` utility.

## AVSynchronizer Usage

The `AVSynchronizer` helps maintain synchronization between video and audio frames. The key principle is to push the initial synchronized video and audio frames together. After that, subsequent frames will be automatically synchronized according to the configured video FPS and audio sample rate.

```python
av_sync = AVSynchronizer(
    audio_source=audio_source,
    video_source=video_source,
    video_fps=30.0,
    video_queue_size_ms=100,
)

# Push frames to synchronizer
await av_sync.push(video_frame)
await av_sync.push(audio_frame)
```
## Examples

### 1. Video File Playback (`video_play.py`)

Shows how to stream video and audio from separate sources while maintaining sync:

- Reads video and audio streams separately from a media file
- Uses separate tasks to push video and audio frames to the synchronizer (see the sketch after this list)
- Since the streams are continuous, a larger `queue_size_ms` can be used, though this will increase memory usage
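A minimal sketch of that two-task pattern. Here `video_frames` and `audio_frames` are hypothetical async iterators standing in for `video_play.py`'s actual demuxing/decoding logic (they are not part of the LiveKit API), assumed to yield `rtc.VideoFrame` and `rtc.AudioFrame` objects; only `AVSynchronizer.push` comes from the snippet above:

```python
import asyncio

from livekit import rtc

async def push_streams(
    av_sync: rtc.AVSynchronizer,
    video_frames,  # hypothetical: async iterator of rtc.VideoFrame
    audio_frames,  # hypothetical: async iterator of rtc.AudioFrame
):
    async def _push(frames):
        async for frame in frames:
            await av_sync.push(frame)

    # Run both producers concurrently; the synchronizer keeps the streams
    # aligned using the configured video FPS and audio sample rate
    await asyncio.gather(_push(video_frames), _push(audio_frames))
```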
### 2. Audio Visualization (`audio_wave.py`)

Demonstrates generating video based on audio input:

- Generates audio frames with alternating sine waves and silence
- Creates video frames visualizing the audio waveform
- Shows how to handle cases with and without audio:
  - When audio is present: Push synchronized video and audio frames
  - During silence: Push only video frames
- Since video and audio frames are pushed in the same loop, audio frames must be smaller than the audio source queue size to avoid blocking (see the note after this list)
- Uses a small `queue_size_ms` (e.g. 50ms) to control frame generation speed during silence periods
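To make the blocking constraint concrete with the numbers used in `audio_wave.py` below: each generated audio frame is `chunk_size = 1024` samples at 48 kHz, roughly 21.3 ms of audio, which fits within the audio source's 50 ms `queue_size_ms`, so a single push does not block long enough to stall video frame generation.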
examples/video-stream/audio_wave.py

Lines changed: 320 additions & 0 deletions

@@ -0,0 +1,320 @@
1+
import asyncio
2+
import logging
3+
import os
4+
import signal
5+
import time
6+
from collections import deque
7+
from dataclasses import dataclass
8+
from typing import AsyncIterable, Optional, Union
9+
10+
import numpy as np
11+
from livekit import rtc, api
12+
13+
try:
14+
import cv2
15+
except ImportError:
16+
raise RuntimeError(
17+
"cv2 is required to run this example, "
18+
"install with `pip install opencv-python`"
19+
)
20+
21+
# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
@dataclass
class MediaInfo:
    video_width: int
    video_height: int
    video_fps: float
    audio_sample_rate: int
    audio_channels: int


class _AudioEndSentinel:
    pass

async def audio_generator(
    media_info: MediaInfo,
    output_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]],
):
    """Generates audio frames with alternating sine wave and silence periods"""
    frequency = 480  # Hz
    amplitude = 0.5
    period = 7.0
    sine_duration = 5.0  # Duration of sine wave in each period
    chunk_size = 1024

    while True:
        current_time = 0.0

        # Generate audio for sine_duration seconds
        while current_time < sine_duration:
            t = np.linspace(
                current_time,
                current_time + chunk_size / media_info.audio_sample_rate,
                num=chunk_size,
                endpoint=False,
            )
            # Create volume envelope using sine wave
            volume = np.abs(np.sin(2 * np.pi * current_time / sine_duration))
            samples = amplitude * volume * np.sin(2 * np.pi * frequency * t)

            # Convert to int16, (samples, channels)
            samples = (samples[:, np.newaxis] * 32767).astype(np.int16)
            if media_info.audio_channels > 1:
                samples = np.repeat(samples, media_info.audio_channels, axis=1)

            # Create audio frame
            audio_frame = rtc.AudioFrame(
                data=samples.tobytes(),
                sample_rate=media_info.audio_sample_rate,
                num_channels=samples.shape[1],
                samples_per_channel=samples.shape[0],
            )
            await output_audio.put(audio_frame)
            current_time += chunk_size / media_info.audio_sample_rate
            await asyncio.sleep(0)
        await output_audio.put(_AudioEndSentinel())

        # Simulate silence
        silence_duration = period - sine_duration
        await asyncio.sleep(silence_duration)

class WaveformVisualizer:
    def __init__(self, history_length: int = 1000):
        self.history_length = history_length
        self.volume_history: deque[float] = deque(maxlen=history_length)
        self.start_time = time.time()

    def draw_timestamp(self, canvas: np.ndarray, fps: float):
        height, width = canvas.shape[:2]
        text = f"{time.time() - self.start_time:.1f}s @ {fps:.1f}fps"
        font_face = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 2.0
        thickness = 2

        (text_width, text_height), baseline = cv2.getTextSize(
            text, font_face, font_scale, thickness
        )
        x = (width - text_width) // 2
        y = int((height - text_height) * 0.4 + baseline)
        cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness)

    def draw_current_wave(
        self, canvas: np.ndarray, audio_samples: np.ndarray
    ) -> np.ndarray:
        """Draw the current waveform and return the current values"""
        height, width = canvas.shape[:2]
        center_y = height // 2 + 100

        normalized_samples = audio_samples.astype(np.float32) / 32767.0
        normalized_samples = normalized_samples.mean(axis=1)  # (samples,)
        num_points = min(width, len(normalized_samples))

        if len(normalized_samples) > num_points:
            indices = np.linspace(0, len(normalized_samples) - 1, num_points, dtype=int)
            plot_data = normalized_samples[indices]
        else:
            plot_data = normalized_samples

        x_coords = np.linspace(0, width, num_points, dtype=int)
        y_coords = (plot_data * 200) + center_y

        cv2.line(canvas, (0, center_y), (width, center_y), (200, 200, 200), 1)
        points = np.column_stack((x_coords, y_coords.astype(int)))
        for i in range(len(points) - 1):
            cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (0, 255, 0), 2)

        return plot_data

    def draw_volume_history(self, canvas: np.ndarray, current_volume: float):
        height, width = canvas.shape[:2]
        center_y = height // 2

        self.volume_history.append(current_volume)
        cv2.line(
            canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1
        )

        volume_x = np.linspace(0, width, len(self.volume_history), dtype=int)
        volume_y = center_y - 250 + (np.array(self.volume_history) * 200)
        points = np.column_stack((volume_x, volume_y.astype(int)))
        for i in range(len(points) - 1):
            cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (255, 0, 0), 2)

    def draw(self, canvas: np.ndarray, audio_samples: np.ndarray, fps: float):
        self.draw_timestamp(canvas, fps)
        plot_data = self.draw_current_wave(canvas, audio_samples)
        current_volume = np.abs(plot_data).mean()
        self.draw_volume_history(canvas, current_volume)

async def video_generator(
    media_info: MediaInfo,
    input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]],
    av_sync: rtc.AVSynchronizer,  # only used for drawing the actual fps on the video
) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]:
    canvas = np.zeros(
        (media_info.video_height, media_info.video_width, 4), dtype=np.uint8
    )
    canvas.fill(255)

    def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame:
        return rtc.VideoFrame(
            width=image.shape[1],
            height=image.shape[0],
            type=rtc.VideoBufferType.RGBA,
            data=image.tobytes(),
        )

    audio_samples_per_frame = int(media_info.audio_sample_rate / media_info.video_fps)
    audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16)
    wave_visualizer = WaveformVisualizer()
    while True:
        try:
            # timeout has to be shorter than the frame interval to avoid starvation
            audio_frame = await asyncio.wait_for(
                input_audio.get(), timeout=0.5 / media_info.video_fps
            )
        except asyncio.TimeoutError:
            # generate frame without audio (e.g. silence state)
            new_frame = canvas.copy()
            wave_visualizer.draw(new_frame, np.zeros((1, 2)), av_sync.actual_fps)
            video_frame = _np_to_video_frame(new_frame)
            yield video_frame, None

            # speed is controlled by the video fps in av_sync
            await asyncio.sleep(0)
            continue

        if isinstance(audio_frame, _AudioEndSentinel):
            # drop the audio buffer when the audio finished
            audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16)
            continue

        audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape(
            -1, audio_frame.num_channels
        )  # (samples, channels)
        # accumulate audio samples to the buffer
        audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=0)

        while audio_buffer.shape[0] >= audio_samples_per_frame:
            sub_samples = audio_buffer[:audio_samples_per_frame, :]
            audio_buffer = audio_buffer[audio_samples_per_frame:, :]

            new_frame = canvas.copy()
            wave_visualizer.draw(new_frame, sub_samples, av_sync.actual_fps)
            video_frame = _np_to_video_frame(new_frame)
            sub_audio_frame = rtc.AudioFrame(
                data=sub_samples.tobytes(),
                sample_rate=audio_frame.sample_rate,
                num_channels=sub_samples.shape[1],
                samples_per_channel=sub_samples.shape[0],
            )
            yield video_frame, sub_audio_frame

async def main(room: rtc.Room):
    token = (
        api.AccessToken()
        .with_identity("python-publisher")
        .with_name("Python Publisher")
        .with_grants(
            api.VideoGrants(
                room_join=True,
                room="room-ysBA-Q0hM",
                agent=True,
            )
        )
        .to_jwt()
    )
    url = os.getenv("LIVEKIT_URL")
    logging.info("connecting to %s", url)

    try:
        await room.connect(url, token)
        logging.info("connected to room %s", room.name)
    except rtc.ConnectError as e:
        logging.error("failed to connect to the room: %s", e)
        return

    # Create media info
    media_info = MediaInfo(
        video_width=1280,
        video_height=720,
        video_fps=30.0,
        audio_sample_rate=48000,
        audio_channels=2,
    )

    # Create video and audio sources/tracks
    queue_size_ms = 50
    video_source = rtc.VideoSource(
        width=media_info.video_width,
        height=media_info.video_height,
    )
    audio_source = rtc.AudioSource(
        sample_rate=media_info.audio_sample_rate,
        num_channels=media_info.audio_channels,
        queue_size_ms=queue_size_ms,
    )

    video_track = rtc.LocalVideoTrack.create_video_track("video", video_source)
    audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source)

    # Publish tracks
    video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA)
    audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)

    await room.local_participant.publish_track(video_track, video_options)
    await room.local_participant.publish_track(audio_track, audio_options)

    # Create AV synchronizer
    av_sync = rtc.AVSynchronizer(
        audio_source=audio_source,
        video_source=video_source,
        video_fps=media_info.video_fps,
        video_queue_size_ms=queue_size_ms,
    )

    # Start audio generator
    audio_queue = asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]](maxsize=1)
    audio_task = asyncio.create_task(audio_generator(media_info, audio_queue))

    try:
        async for video_frame, audio_frame in video_generator(
            media_info, audio_queue, av_sync=av_sync
        ):
            await av_sync.push(video_frame)
            if audio_frame:
                await av_sync.push(audio_frame)
    finally:
        audio_task.cancel()
        await av_sync.aclose()

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        handlers=[logging.FileHandler("audio_wave.log"), logging.StreamHandler()],
    )

    loop = asyncio.get_event_loop()
    room = rtc.Room(loop=loop)

    async def cleanup():
        await room.disconnect()
        loop.stop()

    asyncio.ensure_future(main(room))
    for sig in [signal.SIGINT, signal.SIGTERM]:
        loop.add_signal_handler(sig, lambda: asyncio.ensure_future(cleanup()))

    try:
        loop.run_forever()
    finally:
        loop.close()
