Skip to content

Commit 92f771d

Browse files
ieavesSamitHuangGaohan123
authored
[Frontend] Rewrite video API for async job lifecycle (vllm-project#1665)
Signed-off-by: Ian Eaves <ian.k.eaves@gmail.com> Signed-off-by: Samit <285365963@qq.com> Co-authored-by: Samit <285365963@qq.com> Co-authored-by: Gao Han <gaohan19@huawei.com>
1 parent e0b7105 commit 92f771d

File tree

22 files changed

+2005
-376
lines changed

22 files changed

+2005
-376
lines changed

apps/ComfyUI-vLLM-Omni/comfyui_vllm_omni/utils/api_client.py

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
Original source at https://github.com/dougbtv/comfyui-vllm-omni, distributed under the MIT License.
77
"""
88

9+
import asyncio
910
import json
1011
from typing import Any
1112

@@ -18,8 +19,8 @@
1819
audio_to_base64,
1920
base64_to_audio,
2021
base64_to_image_tensor,
21-
base64_to_video,
2222
bytes_to_audio,
23+
bytes_to_video,
2324
image_tensor_to_base64,
2425
image_tensor_to_png_bytes,
2526
video_to_base64,
@@ -31,10 +32,43 @@
3132
logger = get_logger(__name__)
3233

3334

35+
async def url_json(session: aiohttp.ClientSession, url: str, verb: str = "get", **kwargs) -> dict[str, Any]:
36+
try:
37+
async with getattr(session, verb)(url, **kwargs) as response:
38+
if not response.ok:
39+
error_text = await response.text()
40+
raise (ValueError if response.status < 500 else RuntimeError)(
41+
f"vLLM-Omni API returned status {response.status}: {error_text}"
42+
)
43+
try:
44+
return await response.json()
45+
except aiohttp.ContentTypeError as e:
46+
raise RuntimeError(f"Invalid JSON response from vLLM-Omni: {e}")
47+
except aiohttp.ClientError as e:
48+
raise RuntimeError(f"Network error connecting to vLLM-Omni at {url}: {e}")
49+
50+
51+
async def url_bytes(session: aiohttp.ClientSession, url: str, verb: str = "get", **kwargs) -> bytes:
52+
try:
53+
async with getattr(session, verb)(url, **kwargs) as response:
54+
if not response.ok:
55+
error_text = await response.text()
56+
raise (ValueError if response.status < 500 else RuntimeError)(
57+
f"vLLM-Omni API returned status {response.status}: {error_text}"
58+
)
59+
return await response.read()
60+
except aiohttp.ClientError as e:
61+
raise RuntimeError(f"Network error connecting to vLLM-Omni at {url}: {e}")
62+
63+
3464
class VLLMOmniClient:
35-
def __init__(self, base_url: str, timeout: float | None = None):
65+
def __init__(
66+
self, base_url: str, timeout: float | None = None, poll_interval: float = 5.0, max_poll_duration: float = 60 * 5
67+
):
3668
self.base_url = base_url
3769
self.timeout = aiohttp.ClientTimeout(total=timeout)
70+
self.poll_interval = poll_interval
71+
self.max_poll_duration = max_poll_duration
3872

3973
async def generate_image(
4074
self,
@@ -262,33 +296,41 @@ async def generate_video(
262296
content_type="image/png",
263297
)
264298

265-
url = self.base_url + "/videos"
266299
async with aiohttp.ClientSession(timeout=self.timeout) as session:
300+
# Start the video generation job
301+
url = f"{self.base_url}/videos"
302+
data = await url_json(session, url, "post", data=form)
303+
if (job_id := data.get("id", None)) is None:
304+
raise RuntimeError("API response missing job 'id' field - expected OpenAI compliant format")
305+
if (job_status := data.get("status", None)) is None:
306+
raise RuntimeError("API response missing job 'status' field - expected OpenAI compliant format")
307+
308+
# Poll for video generation job completion
309+
deadline = asyncio.get_running_loop().time() + self.max_poll_duration
310+
url = f"{self.base_url}/videos/{job_id}"
311+
while job_status not in {"completed", "failed"}:
312+
await asyncio.sleep(self.poll_interval)
313+
314+
data = await url_json(session, url)
315+
if (job_status := data.get("status", None)) is None:
316+
raise RuntimeError("API response missing job 'status' field - expected OpenAI compliant format")
317+
if asyncio.get_running_loop().time() >= deadline:
318+
raise RuntimeError(f"Timed out waiting for video job {job_id} to complete")
319+
320+
if job_status == "failed":
321+
raise RuntimeError(f"Video job failed: {data}")
322+
323+
# Retrieve completed content
324+
video_bytes = await url_bytes(session, f"{url}/content")
325+
326+
# Decode video and make a best effort at cleaning up server resources
267327
try:
268-
async with session.post(url, data=form) as response:
269-
if not response.ok:
270-
error_text = await response.text()
271-
raise (ValueError if response.status < 500 else RuntimeError)(
272-
f"vLLM-Omni API returned status {response.status}: {error_text}"
273-
)
274-
275-
try:
276-
data = await response.json()
277-
except aiohttp.ContentTypeError as e:
278-
raise RuntimeError(f"Invalid JSON response from vLLM-Omni: {e}")
279-
except aiohttp.ClientError as e:
280-
raise RuntimeError(f"Network error connecting to vLLM-Omni at {url}: {e}")
281-
282-
if "data" not in data:
283-
raise RuntimeError("API response missing 'data' field - expected OpenAI DALL-E format")
284-
if not data["data"]:
285-
raise RuntimeError("API returned empty data array")
286-
try:
287-
base64_str = data["data"][0]["b64_json"]
288-
except (KeyError, IndexError):
289-
raise RuntimeError("API response missing 'b64_json' field in first data item")
290-
291-
return base64_to_video(base64_str)
328+
return bytes_to_video(video_bytes)
329+
finally:
330+
try:
331+
await url_json(session, url, "delete")
332+
except Exception as exc:
333+
logger.warning("Failed to clean up video job %s: %s", job_id, exc)
292334

293335
async def generate_understanding_chat_completion(
294336
self,

apps/ComfyUI-vLLM-Omni/comfyui_vllm_omni/utils/format.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,7 @@ def video_to_base64(video: VideoInput, filename: str = "video.mp4") -> str:
143143
return f"data:{mime_type};base64,{base64_str}"
144144

145145

146-
def base64_to_video(base64_str: str) -> VideoInput:
147-
if base64_str.startswith("data:video"):
148-
_, base64_str = base64_str.split(",", 1)
149-
150-
try:
151-
video_bytes = base64.b64decode(base64_str)
152-
except Exception as e:
153-
raise ValueError(f"Invalid base64 string: {e}")
154-
146+
def bytes_to_video(video_bytes: bytes) -> VideoInput:
155147
video_buffer = BytesIO(video_bytes)
156148

157149
try:
@@ -208,6 +200,18 @@ def base64_to_video(base64_str: str) -> VideoInput:
208200
return InputImpl.VideoFromComponents(components)
209201

210202

203+
def base64_to_video(base64_str: str) -> VideoInput:
204+
if base64_str.startswith("data:video"):
205+
_, base64_str = base64_str.split(",", 1)
206+
207+
try:
208+
video_bytes = base64.b64decode(base64_str)
209+
except Exception as e:
210+
raise ValueError(f"Invalid base64 string: {e}")
211+
212+
return bytes_to_video(video_bytes)
213+
214+
211215
def audio_to_bytes(audio: AudioInput, filename: str = "audio.mp3", quality: str = "128k") -> BytesIO:
212216
waveform = audio["waveform"][0] # Shape: (C, T)
213217
sample_rate = audio["sample_rate"]

apps/ComfyUI-vLLM-Omni/comfyui_vllm_omni/utils/validators.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,17 @@ def add_sampling_parameters_to_stage(
7070

7171
stages = pipeline_spec["stages"]
7272
if isinstance(sampling_param_list, dict):
73+
sampling_param_list = sampling_param_list.__class__(sampling_param_list)
7374
sampling_param_list.update(params_to_add)
7475
elif sampling_param_list is None:
7576
sampling_param_list = params_to_add.copy()
7677
else:
7778
for i, stage in enumerate(stages):
7879
if stage == stage_type:
79-
sampling_param_list[i].update(params_to_add)
80+
stage_param = sampling_param_list[i]
81+
stage_param = stage_param.__class__(stage_param)
82+
stage_param.update(params_to_add)
83+
sampling_param_list[i] = stage_param
8084

8185
return sampling_param_list
8286

docs/user_guide/examples/online_serving/image_to_video.md

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,35 @@ The script allows overriding:
2929
- `CACHE_BACKEND` (default: `none`)
3030
- `ENABLE_CACHE_DIT_SUMMARY` (default: `0`)
3131

32+
## Async Job Behavior
33+
34+
`POST /v1/videos` is asynchronous. It creates a video job and immediately
35+
returns metadata like the job ID and initial `queued` status. To get the final
36+
artifact, poll the job status and then download the completed file from the
37+
content endpoint.
38+
39+
The main endpoints are:
40+
- `POST /v1/videos`: create a video generation job
41+
- `GET /v1/videos/{video_id}`: retrieve the current job status and metadata
42+
- `GET /v1/videos`: list stored video jobs
43+
- `GET /v1/videos/{video_id}/content`: download the generated video file
44+
- `DELETE /v1/videos/{video_id}`: delete the job and any stored output
45+
46+
## Storage
47+
48+
Generated video files are stored on local disk by the async video API.
49+
Local file storage behavior can be controlled via the following environment variables:
50+
51+
- `VLLM_OMNI_STORAGE_PATH`: directory used for generated files (default: `/tmp/storage`)
52+
- `VLLM_OMNI_STORAGE_MAX_CONCURRENCY`: max concurrent save/delete operations (default: `4`)
53+
54+
Example:
55+
56+
```bash
57+
export VLLM_OMNI_STORAGE_PATH=/var/tmp/vllm-omni-videos
58+
export VLLM_OMNI_STORAGE_MAX_CONCURRENCY=8
59+
```
60+
3261
## API Calls
3362

3463
### Method 1: Using curl
@@ -38,7 +67,7 @@ The script allows overriding:
3867
bash run_curl_image_to_video.sh
3968

4069
# Or execute directly (OpenAI-style multipart)
41-
curl -X POST http://localhost:8091/v1/videos \
70+
create_response=$(curl -s http://localhost:8091/v1/videos \
4271
-H "Accept: application/json" \
4372
-F "prompt=A bear playing with yarn, smooth motion" \
4473
-F "negative_prompt=low quality, blurry, static" \
@@ -52,7 +81,23 @@ curl -X POST http://localhost:8091/v1/videos \
5281
-F "guidance_scale_2=1.0" \
5382
-F "boundary_ratio=0.875" \
5483
-F "flow_shift=12.0" \
55-
-F "seed=42" | jq -r '.data[0].b64_json' | base64 -d > wan22_i2v_output.mp4
84+
-F "seed=42")
85+
86+
video_id=$(echo "$create_response" | jq -r '.id')
87+
while true; do
88+
status=$(curl -s "http://localhost:8091/v1/videos/${video_id}" | jq -r '.status')
89+
if [ "$status" = "completed" ]; then
90+
break
91+
fi
92+
if [ "$status" = "failed" ]; then
93+
echo "Video generation failed"
94+
exit 1
95+
fi
96+
sleep 2
97+
done
98+
99+
curl -s "http://localhost:8091/v1/videos/${video_id}" | jq .
100+
curl -L "http://localhost:8091/v1/videos/${video_id}/content" -o wan22_i2v_output.mp4
56101
```
57102

58103
## Request Format
@@ -66,6 +111,18 @@ curl -X POST http://localhost:8091/v1/videos \
66111
-F "input_reference=@/path/to/qwen-bear.png"
67112
```
68113

114+
### Alternative JSON-Safe Reference Input
115+
116+
Use `image_reference` when you want to pass a URL or JSON-safe image reference
117+
instead of uploading a file. Do not send `input_reference` and
118+
`image_reference` together.
119+
120+
```bash
121+
curl -X POST http://localhost:8091/v1/videos \
122+
-F "prompt=A bear playing with yarn, smooth motion" \
123+
-F 'image_reference={"image_url":"https://example.com/qwen-bear.png"}'
124+
```
125+
69126
### Generation with Parameters
70127

71128
```bash
@@ -85,6 +142,63 @@ curl -X POST http://localhost:8091/v1/videos \
85142
-F "seed=42"
86143
```
87144

145+
## Create Response Format
146+
147+
`POST /v1/videos` returns a job record.
148+
149+
```json
150+
{
151+
"id": "video_gen_123",
152+
"object": "video",
153+
"status": "queued",
154+
"model": "Wan-AI/Wan2.2-I2V-A14B-Diffusers",
155+
"prompt": "A bear playing with yarn, smooth motion",
156+
"created_at": 1234567890
157+
}
158+
```
159+
160+
## Retrieve, List, Download, and Delete
161+
162+
### Retrieve a job
163+
164+
```bash
165+
curl -s http://localhost:8091/v1/videos/${video_id} | jq .
166+
```
167+
168+
### List jobs
169+
170+
```bash
171+
curl -s http://localhost:8091/v1/videos | jq .
172+
```
173+
174+
### Download the completed video
175+
176+
```bash
177+
curl -L http://localhost:8091/v1/videos/${video_id}/content -o wan22_i2v_output.mp4
178+
```
179+
180+
### Delete a job and its stored file
181+
182+
```bash
183+
curl -X DELETE http://localhost:8091/v1/videos/${video_id} | jq .
184+
```
185+
186+
## Poll Until Complete
187+
188+
```bash
189+
while true; do
190+
status=$(curl -s http://localhost:8091/v1/videos/${video_id} | jq -r '.status')
191+
if [ "$status" = "completed" ]; then
192+
break
193+
fi
194+
if [ "$status" = "failed" ]; then
195+
echo "Video generation failed"
196+
exit 1
197+
fi
198+
sleep 2
199+
done
200+
```
201+
88202
## Example materials
89203

90204
??? abstract "run_curl_image_to_video.sh"

0 commit comments

Comments
 (0)