Commit 2691094

[serve][llm] Data Parallel Attention: Public API and Documentation (#58301)
1 parent fe5cd57 commit 2691094

File tree

11 files changed: +667 −59 lines changed

.buildkite/llm.rayci.yml

Lines changed: 14 additions & 0 deletions
@@ -46,4 +46,18 @@ steps:
     commands:
       - RAYCI_DISABLE_TEST_DB=1 bazel run //ci/ray_ci:test_in_docker -- //python/ray/llm/... //doc/... llm
         --python-version 3.11 --build-name llmgpubuild --only-tags gpu
+        --except-tags multi_gpu_4
+    depends_on: llmgpubuild
+
+  - label: "llm gpu tests (4 GPUs)"
+    key: "llm-gpu-tests-4gpu"
+    tags:
+      - llm
+      - gpu
+    instance_type: gpu-large
+    commands:
+      - RAYCI_DISABLE_TEST_DB=1 bazel run //ci/ray_ci:test_in_docker -- //doc/... llm
+        --python-version 3.11 --build-name llmgpubuild
+        --only-tags multi_gpu_4
+        --gpus 4
     depends_on: llmgpubuild
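
Taken together, the two commands split the doc tests cleanly: the existing `llm gpu tests` step now skips anything tagged multi_gpu_4 (via --except-tags multi_gpu_4), and the new `llm gpu tests (4 GPUs)` step runs exactly those tests (via --only-tags multi_gpu_4 --gpus 4) on a gpu-large instance, so each test executes in exactly one CI job.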

doc/BUILD.bazel

Lines changed: 16 additions & 2 deletions
@@ -342,11 +342,11 @@ filegroup(
     visibility = ["//doc:__subpackages__"],
 )
 
-# GPU Tests
+# GPU Tests (standard GPU tests)
 py_test_run_all_subdirectory(
     size = "large",
     include = ["source/llm/doc_code/serve/**/*.py"],
-    exclude = [],
+    exclude = ["source/llm/doc_code/serve/multi_gpu/**/*.py"],
     extra_srcs = [],
     data = ["source/llm/doc_code/serve/qwen/llm_config_example.yaml"],
     tags = [
@@ -356,6 +356,20 @@ py_test_run_all_subdirectory(
     ],
 )
 
+# Multi-GPU Tests (4+ GPUs)
+py_test_run_all_subdirectory(
+    size = "large",
+    include = ["source/llm/doc_code/serve/multi_gpu/**/*.py"],
+    exclude = [],
+    extra_srcs = [],
+    tags = [
+        "exclusive",
+        "gpu",
+        "multi_gpu_4",
+        "team:llm",
+    ],
+)
+
 # --------------------------------------------------------------------
 # Test all doc/source/data/doc_code/working-with-llms code included in rst/md files.
 # --------------------------------------------------------------------
doc/source/llm/doc_code/serve/multi_gpu/… (new file)

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
"""
This file serves as a documentation example and CI test for basic data parallel attention deployment.

Structure:
1. Monkeypatch setup: Ensures serve.run is non-blocking and removes accelerator requirements for CI testing.
2. Docs example (between __dp_basic_example_start/end__): Embedded in Sphinx docs via literalinclude.
3. Test validation (deployment status polling + cleanup)
"""

import time
from ray import serve
from ray.serve.schema import ApplicationStatus
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve import llm

_original_serve_run = serve.run
_original_build_dp_openai_app = llm.build_dp_openai_app


def _non_blocking_serve_run(app, **kwargs):
    """Forces blocking=False for testing"""
    kwargs["blocking"] = False
    return _original_serve_run(app, **kwargs)


def _testing_build_dp_openai_app(builder_config, **kwargs):
    """Removes accelerator requirements for testing"""
    if "llm_config" in builder_config:
        config = builder_config["llm_config"]
        if hasattr(config, "accelerator_type") and config.accelerator_type is not None:
            config.accelerator_type = None
    return _original_build_dp_openai_app(builder_config, **kwargs)


serve.run = _non_blocking_serve_run
llm.build_dp_openai_app = _testing_build_dp_openai_app

# __dp_basic_example_start__
from ray import serve
from ray.serve.llm import LLMConfig, build_dp_openai_app

# Configure the model with data parallel settings
config = LLMConfig(
    model_loading_config={
        "model_id": "Qwen/Qwen2.5-0.5B-Instruct"
    },
    engine_kwargs={
        "data_parallel_size": 2,  # Number of DP replicas
        "tensor_parallel_size": 1,  # TP size per replica
    },
    experimental_configs={
        # This is a temporary required config. We will remove this in future versions.
        "dp_size_per_node": 2,  # DP replicas per node
    },
)

app = build_dp_openai_app({
    "llm_config": config
})

serve.run(app, blocking=True)
# __dp_basic_example_end__

status = ApplicationStatus.NOT_STARTED
timeout_seconds = 300
start_time = time.time()

while (
    status != ApplicationStatus.RUNNING and time.time() - start_time < timeout_seconds
):
    status = serve.status().applications[SERVE_DEFAULT_APP_NAME].status

    if status in [ApplicationStatus.DEPLOY_FAILED, ApplicationStatus.UNHEALTHY]:
        raise AssertionError(f"Deployment failed with status: {status}")

    time.sleep(1)

if status != ApplicationStatus.RUNNING:
    raise AssertionError(
        f"Deployment failed to reach RUNNING status within {timeout_seconds}s. Current status: {status}"
    )

serve.shutdown()
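
Once the application above reaches RUNNING, it exposes an OpenAI-compatible HTTP endpoint. As a usage sketch (not part of the committed file), assuming the Serve proxy listens on the default localhost:8000 and the openai Python package is installed, a request could look like:

from openai import OpenAI

# Any non-empty API key works; the local endpoint doesn't validate it.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="fake-key")

response = client.chat.completions.create(
    model="Qwen/Qwen2.5-0.5B-Instruct",  # the model_id from the LLMConfig above
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)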
doc/source/llm/doc_code/serve/multi_gpu/… (new file)

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
"""
This file serves as a documentation example and CI test for data parallel + prefill-decode disaggregation.

Structure:
1. Monkeypatch setup: Ensures serve.run is non-blocking and removes accelerator requirements for CI testing.
2. Docs example (between __dp_pd_example_start/end__): Embedded in Sphinx docs via literalinclude.
3. Test validation (deployment status polling + cleanup)
"""

import time
from ray import serve
from ray.serve.schema import ApplicationStatus
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve import llm
from ray.serve.llm.deployment import PDProxyServer
from ray.serve.llm.ingress import OpenAiIngress, make_fastapi_ingress

# Check if NIXL is available (required for NixlConnector)
try:
    import nixl  # noqa: F401
    NIXL_AVAILABLE = True
except ImportError:
    NIXL_AVAILABLE = False

if not NIXL_AVAILABLE:
    raise ImportError(
        "NIXL is required for this example but is not installed. "
        "Install it with: pip install nixl or uv pip install nixl"
    )

_original_serve_run = serve.run
_original_build_dp_deployment = llm.build_dp_deployment


def _non_blocking_serve_run(app, **kwargs):
    """Forces blocking=False for testing"""
    kwargs["blocking"] = False
    return _original_serve_run(app, **kwargs)


def _testing_build_dp_deployment(llm_config, **kwargs):
    """Removes accelerator requirements for testing"""
    if llm_config.accelerator_type is not None:
        llm_config.accelerator_type = None
    return _original_build_dp_deployment(llm_config, **kwargs)


serve.run = _non_blocking_serve_run
llm.build_dp_deployment = _testing_build_dp_deployment

# __dp_pd_example_start__
from ray import serve
from ray.serve.llm import LLMConfig, build_dp_deployment
from ray.serve.llm.deployment import PDProxyServer
from ray.serve.llm.ingress import OpenAiIngress, make_fastapi_ingress

# Configure prefill with data parallel attention
prefill_config = LLMConfig(
    model_loading_config={
        "model_id": "Qwen/Qwen2.5-0.5B-Instruct"
    },
    engine_kwargs={
        "data_parallel_size": 2,  # 2 DP replicas for prefill
        "tensor_parallel_size": 1,
        "kv_transfer_config": {
            "kv_connector": "NixlConnector",
            "kv_role": "kv_both",
        }
    },
    experimental_configs={
        "dp_size_per_node": 2,
    },
)

# Configure decode with data parallel attention
decode_config = LLMConfig(
    model_loading_config={
        "model_id": "Qwen/Qwen2.5-0.5B-Instruct"
    },
    engine_kwargs={
        "data_parallel_size": 2,  # 2 DP replicas for decode (adjusted for 4 GPU limit)
        "tensor_parallel_size": 1,
        "kv_transfer_config": {
            "kv_connector": "NixlConnector",
            "kv_role": "kv_both",
        }
    },
    experimental_configs={
        "dp_size_per_node": 2,
    },
)

# Build prefill and decode deployments with DP
prefill_deployment = build_dp_deployment(prefill_config, name_prefix="Prefill:")
decode_deployment = build_dp_deployment(decode_config, name_prefix="Decode:")

# Create PDProxyServer to coordinate between prefill and decode
proxy_options = PDProxyServer.get_deployment_options(prefill_config, decode_config)
proxy_deployment = serve.deployment(PDProxyServer).options(**proxy_options).bind(
    prefill_server=prefill_deployment,
    decode_server=decode_deployment,
)

# Create OpenAI-compatible ingress
ingress_options = OpenAiIngress.get_deployment_options([prefill_config, decode_config])
ingress_cls = make_fastapi_ingress(OpenAiIngress)
ingress_deployment = serve.deployment(ingress_cls).options(**ingress_options).bind(
    llm_deployments=[proxy_deployment]
)

# Deploy the application
serve.run(ingress_deployment, blocking=True)
# __dp_pd_example_end__

status = ApplicationStatus.NOT_STARTED
timeout_seconds = 300  # Longer timeout for DP+PD setup
start_time = time.time()

while (
    status != ApplicationStatus.RUNNING and time.time() - start_time < timeout_seconds
):
    status = serve.status().applications[SERVE_DEFAULT_APP_NAME].status

    if status in [ApplicationStatus.DEPLOY_FAILED, ApplicationStatus.UNHEALTHY]:
        raise AssertionError(f"Deployment failed with status: {status}")

    time.sleep(1)

if status != ApplicationStatus.RUNNING:
    raise AssertionError(
        f"Deployment failed to reach RUNNING status within {timeout_seconds}s. Current status: {status}"
    )

serve.shutdown()
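
A note on GPU accounting for this example: each role consumes data_parallel_size × tensor_parallel_size GPUs, and prefill and decode get separate replicas, which is why this file lands in the new multi_gpu_4 CI bucket. A minimal sanity-check sketch (illustrative only, not part of the committed file):

# GPUs per role = data_parallel_size * tensor_parallel_size
prefill_gpus = 2 * 1  # prefill: DP=2, TP=1
decode_gpus = 2 * 1   # decode: DP=2, TP=1 (adjusted for 4 GPU limit)

# Total matches the 4-GPU CI job (--gpus 4, tag multi_gpu_4).
assert prefill_gpus + decode_gpus == 4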
