Skip to content

Commit 51b51e0

Browse files
committed
add reset for av sync
1 parent 964dcb2 commit 51b51e0

File tree

2 files changed

+56
-44
lines changed

2 files changed

+56
-44
lines changed

examples/video-stream/video_play.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async def stream_video(self) -> AsyncIterable[tuple[rtc.VideoFrame, float]]:
7575

7676
async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]:
7777
"""Streams audio frames from the media file in an endless loop."""
78-
for i, av_frame in enumerate(self._audio_container.decode(audio=0)):
78+
for av_frame in self._audio_container.decode(audio=0):
7979
# Convert audio frame to raw int16 samples
8080
frame = av_frame.to_ndarray().T # Transpose to (samples, channels)
8181
frame = (frame * 32768).astype(np.int16)
@@ -129,7 +129,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
129129
media_info = streamer.info
130130

131131
# Create video and audio sources/tracks
132-
queue_size_ms = 1000 # TODO: testing with different sizes
132+
queue_size_ms = 1000
133133
video_source = rtc.VideoSource(
134134
width=media_info.video_width,
135135
height=media_info.video_height,
@@ -172,19 +172,45 @@ async def _push_frames(
172172
await av_sync.push(frame, timestamp)
173173
await asyncio.sleep(0)
174174

175+
async def _log_fps(av_sync: rtc.AVSynchronizer):
176+
while True:
177+
await asyncio.sleep(2)
178+
diff = av_sync.last_video_time - av_sync.last_audio_time
179+
180+
logger.info(
181+
f"fps: {av_sync.actual_fps:.2f}, video_time: {av_sync.last_video_time:.3f}s, "
182+
f"audio_time: {av_sync.last_audio_time:.3f}s, diff: {diff:.3f}s"
183+
)
184+
175185
try:
176186
while True:
177187
streamer.reset()
178-
video_task = asyncio.create_task(
179-
_push_frames(streamer.stream_video(), av_sync)
180-
)
181-
audio_task = asyncio.create_task(
182-
_push_frames(streamer.stream_audio(), av_sync)
188+
189+
video_stream = streamer.stream_video()
190+
audio_stream = streamer.stream_audio()
191+
192+
# read the first frame of each stream and push both before starting the push tasks
193+
first_video_frame, video_timestamp = await video_stream.__anext__()
194+
first_audio_frame, audio_timestamp = await audio_stream.__anext__()
195+
logger.info(
196+
f"first video duration: {1/media_info.video_fps:.3f}s, "
197+
f"first audio duration: {first_audio_frame.duration:.3f}s"
183198
)
199+
await av_sync.push(first_video_frame, video_timestamp)
200+
await av_sync.push(first_audio_frame, audio_timestamp)
201+
202+
video_task = asyncio.create_task(_push_frames(video_stream, av_sync))
203+
audio_task = asyncio.create_task(_push_frames(audio_stream, av_sync))
204+
205+
log_fps_task = asyncio.create_task(_log_fps(av_sync))
184206

185207
# wait for both tasks to complete
186208
await asyncio.gather(video_task, audio_task)
187209
await av_sync.wait_for_playout()
210+
211+
# clean up
212+
av_sync.reset()
213+
log_fps_task.cancel()
188214
logger.info("playout finished")
189215
finally:
190216
await streamer.aclose()

livekit-rtc/livekit/rtc/synchronizer.py

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
self._max_delay_tolerance_ms = _max_delay_tolerance_ms
4646

4747
self._stopped = False
48+
# the time of the last video/audio frame captured
4849
self._last_video_time: float = 0
4950
self._last_audio_time: float = 0
5051

@@ -55,7 +56,7 @@ def __init__(
5556
# ensure queue is bounded if queue size is specified
5657
self._video_queue_max_size = max(1, self._video_queue_max_size)
5758

58-
self._video_queue = asyncio.Queue[tuple[VideoFrame, float]](
59+
self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]](
5960
maxsize=self._video_queue_max_size
6061
)
6162
self._fps_controller = _FPSController(
@@ -67,6 +68,13 @@ def __init__(
6768
async def push(
6869
self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
6970
) -> None:
71+
"""Push a frame to the synchronizer
72+
73+
Args:
74+
frame: The video or audio frame to push.
75+
timestamp: (optional) The timestamp of the frame; currently used only for logging.
76+
For AudioFrame, it should be the end time of the frame.
77+
"""
7078
if isinstance(frame, AudioFrame):
7179
await self._audio_source.capture_frame(frame)
7280
if timestamp is not None:
@@ -79,53 +87,25 @@ async def clear_queue(self) -> None:
7987
self._audio_source.clear_queue()
8088
while not self._video_queue.empty():
8189
await self._video_queue.get()
90+
self._video_queue.task_done()
8291

8392
async def wait_for_playout(self) -> None:
8493
"""Wait until all video and audio frames are played out."""
85-
await self._audio_source.wait_for_playout()
86-
await self._video_queue.join()
94+
await asyncio.gather(
95+
self._audio_source.wait_for_playout(),
96+
self._video_queue.join(),
97+
)
98+
99+
def reset(self) -> None:
100+
self._fps_controller.reset()
87101

88102
async def _capture_video(self) -> None:
89-
count = 0
90103
while not self._stopped:
91104
frame, timestamp = await self._video_queue.get()
92-
93105
async with self._fps_controller:
94-
# debug
95-
frame_rgba = np.frombuffer(frame.data, dtype=np.uint8).reshape(
96-
frame.height, frame.width, 4
97-
)
98-
frame_bgr = cv2.cvtColor(frame_rgba[:, :, :3], cv2.COLOR_RGBA2BGR)
99-
frame_bgr = cv2.putText(
100-
frame_bgr,
101-
f"{self.actual_fps:.2f}fps, video time: {timestamp:.3f}s, "
102-
f"audio time: {self.last_audio_time:.3f}s, diff: {timestamp - self.last_audio_time:.3f}s",
103-
(10, 100),
104-
cv2.FONT_HERSHEY_SIMPLEX,
105-
1,
106-
(0, 0, 255),
107-
2,
108-
)
109-
frame_rgba = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGBA)
110-
frame = VideoFrame(
111-
width=frame.width,
112-
height=frame.height,
113-
type=frame.type,
114-
data=frame_rgba.tobytes(),
115-
)
116-
count += 1
117-
# end debug
118-
119106
self._video_source.capture_frame(frame)
120107
if timestamp is not None:
121108
self._last_video_time = timestamp
122-
123-
if count % 30 == 0:
124-
diff = self.last_video_time - self.last_audio_time
125-
print(
126-
f"{self.actual_fps:.2f}fps, last video time: {self.last_video_time:.3f}s, "
127-
f"last audio time: {self.last_audio_time:.3f}s, diff: {diff:.3f}s"
128-
)
129109
self._video_queue.task_done()
130110

131111
async def aclose(self) -> None:
@@ -139,10 +119,12 @@ def actual_fps(self) -> float:
139119

140120
@property
141121
def last_video_time(self) -> float:
122+
"""The time of the last video frame captured"""
142123
return self._last_video_time
143124

144125
@property
145126
def last_audio_time(self) -> float:
127+
"""The time of the last audio frame played out"""
146128
return self._last_audio_time - self._audio_source.queued_duration
147129

148130

@@ -175,6 +157,10 @@ async def __aenter__(self) -> None:
175157
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
176158
self.after_process()
177159

160+
def reset(self) -> None:
161+
self._next_frame_time = None
162+
self._send_timestamps.clear()
163+
178164
async def wait_next_process(self) -> None:
179165
"""Wait until it's time for the next frame.
180166

0 commit comments

Comments
 (0)