From 9181149ce3308c078850ea470cfa45984c6c14db Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Thu, 6 Feb 2025 19:14:40 +0100 Subject: [PATCH 01/10] feat: add stream stats endpoint This commit adds a new stream stats endpoint which can be used to retrieve the fps metrics in a way that doesn't affect performance. --- server/app.py | 92 +++++++++++++++++++++++++++++++++++++++++++++---- server/utils.py | 71 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 154 insertions(+), 9 deletions(-) diff --git a/server/app.py b/server/app.py index 2a896e57..2547ba44 100644 --- a/server/app.py +++ b/server/app.py @@ -12,12 +12,14 @@ RTCConfiguration, RTCIceServer, MediaStreamTrack, - RTCDataChannel, ) +import threading +import av from aiortc.rtcrtpsender import RTCRtpSender from aiortc.codecs import h264 from pipeline import Pipeline -from utils import patch_loop_datagram +from utils import patch_loop_datagram, StreamStats +import time logger = logging.getLogger(__name__) logging.getLogger('aiortc.rtcrtpsender').setLevel(logging.WARNING) @@ -29,16 +31,82 @@ class VideoStreamTrack(MediaStreamTrack): + """video stream track that processes video frames using a pipeline. + + Attributes: + kind (str): The kind of media, which is "video" for this class. + track (MediaStreamTrack): The underlying media stream track. + pipeline (Pipeline): The processing pipeline to apply to each video frame. + """ + kind = "video" - def __init__(self, track: MediaStreamTrack, pipeline): + def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): + """Initialize the VideoStreamTrack. + + Args: + track: The underlying media stream track. + pipeline: The processing pipeline to apply to each video frame. + """ super().__init__() self.track = track self.pipeline = pipeline - - async def recv(self): - frame = await self.track.recv() - return await self.pipeline(frame) + self._frame_count = 0 + self._start_time = time.monotonic() + self._lock = threading.Lock() + self._fps = 0.0 + self._running = True + self._start_fps_thread() + + def _start_fps_thread(self): + """Start a separate thread to calculate FPS periodically.""" + self.fps_thread = threading.Thread(target=self._calculate_fps_loop, daemon=True) + self.fps_thread.start() + + def _calculate_fps_loop(self): + """Loop to calculate FPS periodically.""" + while self._running: + time.sleep(1) # Calculate FPS every second. + with self._lock: + current_time = time.monotonic() + time_diff = current_time - self._start_time + if time_diff > 0: + self._fps = self._frame_count / time_diff + + # Reset start_time and frame_count for the next interval. + self._start_time = current_time + self._frame_count = 0 + + def stop(self): + """Stop the FPS calculation thread.""" + self._running = False + self.fps_thread.join() + + @property + def fps(self) -> float: + """Get the current output frames per second (FPS). + + Returns: + The current output FPS. + """ + with self._lock: + return self._fps + + async def recv(self) -> av.VideoFrame: + """Receive and process a video frame. Called by the WebRTC library when a frame + is received. + + Returns: + The processed video frame. + """ + input_frame = await self.track.recv() + processed_frame = await self.pipeline(input_frame) + + # Increment frame count for FPS calculation. + with self._lock: + self._frame_count += 1 + + return processed_frame def force_codec(pc, sender, forced_codec): @@ -156,6 +224,10 @@ def on_track(track): tracks["video"] = videoTrack sender = pc.addTrack(videoTrack) + # Store video track in app for stats. + stream_id = track.id + request.app["video_tracks"][stream_id] = videoTrack + codec = "video/H264" force_codec(pc, sender, codec) @@ -207,6 +279,7 @@ async def on_startup(app: web.Application): cwd=app["workspace"], disable_cuda_malloc=True, gpu_only=True ) app["pcs"] = set() + app["video_tracks"] = {} async def on_shutdown(app: web.Application): @@ -251,4 +324,9 @@ async def on_shutdown(app: web.Application): app.router.add_post("/prompt", set_prompt) app.router.add_get("/", health) + # Add routes for getting stream statistics. + stream_stats = StreamStats(app) + app.router.add_get("/stats", stream_stats.get_stats) + app.router.add_get("/stats/{stream_id}", stream_stats.get_stats_by_id) + web.run_app(app, host=args.host, port=int(args.port)) diff --git a/server/utils.py b/server/utils.py index db263f88..858f431d 100644 --- a/server/utils.py +++ b/server/utils.py @@ -1,9 +1,13 @@ +"""Utility functions for the server.""" + import asyncio import random import types import logging - -from typing import List, Tuple +import json +from aiohttp import web +from aiortc import MediaStreamTrack +from typing import List, Tuple, Any, Dict logger = logging.getLogger(__name__) @@ -48,3 +52,66 @@ async def create_datagram_endpoint( loop.create_datagram_endpoint = types.MethodType(create_datagram_endpoint, loop) loop._patch_done = True + + +class StreamStats: + """Class to get stream statistics.""" + + def __init__(self, app: web.Application): + """Initialize the StreamStats class.""" + self._app = app + + def get_video_track_stats(self, video_track: MediaStreamTrack) -> Dict[str, Any]: + """Get statistics for a video track. + + Args: + video_track: The VideoStreamTrack instance. + + Returns: + A dictionary containing the statistics. + """ + return { + "fps": video_track.fps, + } + + async def get_stats(self, _) -> web.Response: + """Get the current stream statistics for all streams. + + Args: + request: The HTTP GET request. + + Returns: + The HTTP response containing the statistics. + """ + video_tracks = self._app.get("video_tracks", {}) + all_stats = { + stream_id: self.get_video_track_stats(track) + for stream_id, track in video_tracks.items() + } + + return web.Response( + content_type="application/json", + text=json.dumps(all_stats), + ) + + async def get_stats_by_id(self, request: web.Request) -> web.Response: + """Get the statistics for a specific stream by ID. + + Args: + request: The HTTP GET request. + + Returns: + The HTTP response containing the statistics. + """ + stream_id = request.match_info.get("stream_id") + video_track = self._app["video_tracks"].get(stream_id) + + if video_track: + stats = self.get_video_track_stats(video_track) + else: + stats = {"error": "Stream not found"} + + return web.Response( + content_type="application/json", + text=json.dumps(stats), + ) From 19fc0acb79820d31004508195e8010ef12d833c2 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Fri, 7 Feb 2025 08:10:34 +0100 Subject: [PATCH 02/10] refactor: add `live` prefix This commit ensures that the paths are also available under the `live` prefix. This will allow consistency with the hosted experience and improve the user experience. --- server/app.py | 14 ++++++++++---- server/utils.py | 13 +++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/server/app.py b/server/app.py index 2547ba44..43f7ba3b 100644 --- a/server/app.py +++ b/server/app.py @@ -18,7 +18,7 @@ from aiortc.rtcrtpsender import RTCRtpSender from aiortc.codecs import h264 from pipeline import Pipeline -from utils import patch_loop_datagram, StreamStats +from utils import patch_loop_datagram, StreamStats, add_prefix_to_app_routes import time logger = logging.getLogger(__name__) @@ -320,13 +320,19 @@ async def on_shutdown(app: web.Application): app.on_startup.append(on_startup) app.on_shutdown.append(on_shutdown) + app.router.add_get("/", health) + + # WebRTC signalling and control routes. app.router.add_post("/offer", offer) app.router.add_post("/prompt", set_prompt) - app.router.add_get("/", health) # Add routes for getting stream statistics. stream_stats = StreamStats(app) - app.router.add_get("/stats", stream_stats.get_stats) - app.router.add_get("/stats/{stream_id}", stream_stats.get_stats_by_id) + app.router.add_get("/streams/stats", stream_stats.get_stats) + app.router.add_get("/stream/{stream_id}/stats", stream_stats.get_stats_by_id) + + # Add hosted platform route prefix. + # NOTE: This ensures that the local and hosted experiences have consistent routes. + add_prefix_to_app_routes(app, "/live") web.run_app(app, host=args.host, port=int(args.port)) diff --git a/server/utils.py b/server/utils.py index 858f431d..7e6bc18c 100644 --- a/server/utils.py +++ b/server/utils.py @@ -54,6 +54,19 @@ async def create_datagram_endpoint( loop._patch_done = True +def add_prefix_to_app_routes(app: web.Application, prefix: str): + """Add a prefix to all routes in the given application. + + Args: + app: The web application whose routes will be prefixed. + prefix: The prefix to add to all routes. + """ + prefix = prefix.rstrip("/") + for route in list(app.router.routes()): + new_path = prefix + route.resource.canonical + app.router.add_route(route.method, new_path, route.handler) + + class StreamStats: """Class to get stream statistics.""" From fab3b64c7f4fe057b52ae467a9e813f195728ae4 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Fri, 7 Feb 2025 16:43:08 +0100 Subject: [PATCH 03/10] refactor: improve internal fps parameter naming This commit improves the naming of the parameters that are used in the fps calculation to ensure they are more descriptive. --- server/app.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/server/app.py b/server/app.py index 43f7ba3b..21389bd5 100644 --- a/server/app.py +++ b/server/app.py @@ -51,8 +51,8 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): super().__init__() self.track = track self.pipeline = pipeline - self._frame_count = 0 - self._start_time = time.monotonic() + self._fps_interval_frame_count = 0 + self._last_fps_calculation_time = time.monotonic() self._lock = threading.Lock() self._fps = 0.0 self._running = True @@ -60,8 +60,8 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): def _start_fps_thread(self): """Start a separate thread to calculate FPS periodically.""" - self.fps_thread = threading.Thread(target=self._calculate_fps_loop, daemon=True) - self.fps_thread.start() + self._fps_thread = threading.Thread(target=self._calculate_fps_loop, daemon=True) + self._fps_thread.start() def _calculate_fps_loop(self): """Loop to calculate FPS periodically.""" @@ -69,18 +69,18 @@ def _calculate_fps_loop(self): time.sleep(1) # Calculate FPS every second. with self._lock: current_time = time.monotonic() - time_diff = current_time - self._start_time + time_diff = current_time - self._last_fps_calculation_time if time_diff > 0: - self._fps = self._frame_count / time_diff + self._fps = self._fps_interval_frame_count / time_diff # Reset start_time and frame_count for the next interval. - self._start_time = current_time - self._frame_count = 0 + self._last_fps_calculation_time = current_time + self._fps_interval_frame_count = 0 def stop(self): """Stop the FPS calculation thread.""" self._running = False - self.fps_thread.join() + self._fps_thread.join() @property def fps(self) -> float: @@ -104,7 +104,7 @@ async def recv(self) -> av.VideoFrame: # Increment frame count for FPS calculation. with self._lock: - self._frame_count += 1 + self._fps_interval_frame_count += 1 return processed_frame From 85ebe5308a79a0f810df68c00707f8b4db57fdaf Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Fri, 7 Feb 2025 17:18:18 +0100 Subject: [PATCH 04/10] fix: ensure video stream is removed when stream ends This commit ensures that the video stream reference is removed from the app's `video_tracks` object when a stream ends, preventing potential memory leaks, incorrect data and ensuring proper cleanup. --- server/app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/server/app.py b/server/app.py index 21389bd5..85e9111c 100644 --- a/server/app.py +++ b/server/app.py @@ -234,6 +234,7 @@ def on_track(track): @track.on("ended") async def on_ended(): logger.info(f"{track.kind} track ended") + request.app["video_tracks"].pop(track.id, None) @pc.on("connectionstatechange") async def on_connectionstatechange(): From 40d92cf5903c9556e8eb7fbba01bdca38b809ca3 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Tue, 18 Feb 2025 12:44:48 +0100 Subject: [PATCH 05/10] feat: add FPS history and optimize code This commit adds FPS history and average minute FPS to the stats endpoints. It also replaces threads with asyncio, improving performance without increasing CPU usage. --- server/app.py | 77 ++++++++++++++++++++++++++++++++----------------- server/utils.py | 36 +++++++++++++---------- 2 files changed, 70 insertions(+), 43 deletions(-) diff --git a/server/app.py b/server/app.py index 85e9111c..01e7e098 100644 --- a/server/app.py +++ b/server/app.py @@ -4,6 +4,7 @@ import json import logging +from collections import deque from twilio.rest import Client from aiohttp import web from aiortc import ( @@ -22,8 +23,8 @@ import time logger = logging.getLogger(__name__) -logging.getLogger('aiortc.rtcrtpsender').setLevel(logging.WARNING) -logging.getLogger('aiortc.rtcrtpreceiver').setLevel(logging.WARNING) +logging.getLogger("aiortc.rtcrtpsender").setLevel(logging.WARNING) +logging.getLogger("aiortc.rtcrtpreceiver").setLevel(logging.WARNING) MAX_BITRATE = 2000000 @@ -51,27 +52,29 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): super().__init__() self.track = track self.pipeline = pipeline + + self._running = True + self._lock = threading.Lock() self._fps_interval_frame_count = 0 self._last_fps_calculation_time = time.monotonic() - self._lock = threading.Lock() self._fps = 0.0 - self._running = True - self._start_fps_thread() + self._fps_measurements = deque(maxlen=60) - def _start_fps_thread(self): - """Start a separate thread to calculate FPS periodically.""" - self._fps_thread = threading.Thread(target=self._calculate_fps_loop, daemon=True) - self._fps_thread.start() + # Start metrics collection tasks. + self._fps_stats_task = asyncio.create_task(self._calculate_fps_loop()) - def _calculate_fps_loop(self): + async def _calculate_fps_loop(self): """Loop to calculate FPS periodically.""" while self._running: - time.sleep(1) # Calculate FPS every second. + await asyncio.sleep(1) # Calculate FPS every second. with self._lock: current_time = time.monotonic() time_diff = current_time - self._last_fps_calculation_time if time_diff > 0: self._fps = self._fps_interval_frame_count / time_diff + self._fps_measurements.append( + self._fps + ) # Store the FPS measurement # Reset start_time and frame_count for the next interval. self._last_fps_calculation_time = current_time @@ -80,7 +83,6 @@ def _calculate_fps_loop(self): def stop(self): """Stop the FPS calculation thread.""" self._running = False - self._fps_thread.join() @property def fps(self) -> float: @@ -92,6 +94,28 @@ def fps(self) -> float: with self._lock: return self._fps + @property + def fps_measurements(self) -> list: + """Get the array of FPS measurements for the last minute. + + Returns: + The array of FPS measurements for the last minute. + """ + with self._lock: + return list(self._fps_measurements) + + @property + def average_fps(self) -> float: + """Calculate the average FPS from the measurements. + + Returns: + The average FPS. + """ + with self._lock: + if not self._fps_measurements: + return 0.0 + return sum(self._fps_measurements) / len(self._fps_measurements) + async def recv(self) -> av.VideoFrame: """Receive and process a video frame. Called by the WebRTC library when a frame is received. @@ -187,30 +211,29 @@ async def offer(request): @pc.on("datachannel") def on_datachannel(channel): if channel.label == "control": + @channel.on("message") async def on_message(message): try: params = json.loads(message) - + if params.get("type") == "get_nodes": nodes_info = await pipeline.get_nodes_info() - response = { - "type": "nodes_info", - "nodes": nodes_info - } + response = {"type": "nodes_info", "nodes": nodes_info} channel.send(json.dumps(response)) elif params.get("type") == "update_prompt": if "prompt" not in params: - logger.warning("[Control] Missing prompt in update_prompt message") + logger.warning( + "[Control] Missing prompt in update_prompt message" + ) return pipeline.set_prompt(params["prompt"]) - response = { - "type": "prompt_updated", - "success": True - } + response = {"type": "prompt_updated", "success": True} channel.send(json.dumps(response)) else: - logger.warning("[Server] Invalid message format - missing required fields") + logger.warning( + "[Server] Invalid message format - missing required fields" + ) except json.JSONDecodeError: logger.error("[Server] Invalid JSON received") except Exception as e: @@ -310,8 +333,8 @@ async def on_shutdown(app: web.Application): logging.basicConfig( level=args.log_level.upper(), - format='%(asctime)s [%(levelname)s] %(message)s', - datefmt='%H:%M:%S' + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", ) app = web.Application() @@ -329,8 +352,8 @@ async def on_shutdown(app: web.Application): # Add routes for getting stream statistics. stream_stats = StreamStats(app) - app.router.add_get("/streams/stats", stream_stats.get_stats) - app.router.add_get("/stream/{stream_id}/stats", stream_stats.get_stats_by_id) + app.router.add_get("/streams/stats", stream_stats.collect_all_stream_metrics) + app.router.add_get("/stream/{stream_id}/stats", stream_stats.collect_stream_metrics_by_id) # Add hosted platform route prefix. # NOTE: This ensures that the local and hosted experiences have consistent routes. diff --git a/server/utils.py b/server/utils.py index 7e6bc18c..2480d62e 100644 --- a/server/utils.py +++ b/server/utils.py @@ -68,33 +68,37 @@ def add_prefix_to_app_routes(app: web.Application, prefix: str): class StreamStats: - """Class to get stream statistics.""" + """Handles real-time video stream statistics collection.""" def __init__(self, app: web.Application): - """Initialize the StreamStats class.""" + """Initializes the StreamMetrics class. + + Args: + app: The web application instance storing video streams under the + "video_tracks" key. + """ self._app = app - def get_video_track_stats(self, video_track: MediaStreamTrack) -> Dict[str, Any]: - """Get statistics for a video track. + def collect_video_metrics(self, video_track: MediaStreamTrack) -> Dict[str, Any]: + """Collects real-time statistics for a video track. Args: - video_track: The VideoStreamTrack instance. + video_track: The video stream track instance. Returns: - A dictionary containing the statistics. + A dictionary containing FPS-related statistics. """ return { "fps": video_track.fps, + "minute_avg_fps": video_track.minute_avg_fps, + "minute_fps_array": video_track.minute_fps_array, } - async def get_stats(self, _) -> web.Response: - """Get the current stream statistics for all streams. - - Args: - request: The HTTP GET request. + async def collect_all_stream_metrics(self, _) -> web.Response: + """Retrieves real-time metrics for all active video streams. Returns: - The HTTP response containing the statistics. + A JSON response containing FPS statistics for all streams. """ video_tracks = self._app.get("video_tracks", {}) all_stats = { @@ -107,14 +111,14 @@ async def get_stats(self, _) -> web.Response: text=json.dumps(all_stats), ) - async def get_stats_by_id(self, request: web.Request) -> web.Response: - """Get the statistics for a specific stream by ID. + async def collect_stream_metrics_by_id(self, request: web.Request) -> web.Response: + """Retrieves real-time metrics for a specific video stream by ID. Args: - request: The HTTP GET request. + request: The HTTP request containing the stream ID. Returns: - The HTTP response containing the statistics. + A JSON response with stream metrics or an error message. """ stream_id = request.match_info.get("stream_id") video_track = self._app["video_tracks"].get(stream_id) From 76d1a2006be364a990fe6acf4d39f8cc2f36be60 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Tue, 18 Feb 2025 13:26:50 +0100 Subject: [PATCH 06/10] chore: revert formatter This commit reverts the black formatter to ensure we have minimal differences with the main branch. --- server/app.py | 24 ++++++++++-------------- server/utils.py | 1 - 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/server/app.py b/server/app.py index b499b019..09453668 100644 --- a/server/app.py +++ b/server/app.py @@ -14,7 +14,6 @@ RTCIceServer, MediaStreamTrack, ) -import threading from aiortc.rtcrtpsender import RTCRtpSender from aiortc.codecs import h264 from pipeline import Pipeline @@ -22,8 +21,8 @@ import time logger = logging.getLogger(__name__) -logging.getLogger("aiortc.rtcrtpsender").setLevel(logging.WARNING) -logging.getLogger("aiortc.rtcrtpreceiver").setLevel(logging.WARNING) +logging.getLogger('aiortc.rtcrtpsender').setLevel(logging.WARNING) +logging.getLogger('aiortc.rtcrtpreceiver').setLevel(logging.WARNING) MAX_BITRATE = 2000000 @@ -38,9 +37,7 @@ class VideoStreamTrack(MediaStreamTrack): track (MediaStreamTrack): The underlying media stream track. pipeline (Pipeline): The processing pipeline to apply to each video frame. """ - kind = "video" - def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): """Initialize the VideoStreamTrack. @@ -53,7 +50,7 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): self.pipeline = pipeline self._running = True - self._lock = threading.Lock() + self._lock = asyncio.Lock() self._fps_interval_frame_count = 0 self._last_fps_calculation_time = time.monotonic() self._fps = 0.0 @@ -235,15 +232,16 @@ async def offer(request): @pc.on("datachannel") def on_datachannel(channel): if channel.label == "control": - @channel.on("message") async def on_message(message): try: params = json.loads(message) - if params.get("type") == "get_nodes": nodes_info = await pipeline.get_nodes_info() - response = {"type": "nodes_info", "nodes": nodes_info} + response = { + "type": "nodes_info", + "nodes": nodes_info + } channel.send(json.dumps(response)) elif params.get("type") == "update_prompts": if "prompts" not in params: @@ -256,9 +254,7 @@ async def on_message(message): } channel.send(json.dumps(response)) else: - logger.warning( - "[Server] Invalid message format - missing required fields" - ) + logger.warning("[Server] Invalid message format - missing required fields") except json.JSONDecodeError: logger.error("[Server] Invalid JSON received") except Exception as e: @@ -367,8 +363,8 @@ async def on_shutdown(app: web.Application): logging.basicConfig( level=args.log_level.upper(), - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%H:%M:%S", + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%H:%M:%S' ) app = web.Application() diff --git a/server/utils.py b/server/utils.py index 2480d62e..598a6835 100644 --- a/server/utils.py +++ b/server/utils.py @@ -1,5 +1,4 @@ """Utility functions for the server.""" - import asyncio import random import types From dfaabfff288ed22222ca6de5c7d3f8853b486af8 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Thu, 20 Feb 2025 12:47:33 +0100 Subject: [PATCH 07/10] feat: extend fps metrics This commit adds the `minute_avg_fps` and `minute_fps_array` to the stats endpoint and improves the code for optimal performance. --- server/app.py | 41 ++++++++++++++++++++++++----------------- server/utils.py | 12 ++++++------ 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/server/app.py b/server/app.py index 09453668..68795d57 100644 --- a/server/app.py +++ b/server/app.py @@ -3,8 +3,15 @@ import os import json import logging - from collections import deque + +import torch + +# Initialize CUDA before any other imports to prevent core dump. +if torch.cuda.is_available(): + torch.cuda.init() + + from twilio.rest import Client from aiohttp import web from aiortc import ( @@ -49,12 +56,12 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): self.track = track self.pipeline = pipeline - self._running = True self._lock = asyncio.Lock() self._fps_interval_frame_count = 0 self._last_fps_calculation_time = time.monotonic() self._fps = 0.0 self._fps_measurements = deque(maxlen=60) + self._running_event = asyncio.Event() asyncio.create_task(self.collect_frames()) @@ -72,9 +79,10 @@ async def collect_frames(self): async def _calculate_fps_loop(self): """Loop to calculate FPS periodically.""" - while self._running: + await self._running_event.wait() + while self.readyState != "ended": await asyncio.sleep(1) # Calculate FPS every second. - with self._lock: + async with self._lock: current_time = time.monotonic() time_diff = current_time - self._last_fps_calculation_time if time_diff > 0: @@ -87,38 +95,34 @@ async def _calculate_fps_loop(self): self._last_fps_calculation_time = current_time self._fps_interval_frame_count = 0 - def stop(self): - """Stop the FPS calculation thread.""" - self._running = False - @property - def fps(self) -> float: + async def fps(self) -> float: """Get the current output frames per second (FPS). Returns: The current output FPS. """ - with self._lock: + async with self._lock: return self._fps @property - def fps_measurements(self) -> list: + async def fps_measurements(self) -> list: """Get the array of FPS measurements for the last minute. Returns: The array of FPS measurements for the last minute. """ - with self._lock: + async with self._lock: return list(self._fps_measurements) @property - def average_fps(self) -> float: - """Calculate the average FPS from the measurements. + async def average_fps(self) -> float: + """Calculate the average FPS from the measurements taken in the last minute. Returns: - The average FPS. + The average FPS over the last minute. """ - with self._lock: + async with self._lock: if not self._fps_measurements: return 0.0 return sum(self._fps_measurements) / len(self._fps_measurements) @@ -127,8 +131,10 @@ async def recv(self): processed_frame = await self.pipeline.get_processed_video_frame() # Increment frame count for FPS calculation. - with self._lock: + async with self._lock: self._fps_interval_frame_count += 1 + if not self._running_event.is_set(): + self._running_event.set() return processed_frame @@ -375,6 +381,7 @@ async def on_shutdown(app: web.Application): app.on_shutdown.append(on_shutdown) app.router.add_get("/", health) + app.router.add_get("/health", health) # WebRTC signalling and control routes. app.router.add_post("/offer", offer) diff --git a/server/utils.py b/server/utils.py index 598a6835..c8566857 100644 --- a/server/utils.py +++ b/server/utils.py @@ -78,7 +78,7 @@ def __init__(self, app: web.Application): """ self._app = app - def collect_video_metrics(self, video_track: MediaStreamTrack) -> Dict[str, Any]: + async def collect_video_metrics(self, video_track: MediaStreamTrack) -> Dict[str, Any]: """Collects real-time statistics for a video track. Args: @@ -88,9 +88,9 @@ def collect_video_metrics(self, video_track: MediaStreamTrack) -> Dict[str, Any] A dictionary containing FPS-related statistics. """ return { - "fps": video_track.fps, - "minute_avg_fps": video_track.minute_avg_fps, - "minute_fps_array": video_track.minute_fps_array, + "fps": await video_track.fps, + "minute_avg_fps": await video_track.average_fps, + "minute_fps_array": await video_track.fps_measurements, } async def collect_all_stream_metrics(self, _) -> web.Response: @@ -101,7 +101,7 @@ async def collect_all_stream_metrics(self, _) -> web.Response: """ video_tracks = self._app.get("video_tracks", {}) all_stats = { - stream_id: self.get_video_track_stats(track) + stream_id: await self.collect_video_metrics(track) for stream_id, track in video_tracks.items() } @@ -123,7 +123,7 @@ async def collect_stream_metrics_by_id(self, request: web.Request) -> web.Respon video_track = self._app["video_tracks"].get(stream_id) if video_track: - stats = self.get_video_track_stats(video_track) + stats = await self.collect_video_metrics(video_track) else: stats = {"error": "Stream not found"} From a6a46ff37bcd4f88c4594645af9d7266c2035b2a Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Thu, 20 Feb 2025 12:50:58 +0100 Subject: [PATCH 08/10] chore: remove duplicate code This commit fixes duplicate code that was introduced due to merge conflicts. --- server/app.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/server/app.py b/server/app.py index ff9f51a5..67014535 100644 --- a/server/app.py +++ b/server/app.py @@ -12,13 +12,6 @@ torch.cuda.init() -import torch - -# Initialize CUDA before any other imports to prevent core dump. -if torch.cuda.is_available(): - torch.cuda.init() - - from twilio.rest import Client from aiohttp import web from aiortc import ( @@ -249,6 +242,7 @@ def on_datachannel(channel): async def on_message(message): try: params = json.loads(message) + if params.get("type") == "get_nodes": nodes_info = await pipeline.get_nodes_info() response = { @@ -387,13 +381,13 @@ async def on_shutdown(app: web.Application): app.on_startup.append(on_startup) app.on_shutdown.append(on_shutdown) - app.router.add_post("/offer", offer) - app.router.add_post("/prompt", set_prompt) - - # WebRTC signalling and control routes. app.router.add_get("/", health) app.router.add_get("/health", health) + # WebRTC signalling and control routes. + app.router.add_post("/offer", offer) + app.router.add_post("/prompt", set_prompt) + # Add routes for getting stream statistics. stream_stats = StreamStats(app) app.router.add_get("/streams/stats", stream_stats.collect_all_stream_metrics) From 701055362e36437c618c560d4418c4e18770a7ed Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Fri, 21 Feb 2025 17:18:00 +0100 Subject: [PATCH 09/10] feat: add timestamp to stream stats This commit adds the timestamp since the stream started to the stream stats object. --- server/app.py | 27 ++++++++++++++++++++------- server/utils.py | 1 + 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/server/app.py b/server/app.py index 67014535..3380d08b 100644 --- a/server/app.py +++ b/server/app.py @@ -59,6 +59,7 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): self._lock = asyncio.Lock() self._fps_interval_frame_count = 0 self._last_fps_calculation_time = time.monotonic() + self._fps_loop_start_time = self._last_fps_calculation_time self._fps = 0.0 self._fps_measurements = deque(maxlen=60) self._running_event = asyncio.Event() @@ -76,15 +77,16 @@ async def collect_frames(self): except Exception as e: await self.pipeline.cleanup() raise Exception(f"Error collecting video frames: {str(e)}") - + async def _calculate_fps_loop(self): """Loop to calculate FPS periodically.""" await self._running_event.wait() + self._fps_loop_start_time = time.monotonic() while self.readyState != "ended": await asyncio.sleep(1) # Calculate FPS every second. async with self._lock: - current_time = time.monotonic() - time_diff = current_time - self._last_fps_calculation_time + current_time_monotonic = time.monotonic() + time_diff = current_time_monotonic - self._last_fps_calculation_time if time_diff > 0: self._fps = self._fps_interval_frame_count / time_diff self._fps_measurements.append( @@ -92,9 +94,9 @@ async def _calculate_fps_loop(self): ) # Store the FPS measurement # Reset start_time and frame_count for the next interval. - self._last_fps_calculation_time = current_time + self._last_fps_calculation_time = current_time_monotonic self._fps_interval_frame_count = 0 - + @property async def fps(self) -> float: """Get the current output frames per second (FPS). @@ -127,17 +129,28 @@ async def average_fps(self) -> float: return 0.0 return sum(self._fps_measurements) / len(self._fps_measurements) + @property + async def last_fps_calculation_time(self) -> float: + """Get the elapsed time since the last FPS calculation. + + Returns: + The elapsed time in seconds since the last FPS calculation. + """ + async with self._lock: + return self._last_fps_calculation_time - self._fps_loop_start_time + async def recv(self): processed_frame = await self.pipeline.get_processed_video_frame() - + # Increment frame count for FPS calculation. async with self._lock: self._fps_interval_frame_count += 1 if not self._running_event.is_set(): self._running_event.set() - + return processed_frame + class AudioStreamTrack(MediaStreamTrack): kind = "audio" def __init__(self, track: MediaStreamTrack, pipeline): diff --git a/server/utils.py b/server/utils.py index c8566857..106b607b 100644 --- a/server/utils.py +++ b/server/utils.py @@ -88,6 +88,7 @@ async def collect_video_metrics(self, video_track: MediaStreamTrack) -> Dict[str A dictionary containing FPS-related statistics. """ return { + "timestamp": await video_track.last_fps_calculation_time, "fps": await video_track.fps, "minute_avg_fps": await video_track.average_fps, "minute_fps_array": await video_track.fps_measurements, From ad5970b9b4bb89961565b232e9e3b8acc15a0e53 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Mon, 3 Mar 2025 16:56:44 +0100 Subject: [PATCH 10/10] refactor: improve average fps stats format This commit adds the metric timestamp in the `minute_fps_array` so it can be used by the client to plot the data or perform calculations. Co-authored-by: Evan Mullins --- server/app.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/server/app.py b/server/app.py index 2f449a84..e15343f5 100644 --- a/server/app.py +++ b/server/app.py @@ -59,8 +59,8 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline): self._lock = asyncio.Lock() self._fps_interval_frame_count = 0 - self._last_fps_calculation_time = time.monotonic() - self._fps_loop_start_time = self._last_fps_calculation_time + self._last_fps_calculation_time = None + self._fps_loop_start_time = time.monotonic() self._fps = 0.0 self._fps_measurements = deque(maxlen=60) self._running_event = asyncio.Event() @@ -84,19 +84,22 @@ async def _calculate_fps_loop(self): await self._running_event.wait() self._fps_loop_start_time = time.monotonic() while self.readyState != "ended": - await asyncio.sleep(1) # Calculate FPS every second. async with self._lock: - current_time_monotonic = time.monotonic() - time_diff = current_time_monotonic - self._last_fps_calculation_time - if time_diff > 0: + current_time = time.monotonic() + if self._last_fps_calculation_time is not None: + time_diff = current_time - self._last_fps_calculation_time self._fps = self._fps_interval_frame_count / time_diff self._fps_measurements.append( - self._fps - ) # Store the FPS measurement + { + "timestamp": current_time - self._fps_loop_start_time, + "fps": self._fps, + } + ) # Store the FPS measurement with timestamp - # Reset start_time and frame_count for the next interval. - self._last_fps_calculation_time = current_time_monotonic - self._fps_interval_frame_count = 0 + # Reset start_time and frame_count for the next interval. + self._last_fps_calculation_time = current_time + self._fps_interval_frame_count = 0 + await asyncio.sleep(1) # Calculate FPS every second. @property async def fps(self) -> float: @@ -128,7 +131,9 @@ async def average_fps(self) -> float: async with self._lock: if not self._fps_measurements: return 0.0 - return sum(self._fps_measurements) / len(self._fps_measurements) + return sum( + measurement["fps"] for measurement in self._fps_measurements + ) / len(self._fps_measurements) @property async def last_fps_calculation_time(self) -> float: @@ -405,7 +410,9 @@ async def on_shutdown(app: web.Application): # Add routes for getting stream statistics. stream_stats = StreamStats(app) app.router.add_get("/streams/stats", stream_stats.collect_all_stream_metrics) - app.router.add_get("/stream/{stream_id}/stats", stream_stats.collect_stream_metrics_by_id) + app.router.add_get( + "/stream/{stream_id}/stats", stream_stats.collect_stream_metrics_by_id + ) # Add hosted platform route prefix. # NOTE: This ensures that the local and hosted experiences have consistent routes.