Skip to content

Commit ac31663

Browse files
feat: apply DGD overrides before running interpolation (#7226) (#7241)
Signed-off-by: Hannah Zhang <hannahz@nvidia.com> Co-authored-by: Harrison Saturley-Hall <hsaturleyhal@nvidia.com>
1 parent f080402 commit ac31663

File tree

2 files changed

+262
-2
lines changed

2 files changed

+262
-2
lines changed

components/src/dynamo/profiler/profile_sla.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,17 @@ async def run_profile(
339339
dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None
340340
resolved_backend = pick_result.get("resolved_backend", backend)
341341

342+
if dgd_config and dgdr.overrides and dgdr.overrides.dgd:
343+
dgd_config = apply_dgd_overrides(dgd_config, dgdr.overrides.dgd)
344+
logger.info("Applied DGD overrides to the picked DGD config.")
345+
job_tolerations = get_profiling_job_tolerations(dgdr)
346+
if job_tolerations and dgd_config:
347+
dgd_config = inject_tolerations_into_dgd(dgd_config, job_tolerations)
348+
logger.debug(
349+
"Propagated %d profiling-job toleration(s) to the picked DGD config.",
350+
len(job_tolerations),
351+
)
352+
342353
# ---------------------------------------------------------------
343354
# Interpolation curves — only needed when something consumes
344355
# the per-engine performance data (throughput scaling or mocker).
@@ -398,8 +409,8 @@ async def run_profile(
398409
final_config = apply_dgd_overrides(final_config, dgdr.overrides.dgd)
399410
logger.info("Applied DGD overrides to the final config.")
400411

401-
# Propagate profiling-job tolerations to the final DGD
402-
job_tolerations = get_profiling_job_tolerations(dgdr)
412+
# Propagate profiling-job tolerations to the final DGD (covers any
413+
# services added by assemble_final_config, e.g. Planner).
403414
if job_tolerations and final_config:
404415
final_config = _apply_tolerations_to_final_config(
405416
final_config, job_tolerations

components/src/dynamo/profiler/tests/test_profiler_protocol.py

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,22 @@
33

44
"""Unit tests for profiler config_modifiers/protocol helpers."""
55

6+
import copy
7+
import logging
8+
from unittest.mock import AsyncMock, patch
9+
610
import pytest
711

12+
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
13+
PickedParallelConfig,
14+
)
815
from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides
16+
from dynamo.profiler.utils.defaults import SearchStrategy
17+
from dynamo.profiler.utils.dgdr_v1beta1_types import (
18+
DynamoGraphDeploymentRequestSpec,
19+
OverridesSpec,
20+
)
21+
from dynamo.profiler.utils.profile_common import ProfilerOperationalConfig
922

1023
pytestmark = [
1124
pytest.mark.unit,
@@ -104,3 +117,239 @@ def test_apply_dgd_overrides_metadata_only_identity_keys_dropped_entirely() -> N
104117

105118
# Only original metadata should remain — no extra keys added.
106119
assert result["metadata"] == {"name": "svc"}
120+
121+
122+
def test_apply_dgd_overrides_extrapodspec_tolerations() -> None:
    """Check that override tolerations are merged into each service's extraPodSpec.

    Regression test for TC-5.2a: interpolation DGDs were deployed without
    tolerations because apply_dgd_overrides was called after run_interpolation.
    Only the merge itself is exercised here: tolerations must be added,
    sibling keys such as mainContainer must survive, and the input config
    must be left untouched.
    """
    toleration = {"key": "nvidia.com/gpu", "operator": "Exists", "effect": "NoSchedule"}

    # Existing DGD: one worker with a populated extraPodSpec, one service
    # with an empty one.
    decode_worker = {
        "componentType": "worker",
        "extraPodSpec": {
            "mainContainer": {
                "image": "my-image",
                "args": ["--model", "Qwen3-32B"],
            }
        },
        "replicas": 1,
    }
    frontend = {"extraPodSpec": {}}
    dgd_config = {
        "spec": {
            "services": {
                "VllmDecodeWorker": decode_worker,
                "Frontend": frontend,
            }
        }
    }

    # Overrides are shaped like a full DGD document; both services receive
    # the same toleration.
    service_overrides = {
        "VllmDecodeWorker": {"extraPodSpec": {"tolerations": [toleration]}},
        "Frontend": {"extraPodSpec": {"tolerations": [toleration]}},
    }
    overrides = {
        "apiVersion": "nvidia.com/v1alpha1",
        "kind": "DynamoGraphDeployment",
        "metadata": {"name": "placeholder"},
        "spec": {"services": service_overrides},
    }

    merged_services = apply_dgd_overrides(dgd_config, overrides)["spec"]["services"]

    # The worker gains the toleration and keeps its mainContainer.
    decode_pod_spec = merged_services["VllmDecodeWorker"]["extraPodSpec"]
    assert decode_pod_spec["tolerations"] == [toleration]
    assert decode_pod_spec["mainContainer"]["image"] == "my-image"

    # A service whose extraPodSpec started out empty gains it as well.
    assert merged_services["Frontend"]["extraPodSpec"]["tolerations"] == [toleration]

    # The caller's config must not have been modified in place.
    untouched = dgd_config["spec"]["services"]["VllmDecodeWorker"]["extraPodSpec"]
    assert "tolerations" not in untouched
177+
178+
179+
def test_apply_dgd_overrides_missing_service_skipped_with_warning(caplog) -> None:
    """Overrides that target a service missing from the DGD are dropped with a warning."""
    dgd_config = {"spec": {"services": {"Frontend": {"replicas": 1}}}}
    overrides = {
        "spec": {
            "services": {
                "Frontend": {"replicas": 2},
                "NonExistentWorker": {
                    "extraPodSpec": {"tolerations": [{"key": "foo"}]}
                },
            }
        }
    }
    protocol_logger = "dynamo.profiler.utils.config_modifiers.protocol"

    # Capture WARNING-level records from the module under test only.
    with caplog.at_level(logging.WARNING, logger=protocol_logger):
        result = apply_dgd_overrides(dgd_config, overrides)

    services = result["spec"]["services"]
    # The override for the service that exists is applied...
    assert services["Frontend"]["replicas"] == 2
    # ...while the unknown service is not created as a side effect.
    assert "NonExistentWorker" not in services

    # The skip must be visible in the logs, naming the offending service.
    warned = [
        record
        for record in caplog.records
        if "NonExistentWorker" in record.getMessage()
        and record.levelno == logging.WARNING
    ]
    assert warned, "Expected a WARNING mentioning 'NonExistentWorker'"
210+
211+
212+
# ---------------------------------------------------------------------------
213+
# Orchestration-level test: run_profile applies overrides before interpolation
214+
# ---------------------------------------------------------------------------
215+
216+
# Single GPU toleration shared by the orchestration-test fixtures below.
_TOLERATION = {
    "key": "nvidia.com/gpu",
    "operator": "Exists",
    "effect": "NoSchedule",
}

# Base DGD returned by the mocked strategy; it carries no tolerations yet.
_BASE_DGD = {
    "spec": {
        "services": {
            "VllmDecodeWorker": {
                "extraPodSpec": {
                    "mainContainer": {"image": "my-image", "args": ["--model", "m"]},
                },
                "replicas": 1,
            },
        }
    }
}

# User-supplied DGD overrides: the same toleration for a service that exists
# in _BASE_DGD ("VllmDecodeWorker") and for one that does not ("GhostService").
# Each service gets its own dict/list; only the toleration object is shared.
_OVERRIDE_DGD = {
    "spec": {
        "services": {
            service_name: {"extraPodSpec": {"tolerations": [_TOLERATION]}}
            for service_name in ("VllmDecodeWorker", "GhostService")
        }
    }
}
241+
242+
243+
async def test_run_profile_applies_dgd_overrides_before_interpolation(
    tmp_path, caplog
) -> None:
    """Verify run_profile hands an already-overridden DGD to run_interpolation.

    Regression guard for TC-5.2a: without the fix, interpolation pods were
    deployed without extraPodSpec.tolerations, causing them to stay Pending on
    GPU nodes with nvidia.com/gpu:NoSchedule taints.

    Every collaborator of run_profile is patched out; the stand-in for
    run_interpolation records the DGD config it receives so the test can
    inspect what interpolation would have deployed.
    """
    from dynamo.profiler.profile_sla import run_profile

    base_dgd = copy.deepcopy(_BASE_DGD)
    dgdr = DynamoGraphDeploymentRequestSpec(
        model="test/model",
        overrides=OverridesSpec(dgd=_OVERRIDE_DGD),
    )
    ops = ProfilerOperationalConfig(output_dir=str(tmp_path), dry_run=False)

    # Filled in by the fake below with a snapshot of the config it was given.
    captured: dict = {}

    async def _fake_interpolation(dgdr_arg, ops_arg, disagg_config, *args, **kwargs):
        # Deep-copy so later mutations by run_profile cannot alter the snapshot.
        captured["disagg_config"] = copy.deepcopy(disagg_config)

    pick_result = {
        "dgd_config": base_dgd,
        "resolved_backend": "vllm",
        "chosen_exp": "disagg",
        "best_config_df": None,
        "best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
    }
    # Canned return values for the patched helpers.
    profiler_params = (
        "test/model",
        "vllm",
        "h100_sxm",
        8,
        4000,
        1000,
        None,
        2000.0,
        30.0,
        SearchStrategy.RAPID,
        "throughput",
    )
    strategy_result = (
        pick_result,
        PickedParallelConfig(),
        PickedParallelConfig(),
        2000.0,
        30.0,
    )

    with (
        patch("dynamo.profiler.profile_sla.valid_dgdr_spec"),
        patch("dynamo.profiler.profile_sla.validate_dgdr_dynamo_features"),
        patch(
            "dynamo.profiler.profile_sla.check_model_hardware_support",
            return_value=True,
        ),
        patch(
            "dynamo.profiler.profile_sla._extract_profiler_params",
            return_value=profiler_params,
        ),
        patch(
            "dynamo.profiler.profile_sla._execute_strategy",
            new=AsyncMock(return_value=strategy_result),
        ),
        patch("dynamo.profiler.profile_sla.needs_profile_data", return_value=True),
        patch(
            "dynamo.profiler.profile_sla.run_interpolation",
            new=_fake_interpolation,
        ),
        patch(
            "dynamo.profiler.profile_sla.assemble_final_config",
            return_value=copy.deepcopy(base_dgd),
        ),
        patch("dynamo.profiler.profile_sla._write_final_output", return_value=True),
        patch("dynamo.profiler.profile_sla.write_profiler_status"),
        patch(
            "dynamo.profiler.profile_sla.cleanup_remaining_deployments",
            new=AsyncMock(),
        ),
        caplog.at_level(
            logging.WARNING,
            logger="dynamo.profiler.utils.config_modifiers.protocol",
        ),
    ):
        await run_profile(dgdr, ops)

    assert captured, "run_interpolation was never called"
    services = captured["disagg_config"]["spec"]["services"]

    # The toleration must already be on VllmDecodeWorker when interpolation
    # runs, and the merge must not have clobbered mainContainer.
    worker_pod_spec = services["VllmDecodeWorker"]["extraPodSpec"]
    assert worker_pod_spec["tolerations"] == [_TOLERATION]
    assert worker_pod_spec["mainContainer"]["image"] == "my-image"

    # GhostService is absent from the base DGD, so its override must not
    # create the service...
    assert "GhostService" not in services

    # ...and the skip must be reported with a WARNING rather than silently.
    ghost_warnings = [
        record
        for record in caplog.records
        if "GhostService" in record.getMessage()
        and record.levelno == logging.WARNING
    ]
    assert ghost_warnings, "Expected a WARNING mentioning the skipped 'GhostService'"

    # The DGD handed back by the strategy must not have been mutated.
    assert (
        "tolerations"
        not in base_dgd["spec"]["services"]["VllmDecodeWorker"]["extraPodSpec"]
    )

0 commit comments

Comments
 (0)