4646from kubernetes import config as k8s_config
4747
4848from zenml .config .base_settings import BaseSettings
49+ from zenml .constants import (
50+ METADATA_ORCHESTRATOR_RUN_ID ,
51+ )
4952from zenml .enums import StackComponentType
5053from zenml .integrations .kubernetes .flavors .kubernetes_orchestrator_flavor import (
5154 KubernetesOrchestratorConfig ,
6164)
6265from zenml .integrations .kubernetes .pod_settings import KubernetesPodSettings
6366from zenml .logger import get_logger
67+ from zenml .metadata .metadata_types import MetadataType
6468from zenml .orchestrators import ContainerizedOrchestrator , SubmissionResult
6569from zenml .orchestrators .utils import get_orchestrator_run_name
6670from zenml .stack import StackValidator
@@ -467,9 +471,7 @@ def submit_pipeline(
467471 # This will internally also build the command/args for all step pods.
468472 command = KubernetesOrchestratorEntrypointConfiguration .get_entrypoint_command ()
469473 args = KubernetesOrchestratorEntrypointConfiguration .get_entrypoint_arguments (
470- run_name = orchestrator_run_name ,
471474 deployment_id = deployment .id ,
472- kubernetes_namespace = self .config .kubernetes_namespace ,
473475 run_id = placeholder_run .id if placeholder_run else None ,
474476 )
475477
@@ -508,6 +510,18 @@ def submit_pipeline(
508510 }
509511 )
510512
513+ orchestrator_pod_labels = {
514+ "pipeline" : kube_utils .sanitize_label (pipeline_name ),
515+ }
516+
517+ if placeholder_run :
518+ orchestrator_pod_labels ["run_id" ] = kube_utils .sanitize_label (
519+ str (placeholder_run .id )
520+ )
521+ orchestrator_pod_labels ["run_name" ] = kube_utils .sanitize_label (
522+ str (placeholder_run .name )
523+ )
524+
511525 # Schedule as CRON job if CRON schedule is given.
512526 if deployment .schedule :
513527 if not deployment .schedule .cron_expression :
@@ -519,9 +533,7 @@ def submit_pipeline(
519533 cron_expression = deployment .schedule .cron_expression
520534 cron_job_manifest = build_cron_job_manifest (
521535 cron_expression = cron_expression ,
522- run_name = orchestrator_run_name ,
523536 pod_name = pod_name ,
524- pipeline_name = pipeline_name ,
525537 image_name = image ,
526538 command = command ,
527539 args = args ,
@@ -533,6 +545,7 @@ def submit_pipeline(
533545 successful_jobs_history_limit = settings .successful_jobs_history_limit ,
534546 failed_jobs_history_limit = settings .failed_jobs_history_limit ,
535547 ttl_seconds_after_finished = settings .ttl_seconds_after_finished ,
548+ labels = orchestrator_pod_labels ,
536549 )
537550
538551 self ._k8s_batch_api .create_namespaced_cron_job (
@@ -547,16 +560,15 @@ def submit_pipeline(
547560 else :
548561 # Create and run the orchestrator pod.
549562 pod_manifest = build_pod_manifest (
550- run_name = orchestrator_run_name ,
551563 pod_name = pod_name ,
552- pipeline_name = pipeline_name ,
553564 image_name = image ,
554565 command = command ,
555566 args = args ,
556567 privileged = False ,
557568 pod_settings = orchestrator_pod_settings ,
558569 service_account_name = service_account_name ,
559570 env = environment ,
571+ labels = orchestrator_pod_labels ,
560572 mount_local_stores = self .config .is_local ,
561573 )
562574
@@ -572,6 +584,11 @@ def submit_pipeline(
572584 startup_timeout = settings .pod_startup_timeout ,
573585 )
574586
587+ metadata : Dict [str , MetadataType ] = {
588+ METADATA_ORCHESTRATOR_RUN_ID : pod_name ,
589+ }
590+
591+ # Wait for the orchestrator pod to finish and stream logs.
575592 if settings .synchronous :
576593
577594 def _wait_for_run_to_finish () -> None :
@@ -588,7 +605,8 @@ def _wait_for_run_to_finish() -> None:
588605 )
589606
590607 return SubmissionResult (
591- wait_for_completion = _wait_for_run_to_finish
608+ metadata = metadata ,
609+ wait_for_completion = _wait_for_run_to_finish ,
592610 )
593611 else :
594612 logger .info (
@@ -597,7 +615,9 @@ def _wait_for_run_to_finish() -> None:
597615 f"Run the following command to inspect the logs: "
598616 f"`kubectl logs { pod_name } -n { self .config .kubernetes_namespace } `."
599617 )
600- return None
618+ return SubmissionResult (
619+ metadata = metadata ,
620+ )
601621
602622 def _get_service_account_name (
603623 self , settings : KubernetesOrchestratorSettings
@@ -642,3 +662,18 @@ def get_orchestrator_run_id(self) -> str:
642662 "Unable to read run id from environment variable "
643663 f"{ ENV_ZENML_KUBERNETES_RUN_ID } ."
644664 )
665+
666+ def get_pipeline_run_metadata (
667+ self , run_id : UUID
668+ ) -> Dict [str , "MetadataType" ]:
669+ """Get general component-specific metadata for a pipeline run.
670+
671+ Args:
672+ run_id: The ID of the pipeline run.
673+
674+ Returns:
675+ A dictionary of metadata.
676+ """
677+ return {
678+ METADATA_ORCHESTRATOR_RUN_ID : self .get_orchestrator_run_id (),
679+ }
0 commit comments