Skip to content

Commit abd6e32

Browse files
varshith15eliteproxpschroedlleszko
authored
feat: comfystream v0.0.4 ai-runner port (#479)
- Improved pipeline flexibility and async handling. - Fixed environment, event loop, and CI issues. - Updated base images and cleaned dependencies. - Added frame processing and warm video interface. - Reverted and optimized configurations for stability. --------- Co-authored-by: Elite Encoder <john@eliteencoder.net> Co-authored-by: Peter Schroedl <peter_schroedl@me.com> Co-authored-by: Rafal Leszko <rafal@livepeer.org>
1 parent aa17006 commit abd6e32

File tree

13 files changed

+267
-198
lines changed

13 files changed

+267
-198
lines changed

.devcontainer/devcontainer.json

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,52 @@
11
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
22
// README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu
33
{
4-
"name": "ai-runner",
5-
// Image to use for the dev container. More info: https://containers.dev/guide/dockerfile.
6-
"build": {
7-
"dockerfile": "../Dockerfile",
8-
// "dockerfile": "../docker/Dockerfile.text_to_speech",
9-
"context": ".."
10-
},
11-
"runArgs": [
12-
"--gpus=all"
13-
],
14-
// Features to add to the dev container. More info: https://containers.dev/features.
15-
// Configure tool-specific properties.
16-
"customizations": {
17-
"vscode": {
18-
"settings": {},
19-
"extensions": [
20-
"ms-python.python",
21-
"ms-python.black-formatter"
22-
]
23-
}
24-
},
25-
// Use 'forwardPorts' to make a list of ports inside the container available locally.
26-
"forwardPorts": [
27-
8000
28-
],
29-
// Use 'mounts' to make a list of local folders available inside the container.
30-
"mounts": [
31-
// "source=${localWorkspaceFolder}/models,target=/models,type=bind"
32-
"source=${localEnv:HOME}/.lpData/models,target=/models,type=bind"
33-
]
4+
"name": "ai-runner",
5+
"initializeCommand": "ls",
6+
// Image to use for the dev container. More info: https://containers.dev/guide/dockerfile.
7+
"containerEnv": {
8+
"PIPELINE": "comfyui"
9+
},
10+
"build": {
11+
"dockerfile": "../runner/docker/Dockerfile.live-app__PIPELINE__",
12+
"args": {
13+
"PIPELINE": "comfyui"
14+
},
15+
// "dockerfile": "../Dockerfile",
16+
// "dockerfile": "../docker/Dockerfile.text_to_speech",
17+
"context": "../runner"
18+
},
19+
"runArgs": [
20+
"--gpus=all"
21+
],
22+
// Features to add to the dev container. More info: https://containers.dev/features.
23+
// Configure tool-specific properties.
24+
"customizations": {
25+
"vscode": {
26+
"settings": {
27+
"python.defaultInterpreterPath": "/workspace/miniconda3/envs/comfystream/bin/python",
28+
"python.venvPath": "/workspace/miniconda3/envs",
29+
"python.terminal.activateEnvInCurrentTerminal": false,
30+
"python.terminal.activateEnvironment": true,
31+
"terminal.integrated.shellIntegration.enabled": true
32+
},
33+
"extensions": [
34+
"ms-python.python",
35+
"ms-python.black-formatter"
36+
]
37+
}
38+
},
39+
// Use 'forwardPorts' to make a list of ports inside the container available locally.
40+
"forwardPorts": [
41+
8000
42+
],
43+
"appPort": [
44+
"8000:8000"
45+
],
46+
// Use 'mounts' to make a list of local folders available inside the container.
47+
"mounts": [
48+
// "source=${localWorkspaceFolder}/models,target=/models,type=bind"
49+
"source=${localEnv:HOME}/models/ComfyUI--models/,target=/workspace/ComfyUI/models,type=bind",
50+
"source=${localEnv:HOME}/models/ComfyUI--output/,target=/workspace/ComfyUI/output,type=bind"
51+
]
3452
}

runner/app/live/pipelines/comfyui.py

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
import os
22
import json
33
import torch
4-
from PIL import Image
54
import asyncio
65
import numpy as np
6+
from PIL import Image
77
from typing import Union
88
from pydantic import BaseModel, field_validator
99

1010
from .interface import Pipeline
1111
from comfystream.client import ComfyStreamClient
12+
from trickle import VideoFrame, VideoOutput
1213

1314
import logging
1415

1516
COMFY_UI_WORKSPACE_ENV = "COMFY_UI_WORKSPACE"
17+
WARMUP_RUNS = 1
1618
DEFAULT_WORKFLOW_JSON = json.loads("""
1719
{
1820
"1": {
@@ -263,46 +265,54 @@ def validate_prompt(cls, v) -> dict:
263265

264266

265267
class ComfyUI(Pipeline):
266-
def __init__(self, **params):
267-
super().__init__(**params)
268-
268+
def __init__(self):
269269
comfy_ui_workspace = os.getenv(COMFY_UI_WORKSPACE_ENV)
270270
self.client = ComfyStreamClient(cwd=comfy_ui_workspace)
271271
self.params: ComfyUIParams
272+
self.video_incoming_frames = asyncio.Queue()
272273

273-
self.update_params(**params)
274+
async def initialize(self, **params):
275+
new_params = ComfyUIParams(**params)
276+
logging.info(f"Initializing ComfyUI Pipeline with prompt: {new_params.prompt}")
277+
# TODO: currently its a single prompt, but need to support multiple prompts
278+
await self.client.set_prompts([new_params.prompt])
279+
self.params = new_params
280+
281+
# Warm up the pipeline
282+
dummy_frame = VideoFrame(None, 0, 0)
283+
dummy_frame.side_data.input = torch.randn(1, 512, 512, 3)
274284

275-
# Comfy will cache nodes that only need to be run once (i.e. a node that loads model weights)
276-
# We can run the prompt once before actual inputs come in to "warmup"
277-
warmup_input = torch.randn(1, 512, 512, 3)
278-
asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(warmup_input))
285+
for _ in range(WARMUP_RUNS):
286+
self.client.put_video_input(dummy_frame)
287+
_ = await self.client.get_video_output()
288+
logging.info("Pipeline initialization and warmup complete")
279289

280-
def process_frame(self, image: Image.Image) -> Image.Image:
281-
# Normalize by dividing by 255 to ensure the tensor values are between 0 and 1
282-
image_np = np.array(image.convert("RGB")).astype(np.float32) / 255.0
283-
# Convert from numpy to torch.Tensor
284-
# Initially, the torch.Tensor will have shape HWC but we want BHWC
285-
# unsqueeze(0) will add a batch dimension at the beginning of 1 which means we just have 1 image
286-
image_tensor = torch.tensor(image_np).unsqueeze(0)
290+
async def put_video_frame(self, frame: VideoFrame):
291+
image_np = np.array(frame.image.convert("RGB")).astype(np.float32) / 255.0
292+
frame.side_data.input = torch.tensor(image_np).unsqueeze(0)
293+
frame.side_data.skipped = True
294+
self.client.put_video_input(frame)
295+
await self.video_incoming_frames.put(frame)
287296

288-
# Process using ComfyUI pipeline
289-
result_tensor = asyncio.get_event_loop().run_until_complete(self.client.queue_prompt(image_tensor))
297+
async def get_processed_video_frame(self, request_id):
298+
result_tensor = await self.client.get_video_output()
299+
frame = await self.video_incoming_frames.get()
300+
while frame.side_data.skipped:
301+
frame = await self.video_incoming_frames.get()
290302

291-
# Convert back from Tensor to PIL.Image
292303
result_tensor = result_tensor.squeeze(0)
293304
result_image_np = (result_tensor * 255).byte()
294305
result_image = Image.fromarray(result_image_np.cpu().numpy())
295-
return result_image
296-
297-
def update_params(self, **params):
306+
return VideoOutput(frame.replace_image(result_image), request_id)
307+
308+
async def update_params(self, **params):
298309
new_params = ComfyUIParams(**params)
299-
logging.info(f"ComfyUI Pipeline Prompt: {new_params.prompt}")
300-
self.client.set_prompt(new_params.prompt)
310+
logging.info(f"Updating ComfyUI Pipeline Prompt: {new_params.prompt}")
311+
# TODO: currently its a single prompt, but need to support multiple prompts
312+
await self.client.update_prompts([new_params.prompt])
301313
self.params = new_params
302314

303-
#TODO: This is a hack to stop the ComfyStreamClient. Use the comfystream api to stop the client in 0.0.2
304315
async def stop(self):
305316
logging.info("Stopping ComfyUI pipeline")
306-
if self.client.comfy_client.is_running:
307-
await self.client.comfy_client.__aexit__(None, None, None)
317+
await self.client.stop()
308318
logging.info("ComfyUI pipeline stopped")

runner/app/live/pipelines/interface.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
from abc import ABC, abstractmethod
21
from PIL import Image
2+
from abc import ABC, abstractmethod
3+
from trickle import VideoFrame, VideoOutput
34

45
class Pipeline(ABC):
56
"""Abstract base class for image processing pipelines.
@@ -22,21 +23,38 @@ def __init__(self, **params):
2223
pass
2324

2425
@abstractmethod
25-
def process_frame(self, frame: Image.Image) -> Image.Image:
26-
"""Process a single frame through the pipeline.
27-
28-
Called sequentially with each frame from the stream.
26+
async def put_video_frame(self, frame: VideoFrame):
27+
"""Put a frame into the pipeline.
2928
3029
Args:
31-
frame: Input PIL Image
30+
frame: Input VideoFrame
31+
"""
32+
pass
33+
34+
@abstractmethod
35+
async def get_processed_video_frame(self, request_id: str = '') -> VideoOutput:
36+
"""Get a processed frame from the pipeline.
3237
3338
Returns:
34-
Processed PIL Image
39+
Processed VideoFrame
40+
"""
41+
pass
42+
43+
@abstractmethod
44+
async def initialize(self, **params):
45+
"""Initialize the pipeline with parameters and warm up the processing.
46+
47+
This method sets up the initial pipeline state and performs warmup operations.
48+
Must maintain valid state on success or restore previous state on failure.
49+
Starts the pipeline loops in comfystream.
50+
51+
Args:
52+
**params: Implementation-specific parameters
3553
"""
3654
pass
3755

3856
@abstractmethod
39-
def update_params(self, **params):
57+
async def update_params(self, **params):
4058
"""Update pipeline parameters.
4159
4260
Must maintain valid state on success or restore previous state on failure.
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from .interface import Pipeline
22

3-
def load_pipeline(name: str, **params) -> Pipeline:
3+
def load_pipeline(name: str) -> Pipeline:
44
if name == "comfyui":
55
from .comfyui import ComfyUI
6-
return ComfyUI(**params)
6+
return ComfyUI()
77
elif name == "noop":
88
from .noop import Noop
9-
return Noop(**params)
9+
return Noop()
1010
raise ValueError(f"Unknown pipeline: {name}")

runner/app/live/pipelines/noop.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,29 @@
1-
from PIL import Image
21
import logging
2+
import asyncio
3+
from PIL import Image
4+
35

46
from .interface import Pipeline
7+
from trickle import VideoFrame, VideoOutput
58

69
class Noop(Pipeline):
7-
def __init__(self, **params):
8-
super().__init__(**params)
10+
def __init__(self):
11+
self.frame_queue = asyncio.Queue()
912

10-
def process_frame(self, image: Image.Image) -> Image.Image:
11-
return image.convert("RGB")
13+
async def put_video_frame(self, frame: VideoFrame):
14+
await self.frame_queue.put(frame)
1215

13-
def update_params(self, **params):
16+
async def get_processed_video_frame(self) -> VideoOutput:
17+
frame = await self.frame_queue.get()
18+
processed_frame = frame.image.convert("RGB")
19+
return VideoOutput(frame.replace_image(processed_frame))
20+
21+
async def initialize(self, **params):
22+
logging.info(f"Initializing Noop pipeline with params: {params}")
23+
logging.info("Pipeline initialization complete")
24+
25+
async def update_params(self, **params):
1426
logging.info(f"Updating params: {params}")
27+
28+
async def stop(self):
29+
logging.info("Stopping pipeline")

0 commit comments

Comments
 (0)