Skip to content

Commit 6c71504

Browse files
authored
feat(exporters): put the NEL config in the MLflow parameters (#653)
Signed-off-by: Piotr Januszewski <pjanuszewski@nvidia.com>
1 parent 0257b14 commit 6c71504

File tree

4 files changed

+304
-16
lines changed

4 files changed

+304
-16
lines changed

packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/exporters/mlflow.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from nemo_evaluator_launcher.exporters.utils import (
3636
extract_accuracy_metrics,
3737
extract_exporter_config,
38+
flatten_config,
3839
get_artifact_root,
3940
get_available_artifacts,
4041
get_benchmark_info,
@@ -178,6 +179,15 @@ def export_job(self, job_data: JobData) -> ExportResult:
178179
if mlflow_config.get("extra_metadata"):
179180
all_params.update(mlflow_config["extra_metadata"])
180181

182+
# Add flattened config as params if enabled
183+
if mlflow_config.get("log_config_params", False):
184+
config_params = flatten_config(
185+
job_data.config or {},
186+
parent_key="config",
187+
max_depth=mlflow_config.get("log_config_params_max_depth", 10),
188+
)
189+
all_params.update(config_params)
190+
181191
# Add webhook info if available
182192
if mlflow_config.get("triggered_by_webhook"):
183193
all_params.update(
@@ -525,16 +535,31 @@ def export_invocation(self, invocation_id: str) -> Dict[str, Any]:
525535
if mlflow_config.get("extra_metadata"):
526536
all_params.update(mlflow_config["extra_metadata"])
527537

538+
# Add flattened config as params if enabled
539+
if mlflow_config.get("log_config_params", False):
540+
config_params = flatten_config(
541+
first_job.config or {},
542+
parent_key="config",
543+
max_depth=mlflow_config.get("log_config_params_max_depth", 10),
544+
)
545+
all_params.update(config_params)
546+
528547
# Prepare tags
529548
tags = {"invocation_id": invocation_id}
530549
if mlflow_config.get("tags"):
531550
tags.update({k: v for k, v in mlflow_config["tags"].items() if v})
532551

533-
# Truncate
552+
# Sanitize params and tags
534553
safe_params = {
535-
str(k)[:250]: str(v)[:250] for k, v in all_params.items() if v
554+
mlflow_sanitize(k, "param_key"): mlflow_sanitize(v, "param_value")
555+
for k, v in (all_params or {}).items()
556+
if v is not None
557+
}
558+
safe_tags = {
559+
mlflow_sanitize(k, "tag_key"): mlflow_sanitize(v, "tag_value")
560+
for k, v in (tags or {}).items()
561+
if v is not None
536562
}
537-
safe_tags = {str(k)[:250]: str(v)[:5000] for k, v in tags.items() if v}
538563

539564
# Check for existing run
540565
exists, existing_run_id = self._get_existing_run_info(

packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/exporters/utils.py

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,19 @@ def scp_file(remote_path: str, local_path: Path) -> bool:
416416
if scp_file(remote_file, local_file):
417417
exported_files.append(str(local_file))
418418
else:
419-
# Copy known files individually to avoid subfolders and satisfy tests
420-
for artifact in get_available_artifacts(paths.get("artifacts_dir", Path())):
421-
remote_file = f"{paths['remote_path']}/artifacts/{artifact}"
422-
local_file = art_dir / artifact
423-
if scp_file(remote_file, local_file):
424-
exported_files.append(str(local_file))
419+
# Copy all artifacts recursively when only_required=False
420+
cmd = (
421+
["scp", "-r"]
422+
+ ssh_opts
423+
+ [
424+
f"{paths['username']}@{paths['hostname']}:{paths['remote_path']}/artifacts/.",
425+
str(art_dir),
426+
]
427+
)
428+
if subprocess.run(cmd, capture_output=True).returncode == 0:
429+
exported_files.extend(
430+
[str(f) for f in art_dir.rglob("*") if f.is_file()]
431+
)
425432

426433
# Logs (top-level only)
427434
if copy_logs:
@@ -586,6 +593,60 @@ def _safe_update_metrics(
586593
_safe_set_metric(target, k, v, context)
587594

588595

596+
# =============================================================================
597+
# CONFIG FLATTENING
598+
# =============================================================================
599+
600+
601+
def flatten_config(
602+
config: Any,
603+
parent_key: str = "",
604+
sep: str = ".",
605+
max_depth: int = 10,
606+
) -> Dict[str, str]:
607+
"""
608+
Flatten a nested config dict into dot-notation keys.
609+
610+
Args:
611+
config: Nested configuration (dict, list, or scalar)
612+
parent_key: Prefix for keys (used in recursion)
613+
sep: Separator between nested keys
614+
max_depth: Maximum recursion depth to prevent infinite loops
615+
616+
Returns:
617+
Flattened dictionary with string values
618+
619+
Examples:
620+
>>> flatten_config({"a": {"b": 1}})
621+
{"a.b": "1"}
622+
>>> flatten_config({"tasks": [{"name": "foo"}, {"name": "bar"}]})
623+
{"tasks.0.name": "foo", "tasks.1.name": "bar"}
624+
"""
625+
if max_depth <= 0:
626+
return {parent_key: str(config)} if parent_key else {}
627+
628+
if isinstance(config, dict):
629+
items: Dict[str, str] = {}
630+
for key, value in config.items():
631+
new_key = f"{parent_key}{sep}{key}" if parent_key else key
632+
items.update(flatten_config(value, new_key, sep, max_depth - 1))
633+
return items
634+
635+
if isinstance(config, list):
636+
items: Dict[str, str] = {}
637+
for idx, item in enumerate(config):
638+
item_key = f"{parent_key}{sep}{idx}" if parent_key else str(idx)
639+
items.update(flatten_config(item, item_key, sep, max_depth - 1))
640+
return items
641+
642+
# Scalar value
643+
if not parent_key:
644+
return {}
645+
if config is None:
646+
return {parent_key: "null"}
647+
return {parent_key: str(config)}
648+
649+
589650
# =============================================================================
590651
# MLFLOW FUNCTIONS
591652
# =============================================================================

packages/nemo-evaluator-launcher/tests/unit_tests/exporters/test_mlflow_exporter.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,3 +377,104 @@ def _log_artifact(path, artifact_path=None):
377377
assert res.metadata["artifacts_logged"] == 1
378378
assert calls["config"] == 1
379379
assert calls["files"] == [("results.yml", "taskX/artifacts")]
380+
381+
    def test_log_config_params_flattens_config(
        self, monkeypatch, mlflow_fake, tmp_path: Path
    ):
        """Test that log_config_params=True flattens the config into MLflow params."""
        _ML, _RunCtx = mlflow_fake

        # Job with nested config: covers dicts, a list of tasks, and a task
        # with a nested sub-config, so the flattened keys exercise every path.
        config = {
            "deployment": {"tensor_parallel_size": 8, "model": "test-model"},
            "evaluation": {
                "tasks": [
                    {"name": "task1", "config": {"param": "value1"}},
                    {"name": "task2"},
                ]
            },
        }
        jd = JobData("mP", "mP.0", 0.0, "local", {"output_dir": str(tmp_path)}, config)

        # Stub the metrics extraction so the export succeeds without real
        # result artifacts on disk.
        monkeypatch.setattr(
            "nemo_evaluator_launcher.exporters.mlflow.extract_accuracy_metrics",
            lambda *_: {"task_accuracy": 0.9},
            raising=True,
        )

        # Capture log_params calls: every dict the exporter passes to
        # mlflow.log_params is merged into logged_params for inspection.
        logged_params = {}
        monkeypatch.setattr(
            "nemo_evaluator_launcher.exporters.mlflow.mlflow.log_params",
            lambda p: logged_params.update(p),
            raising=True,
        )

        exp = MLflowExporter(
            {
                "tracking_uri": "http://mlflow",
                "log_config_params": True,
            }
        )
        # Force the "no existing run" path so a fresh run is created.
        monkeypatch.setattr(
            exp, "_get_existing_run_info", lambda *a, **k: (False, None), raising=False
        )

        res = exp.export_job(jd)
        assert res.success

        # Verify flattened config params are logged under the "config" prefix,
        # with all values stringified (8 -> "8") and list items indexed.
        assert "config.deployment.tensor_parallel_size" in logged_params
        assert logged_params["config.deployment.tensor_parallel_size"] == "8"
        assert "config.deployment.model" in logged_params
        assert logged_params["config.deployment.model"] == "test-model"
        assert "config.evaluation.tasks.0.name" in logged_params
        assert logged_params["config.evaluation.tasks.0.name"] == "task1"
        assert "config.evaluation.tasks.1.name" in logged_params
        assert logged_params["config.evaluation.tasks.1.name"] == "task2"
435+
436+
    def test_log_config_params_with_max_depth(
        self, monkeypatch, mlflow_fake, tmp_path: Path
    ):
        """Test that log_config_params_max_depth limits flattening depth."""
        _ML, _RunCtx = mlflow_fake

        # Four levels of nesting; with max_depth=2 only the first two levels
        # ("config" prefix counts as one) should be expanded into keys.
        config = {"a": {"b": {"c": {"d": "deep"}}}}
        jd = JobData(
            "mDepth", "mDepth.0", 0.0, "local", {"output_dir": str(tmp_path)}, config
        )

        # Stub metrics extraction so export_job succeeds without artifacts.
        monkeypatch.setattr(
            "nemo_evaluator_launcher.exporters.mlflow.extract_accuracy_metrics",
            lambda *_: {"acc": 0.9},
            raising=True,
        )

        # Capture every params dict the exporter logs to MLflow.
        logged_params = {}
        monkeypatch.setattr(
            "nemo_evaluator_launcher.exporters.mlflow.mlflow.log_params",
            lambda p: logged_params.update(p),
            raising=True,
        )

        exp = MLflowExporter(
            {
                "tracking_uri": "http://mlflow",
                "log_config_params": True,
                "log_config_params_max_depth": 2,
            }
        )
        # Force the "no existing run" path so a fresh run is created.
        monkeypatch.setattr(
            exp, "_get_existing_run_info", lambda *a, **k: (False, None), raising=False
        )

        res = exp.export_job(jd)
        assert res.success

        # At depth 2, a.b should be stringified (not further flattened)
        assert "config.a.b" in logged_params
        # The value should be a string representation of the remaining dict
        assert "c" in logged_params["config.a.b"]
        # Deeper keys should not exist
        assert "config.a.b.c" not in logged_params
        assert "config.a.b.c.d" not in logged_params

packages/nemo-evaluator-launcher/tests/unit_tests/exporters/test_utils.py

Lines changed: 108 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
MetricConflictError,
3838
_safe_update_metrics,
3939
extract_accuracy_metrics,
40+
flatten_config,
4041
get_available_artifacts,
4142
get_benchmark_info,
4243
get_container_from_mapping,
@@ -323,24 +324,41 @@ def test_download_artifacts_only_required_with_logs(self, tmp_path: Path):
323324
}
324325
assert set(out).issuperset(expected_artifacts)
325326

326-
def test_download_artifacts_available_only(self, tmp_path: Path):
327-
local_artifacts = tmp_path / "local_artifacts"
328-
local_artifacts.mkdir()
329-
(local_artifacts / "results.yml").write_text("x")
330-
327+
    def test_download_artifacts_only_required_false_uses_scp_recursive(
        self, tmp_path: Path
    ):
        """Test only_required=False uses scp -r to copy all artifacts recursively."""
        # Minimal SSH path info; no artifacts_dir because the recursive
        # branch copies the whole remote artifacts directory.
        paths = {
            "username": "user",
            "hostname": "host",
            "remote_path": "/remote",
        }
        scp_calls = []

        # Mock scp -r to simulate creating files in the target directory
        def fake_run(cmd, capture_output=True):
            scp_calls.append(cmd)
            if "-r" in cmd:
                # Simulate scp -r by creating files including nested dirs
                art_dir = tmp_path / "artifacts"
                art_dir.mkdir(parents=True, exist_ok=True)
                (art_dir / "results.yml").write_text("x")
                (art_dir / "extra.json").write_text("{}")
                (art_dir / "subdir").mkdir(exist_ok=True)
                (art_dir / "subdir" / "nested.txt").write_text("nested")
            # Every subprocess call (scp and any ssh control commands)
            # reports success so the download path proceeds.
            return SimpleNamespace(returncode=0)

        with patch("subprocess.run", side_effect=fake_run):
            out = U.ssh_download_artifacts(
                paths, tmp_path, config={"only_required": False}, control_paths=None
            )

        # Verify scp -r was called
        assert any("-r" in c for c in scp_calls)
        # Verify all files including nested are listed
        assert str(tmp_path / "artifacts" / "results.yml") in out
        assert str(tmp_path / "artifacts" / "extra.json") in out
        assert str(tmp_path / "artifacts" / "subdir" / "nested.txt") in out
344362

345363
def test_download_with_control_paths(self, tmp_path: Path, monkeypatch):
346364
paths = {"username": "u", "hostname": "h", "remote_path": "/remote"}
@@ -593,3 +611,86 @@ def export_invocation(self, inv_id):
593611
# metadata injected for each job
594612
for job in payload["jobs"].values():
595613
assert "metadata" in job
614+
615+
616+
class TestFlattenConfig:
    """Behavioral tests for the flatten_config utility."""

    def test_simple_dict(self):
        """Scalar values are stringified; top-level keys are kept as-is."""
        assert flatten_config({"a": 1, "b": "hello"}) == {"a": "1", "b": "hello"}

    def test_nested_dict(self):
        """Nested dicts collapse into a single dot-joined key."""
        assert flatten_config({"a": {"b": {"c": 42}}}) == {"a.b.c": "42"}

    def test_with_parent_key(self):
        """An explicit parent_key prefixes every produced key."""
        assert flatten_config({"x": 1}, parent_key="config") == {"config.x": "1"}

    def test_list_with_scalars(self):
        """List items are addressed by their numeric index."""
        flat = flatten_config({"items": ["a", "b", "c"]})
        assert flat == {"items.0": "a", "items.1": "b", "items.2": "c"}

    def test_list_with_dicts(self):
        """Dicts inside lists flatten through the index segment."""
        flat = flatten_config({"tasks": [{"name": "foo"}, {"name": "bar"}]})
        assert flat == {"tasks.0.name": "foo", "tasks.1.name": "bar"}

    def test_nested_list_of_dicts(self):
        """Deeply nested task configs keep their full dotted paths."""
        cfg = {
            "evaluation": {
                "tasks": [
                    {"name": "task1", "config": {"param": "value1"}},
                    {"name": "task2", "config": {"param": "value2"}},
                ]
            }
        }
        flat = flatten_config(cfg, parent_key="config")
        assert flat["config.evaluation.tasks.0.name"] == "task1"
        assert flat["config.evaluation.tasks.0.config.param"] == "value1"
        assert flat["config.evaluation.tasks.1.name"] == "task2"
        assert flat["config.evaluation.tasks.1.config.param"] == "value2"

    def test_null_values(self):
        """None becomes the literal string 'null' at any nesting level."""
        flat = flatten_config({"a": None, "b": {"c": None}})
        assert flat == {"a": "null", "b.c": "null"}

    def test_max_depth_limit(self):
        """Beyond max_depth the remaining subtree is stringified wholesale."""
        flat = flatten_config({"a": {"b": {"c": {"d": "deep"}}}}, max_depth=2)
        # At depth 2, the inner dict should be stringified
        assert "a.b" in flat
        assert "{'c': {'d': 'deep'}}" in flat["a.b"]

    def test_empty_dict(self):
        """An empty mapping yields no params."""
        assert flatten_config({}) == {}

    def test_empty_list(self):
        """An empty list contributes nothing."""
        assert flatten_config({"items": []}) == {}

    def test_mixed_types(self):
        """Scalars of every basic type are converted via str()."""
        flat = flatten_config(
            {
                "string": "hello",
                "number": 42,
                "float": 3.14,
                "bool": True,
                "none": None,
            }
        )
        assert flat["string"] == "hello"
        assert flat["number"] == "42"
        assert flat["float"] == "3.14"
        assert flat["bool"] == "True"
        assert flat["none"] == "null"

    def test_custom_separator(self):
        """The sep argument replaces the default dot separator."""
        assert flatten_config({"a": {"b": 1}}, sep="/") == {"a/b": "1"}

0 commit comments

Comments
 (0)