diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index c06c2e0e3..942ab0f10 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,16 +1,16 @@ import os import json import torch -import asyncio import numpy as np from PIL import Image from typing import Union from pydantic import BaseModel, field_validator import pathlib +import av from .interface import Pipeline -from comfystream.client import ComfyStreamClient -from trickle import VideoFrame, VideoOutput +from comfystream.pipeline import Pipeline as ComfyStreamPipeline +from trickle import VideoFrame, VideoOutput, AudioFrame, AudioOutput import logging @@ -52,52 +52,61 @@ def validate_prompt(cls, v) -> dict: class ComfyUI(Pipeline): def __init__(self): comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV) - self.client = ComfyStreamClient(cwd=comfy_ui_workspace) + self.comfystream = ComfyStreamPipeline(width=512, height=512, cwd=comfy_ui_workspace) self.params: ComfyUIParams - self.video_incoming_frames: asyncio.Queue[VideoOutput] = asyncio.Queue() async def initialize(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Initializing ComfyUI Pipeline with prompt: {new_params.prompt}") - # TODO: currently its a single prompt, but need to support multiple prompts - await self.client.set_prompts([new_params.prompt]) + await self.comfystream.set_prompts([new_params.prompt]) self.params = new_params # Warm up the pipeline - dummy_frame = VideoFrame(None, 0, 0) - dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) - - for _ in range(WARMUP_RUNS): - self.client.put_video_input(dummy_frame) - _ = await self.client.get_video_output() + await self.comfystream.warm_video() logging.info("Pipeline initialization and warmup complete") async def put_video_frame(self, frame: VideoFrame, request_id: str): - image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0 - frame.side_data.input = torch.tensor(image_np).unsqueeze(0) - frame.side_data.skipped = True - self.client.put_video_input(frame) - await self.video_incoming_frames.put(VideoOutput(frame, request_id)) - - async def get_processed_video_frame(self): - result_tensor = await self.client.get_video_output() - out = await self.video_incoming_frames.get() - while out.frame.side_data.skipped: - out = await self.video_incoming_frames.get() - - result_tensor = result_tensor.squeeze(0) - result_image_np = (result_tensor * 255).byte() - result_image = Image.fromarray(result_image_np.cpu().numpy()) - return out.replace_image(result_image) + await self.comfystream.put_video_frame(self._convert_to_av_frame(frame), request_id) + + async def put_audio_frame(self, frame: AudioFrame, request_id: str): + await self.comfystream.put_audio_frame(self._convert_to_av_frame(frame), request_id) + + async def get_processed_video_frame(self) -> VideoOutput: + av_frame = await self.comfystream.get_processed_video_frame() + video_frame = VideoFrame.from_av_video(av_frame) + return VideoOutput(video_frame, av_frame.side_data.request_id) + + async def get_processed_audio_frame(self) -> AudioOutput: + av_frame = await self.comfystream.get_processed_audio_frame() + audio_frame = AudioFrame.from_av_audio(av_frame) + return AudioOutput(audio_frame, av_frame.side_data.request_id) async def update_params(self, **params): new_params = ComfyUIParams(**params) logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}") - # TODO: currently its a single prompt, but need to support multiple prompts - await self.client.update_prompts([new_params.prompt]) + await self.comfystream.update_prompts([new_params.prompt]) self.params = new_params async def stop(self): logging.info("Stopping ComfyUI pipeline") - await self.client.cleanup() + await self.comfystream.cleanup() logging.info("ComfyUI pipeline stopped") + + def _convert_to_av_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Union[av.VideoFrame, av.AudioFrame]: + """Convert trickle frame to av frame""" + if isinstance(frame, VideoFrame): + av_frame = av.VideoFrame.from_ndarray( + np.array(frame.image.convert("RGB")), + format='rgb24' + ) + elif isinstance(frame, AudioFrame): + av_frame = av.AudioFrame.from_ndarray( + frame.samples.reshape(-1, 1), + layout='mono', + rate=frame.rate + ) + + # Common frame properties + av_frame.pts = frame.timestamp + av_frame.time_base = frame.time_base + return av_frame diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index a2b603d5c..35d6a8ca6 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -16,7 +16,7 @@ async def put_video_frame(self, frame: VideoFrame, request_id: str): async def get_processed_video_frame(self) -> VideoOutput: out = await self.frame_queue.get() processed_frame = out.image.convert("RGB") - return out.replace_image(processed_frame) + return VideoOutput(out.frame.replace_image(processed_frame), out.request_id) async def initialize(self, **params): logging.info(f"Initializing Noop pipeline with params: {params}") diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index dcdca0b4e..7ffe3b40c 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -113,7 +113,6 @@ def get_recent_logs(self, n=None) -> list[str]: def process_loop(self): self._setup_logging() - pipeline = None # Ensure CUDA environment is available inside the subprocess. # Multiprocessing (spawn mode) does not inherit environment variables by default, @@ -146,6 +145,7 @@ def _handle_logging_params(self, params: dict) -> dict: async def _initialize_pipeline(self): try: + pipeline = None stream_id = "" params = {} try: diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index ae6f902af..b857754d7 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -55,11 +55,13 @@ async def start(self, params: dict): run_in_background("ingress_loop", self.run_ingress_loop()), run_in_background("egress_loop", self.run_egress_loop()), run_in_background("report_status_loop", self.report_status_loop()), - run_in_background("control_loop", self.run_control_loop()), + run_in_background("control_loop", self.run_control_loop()) ] # auxiliary tasks that are not critical to the supervisor, but which we want to run # TODO: maybe remove this since we had to move the control loop to main tasks - self.auxiliary_tasks: list[asyncio.Task] = [] + self.auxiliary_tasks = [ + + ] self.tasks_supervisor_task = run_in_background( "tasks_supervisor", self.tasks_supervisor() ) diff --git a/runner/app/live/trickle/frame.py b/runner/app/live/trickle/frame.py index 9101a8e4d..963d1585a 100644 --- a/runner/app/live/trickle/frame.py +++ b/runner/app/live/trickle/frame.py @@ -10,6 +10,7 @@ class SideData: """ skipped: bool = True input: Image.Image | np.ndarray | None + request_id: str = '' class InputFrame: """ diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index 74b1b2811..31733ad05 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base@sha256:4435bad85c3a2fce2b491135bee49eedb8edbd8bdf5d124cb0a95a1d4ecb6856 +ARG BASE_IMAGE=livepeer/comfyui-base@sha256:9364f249297ea9aebc053f361a41efa78ce0fbbb15b73b4a73215f3edb8ada2f FROM ${BASE_IMAGE} # -----------------------------------------------------------------------------