Skip to content

Commit 1920690

Browse files
committed
decrease buffer size and print timestamps
1 parent 1738a10 commit 1920690

File tree

2 files changed

+62
-24
lines changed

2 files changed

+62
-24
lines changed

examples/video-stream/video_play.py

Lines changed: 42 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,10 @@
1313

1414
try:
1515
import av
16+
import cv2
1617
except ImportError:
1718
raise RuntimeError(
18-
"av is required to run this example, install with `pip install av`"
19+
"av and opencv-python is required to run this example, install with `pip install av opencv-python`"
1920
)
2021

2122
# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set
@@ -51,36 +52,56 @@ def __init__(self, media_file: Union[str, Path]) -> None:
5152
audio_sample_rate=audio_stream.sample_rate,
5253
audio_channels=audio_stream.channels,
5354
)
55+
print(self._info)
5456

5557
@property
5658
def info(self) -> MediaInfo:
5759
return self._info
5860

59-
async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]:
61+
async def stream_video(
62+
self, av_sync: rtc.AVSynchronizer
63+
) -> AsyncIterable[tuple[rtc.VideoFrame, float]]:
6064
"""Streams video frames from the media file in an endless loop."""
61-
for av_frame in self._video_container.decode(video=0):
65+
for i, av_frame in enumerate(self._video_container.decode(video=0)):
6266
# Convert video frame to RGBA
6367
frame = av_frame.to_rgb().to_ndarray()
6468
frame_rgba = np.ones((frame.shape[0], frame.shape[1], 4), dtype=np.uint8)
6569
frame_rgba[:, :, :3] = frame
66-
yield rtc.VideoFrame(
67-
width=frame.shape[1],
68-
height=frame.shape[0],
69-
type=rtc.VideoBufferType.RGBA,
70-
data=frame_rgba.tobytes(),
70+
71+
# put fps and timestamps in the frame
72+
frame_rgba = cv2.putText(
73+
frame_rgba, f"{av_sync.actual_fps:.2f}fps", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2
74+
)
75+
76+
if i % 10 == 0:
77+
print(
78+
f"decoded frame {i} ({av_frame.time:.3f}s), {av_sync.actual_fps:.2f}fps, "
79+
f"last video time: {av_sync.last_video_time:.3f}s, last audio time: {av_sync.last_audio_time:.3f}s"
80+
)
81+
yield (
82+
rtc.VideoFrame(
83+
width=frame.shape[1],
84+
height=frame.shape[0],
85+
type=rtc.VideoBufferType.RGBA,
86+
data=frame_rgba.tobytes(),
87+
),
88+
av_frame.time,
7189
)
7290

73-
async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]:
91+
async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]:
7492
"""Streams audio frames from the media file in an endless loop."""
75-
for av_frame in self._audio_container.decode(audio=0):
93+
for i, av_frame in enumerate(self._audio_container.decode(audio=0)):
7694
# Convert audio frame to raw int16 samples
7795
frame = av_frame.to_ndarray().T # Transpose to (samples, channels)
7896
frame = (frame * 32768).astype(np.int16)
79-
yield rtc.AudioFrame(
80-
data=frame.tobytes(),
81-
sample_rate=self.info.audio_sample_rate,
82-
num_channels=frame.shape[1],
83-
samples_per_channel=frame.shape[0],
97+
yield (
98+
rtc.AudioFrame(
99+
data=frame.tobytes(),
100+
sample_rate=self.info.audio_sample_rate,
101+
num_channels=frame.shape[1],
102+
samples_per_channel=frame.shape[0],
103+
),
104+
av_frame.time,
84105
)
85106

86107
def reset(self):
@@ -102,6 +123,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
102123
api.VideoGrants(
103124
room_join=True,
104125
room=room_name,
126+
agent=True,
105127
)
106128
)
107129
.to_jwt()
@@ -121,7 +143,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
121143
media_info = streamer.info
122144

123145
# Create video and audio sources/tracks
124-
queue_size_ms = 1000 # 1 second
146+
queue_size_ms = 50 # TODO: testing with different sizes
125147
video_source = rtc.VideoSource(
126148
width=media_info.video_width,
127149
height=media_info.video_height,
@@ -157,18 +179,18 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
157179
)
158180

159181
async def _push_frames(
160-
stream: AsyncIterable[rtc.VideoFrame | rtc.AudioFrame],
182+
stream: AsyncIterable[tuple[rtc.VideoFrame | rtc.AudioFrame, float]],
161183
av_sync: rtc.AVSynchronizer,
162184
):
163-
async for frame in stream:
164-
await av_sync.push(frame)
185+
async for frame, timestamp in stream:
186+
await av_sync.push(frame, timestamp)
165187
await asyncio.sleep(0)
166188

167189
try:
168190
while True:
169191
streamer.reset()
170192
video_task = asyncio.create_task(
171-
_push_frames(streamer.stream_video(), av_sync)
193+
_push_frames(streamer.stream_video(av_sync), av_sync)
172194
)
173195
audio_task = asyncio.create_task(
174196
_push_frames(streamer.stream_audio(), av_sync)

livekit-rtc/livekit/rtc/synchronizer.py

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,8 @@ def __init__(
4343
self._max_delay_tolerance_ms = _max_delay_tolerance_ms
4444

4545
self._stopped = False
46+
self._last_video_time: float = 0
47+
self._last_audio_time: float = 0
4648

4749
self._video_queue_max_size = int(
4850
self._video_fps * self._video_queue_size_ms / 1000
@@ -51,7 +53,7 @@ def __init__(
5153
# ensure queue is bounded if queue size is specified
5254
self._video_queue_max_size = max(1, self._video_queue_max_size)
5355

54-
self._video_queue = asyncio.Queue[VideoFrame](
56+
self._video_queue = asyncio.Queue[tuple[VideoFrame, float]](
5557
maxsize=self._video_queue_max_size
5658
)
5759
self._fps_controller = _FPSController(
@@ -60,12 +62,16 @@ def __init__(
6062
)
6163
self._capture_video_task = asyncio.create_task(self._capture_video())
6264

63-
async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
65+
async def push(
66+
self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
67+
) -> None:
6468
if isinstance(frame, AudioFrame):
6569
await self._audio_source.capture_frame(frame)
70+
if timestamp is not None:
71+
self._last_audio_time = timestamp
6672
return
6773

68-
await self._video_queue.put(frame)
74+
await self._video_queue.put((frame, timestamp))
6975

7076
async def clear_queue(self) -> None:
7177
self._audio_source.clear_queue()
@@ -79,9 +85,11 @@ async def wait_for_playout(self) -> None:
7985

8086
async def _capture_video(self) -> None:
8187
while not self._stopped:
82-
frame = await self._video_queue.get()
88+
frame, timestamp = await self._video_queue.get()
8389
async with self._fps_controller:
8490
self._video_source.capture_frame(frame)
91+
if timestamp is not None:
92+
self._last_video_time = timestamp
8593
self._video_queue.task_done()
8694

8795
async def aclose(self) -> None:
@@ -93,6 +101,14 @@ async def aclose(self) -> None:
93101
def actual_fps(self) -> float:
94102
return self._fps_controller.actual_fps
95103

104+
@property
105+
def last_video_time(self) -> float:
106+
return self._last_video_time
107+
108+
@property
109+
def last_audio_time(self) -> float:
110+
return self._last_audio_time
111+
96112

97113
class _FPSController:
98114
def __init__(

0 commit comments

Comments
 (0)