26 | 26 | if TYPE_CHECKING: |
27 | 27 | import kubernetes as k8s |
28 | 28 |
29 | | - |
30 | 29 | from cloudai.core import BaseJob, System |
31 | 30 | from cloudai.util.lazy_imports import lazy |
32 | 31 |
@@ -325,54 +324,63 @@ def _check_model_server(self) -> bool: |
325 | 324 | logging.warning("Invalid JSON response from model server") |
326 | 325 | return False |
327 | 326 |
| 327 | + def _get_frontend_pod_name(self) -> str: |
| 328 | + for pod in self.core_v1.list_namespaced_pod(namespace=self.default_namespace).items: |
| 329 | + labels = pod.metadata.labels |
| 330 | + logging.debug(f"Found pod: {pod.metadata.name} with labels: {labels}") |
| 331 | + if labels and str(labels.get("nvidia.com/dynamo-component", "")).lower() == "frontend": |
| 332 | + return pod.metadata.name |
| 333 | + raise RuntimeError("No frontend pod found for the job") |
| 334 | + |
328 | 335 | def _run_genai_perf(self, job: KubernetesJob) -> None: |
329 | 336 | from cloudai.workloads.ai_dynamo.ai_dynamo import AIDynamoTestDefinition |
330 | 337 |
331 | | - test_definition = job.test_run.test |
332 | | - if not isinstance(test_definition, AIDynamoTestDefinition): |
| 338 | + tdef = job.test_run.test |
| 339 | + if not isinstance(tdef, AIDynamoTestDefinition): |
333 | 340 | raise TypeError("Test definition must be an instance of AIDynamoTestDefinition") |
334 | 341 |
335 | | - python_exec = test_definition.python_executable |
336 | | - if not python_exec or not python_exec.venv_path: |
337 | | - raise ValueError("Python executable path not set - executable may not be installed") |
338 | | - |
339 | | - genai_perf_args_obj = test_definition.cmd_args.genai_perf |
340 | | - if not genai_perf_args_obj: |
341 | | - raise ValueError("GenAI perf args not set") |
| 342 | + genai_perf_results_path = "/tmp/cloudai/genai-perf" |
342 | 343 |
343 | | - output_path = job.test_run.output_path |
344 | | - if not output_path: |
345 | | - raise ValueError("Output path not set") |
346 | | - |
347 | | - genai_perf_args = genai_perf_args_obj.model_dump() |
348 | | - args = [f"--artifact-dir={output_path.absolute()}"] |
349 | | - extra_args = None |
350 | | - |
351 | | - for k, v in genai_perf_args.items(): |
352 | | - if k == "extra-args": |
353 | | - extra_args = str(v) |
354 | | - else: |
355 | | - args.append(f"--{k}={v}") |
| 344 | + genai_perf_cmd = ["genai-perf", "profile", f"--artifact-dir={genai_perf_results_path}"] |
| 345 | + for k, v in tdef.cmd_args.genai_perf.model_dump( |
| 346 | + exclude={"extra_args", "extra-args"}, exclude_none=True |
| 347 | + ).items(): |
| 348 | + genai_perf_cmd.append(f"--{k}={v}") |
| 349 | + if extra_args := tdef.cmd_args.genai_perf.extra_args: |
| 350 | + genai_perf_cmd.extend(extra_args.split()) |
| 351 | + logging.debug(f"GenAI perf arguments: {genai_perf_cmd=}") |
356 | 352 |
357 | | - if extra_args: |
358 | | - args.append(extra_args) |
359 | | - args_str = " ".join(args) |
| 353 | + frontend_pod = self._get_frontend_pod_name() |
360 | 354 |
361 | | - venv_path = python_exec.venv_path.absolute() |
362 | | - cmd = f"{venv_path}/bin/genai-perf profile {args_str}" |
363 | | - logging.debug(f"Running GenAI performance test: {cmd}") |
364 | | - result: subprocess.CompletedProcess | None = None |
| 355 | + logging.debug(f"Executing genai-perf in pod={frontend_pod} cmd={genai_perf_cmd}") |
365 | 356 | try: |
366 | | - result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True) |
367 | | - logging.debug("GenAI performance test completed successfully") |
368 | | - except subprocess.CalledProcessError as e: |
369 | | - logging.error(f"GenAI performance test failed: {e.stderr}") |
370 | | - |
371 | | - if result: |
372 | | - with (job.test_run.output_path / "stdout.txt").open("w") as f: |
373 | | - f.write(result.stdout) |
374 | | - with (job.test_run.output_path / "stderr.txt").open("w") as f: |
375 | | - f.write(result.stderr) |
| 357 | + genai_results = lazy.k8s.stream.stream( |
| 358 | + self.core_v1.connect_get_namespaced_pod_exec, |
| 359 | + name=frontend_pod, |
| 360 | + namespace=self.default_namespace, |
| 361 | + command=genai_perf_cmd, |
| 362 | + stderr=True, |
| 363 | + stdin=False, |
| 364 | + stdout=True, |
| 365 | + tty=False, |
| 366 | + _request_timeout=60 * 10, |
| 367 | + ) |
| 368 | + with (job.test_run.output_path / "genai_perf.log").open("w") as f: |
| 369 | + f.write(genai_results) |
| 370 | + except lazy.k8s.client.ApiException as e: |
| 371 | + logging.error(f"Error executing genai-perf command in pod '{frontend_pod}': {e}") |
| 372 | + |
| 373 | + cp_logs_cmd = " ".join( |
| 374 | + [ |
| 375 | + "kubectl", |
| 376 | + "cp", |
| 377 | + f"{self.default_namespace}/{frontend_pod}:{genai_perf_results_path}", |
| 378 | + str(job.test_run.output_path / "genai-perf"), |
| 379 | + ] |
| 380 | + ) |
| 381 | + logging.debug(f"Copying genai-perf results with command: {cp_logs_cmd}") |
| 382 | + p = subprocess.run(cp_logs_cmd, shell=True, capture_output=True, text=True) |
| 383 | + logging.debug(f"Returned code {p.returncode}, stdout: {p.stdout}, stderr: {p.stderr}") |
376 | 384 |
377 | 385 | def _check_deployment_conditions(self, conditions: list) -> bool: |
378 | 386 | logging.debug(f"Checking deployment conditions: {conditions}") |
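
The genai-perf run itself is executed inside that pod through the kubernetes stream helper, which wraps connect_get_namespaced_pod_exec and, with the default _preload_content=True, returns the remote command's combined stdout/stderr as a single string. A minimal sketch of that pattern follows; the pod name, namespace, and command are placeholders, and only the ten-minute timeout mirrors the diff.

from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()
core_v1 = client.CoreV1Api()

# Runs the remote command to completion and returns its output as one string.
output = stream(
    core_v1.connect_get_namespaced_pod_exec,
    name="frontend-pod",                          # placeholder pod name
    namespace="default",                          # placeholder namespace
    command=["genai-perf", "profile", "--help"],  # placeholder command
    stderr=True,
    stdin=False,
    stdout=True,
    tty=False,
    _request_timeout=60 * 10,  # same ten-minute ceiling as in the diff
)
print(output)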
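
The result artifacts are then copied out of the pod with kubectl cp rather than through the API. A sketch of that step is below; the namespace, pod name, and local destination are placeholders, only the remote results directory matches the diff, and the argument list is passed without shell=True, a small deviation from the diff's shell invocation. Note that kubectl cp requires tar to be available inside the container.

import subprocess
from pathlib import Path

namespace, pod = "default", "frontend-pod"  # placeholder names
remote_dir = "/tmp/cloudai/genai-perf"      # results path used in the diff
local_dir = Path("output") / "genai-perf"   # placeholder destination
local_dir.parent.mkdir(parents=True, exist_ok=True)

# kubectl cp <namespace>/<pod>:<remote path> <local path>
cmd = ["kubectl", "cp", f"{namespace}/{pod}:{remote_dir}", str(local_dir)]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
    print(f"kubectl cp failed (rc={proc.returncode}): {proc.stderr}")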