4646from kubernetes import config as k8s_config
4747
4848from zenml .config .base_settings import BaseSettings
49+ from zenml .constants import (
50+ METADATA_ORCHESTRATOR_RUN_ID ,
51+ )
4952from zenml .enums import StackComponentType
5053from zenml .integrations .kubernetes .flavors .kubernetes_orchestrator_flavor import (
5154 KubernetesOrchestratorConfig ,
6265)
6366from zenml .integrations .kubernetes .pod_settings import KubernetesPodSettings
6467from zenml .logger import get_logger
68+ from zenml .metadata .metadata_types import MetadataType
6569from zenml .orchestrators import ContainerizedOrchestrator , SubmissionResult
6670from zenml .orchestrators .utils import get_orchestrator_run_name
6771from zenml .stack import StackValidator
@@ -468,9 +472,7 @@ def submit_pipeline(
468472 # This will internally also build the command/args for all step pods.
469473 command = KubernetesOrchestratorEntrypointConfiguration .get_entrypoint_command ()
470474 args = KubernetesOrchestratorEntrypointConfiguration .get_entrypoint_arguments (
471- run_name = orchestrator_run_name ,
472475 deployment_id = deployment .id ,
473- kubernetes_namespace = self .config .kubernetes_namespace ,
474476 run_id = placeholder_run .id if placeholder_run else None ,
475477 )
476478
@@ -509,6 +511,18 @@ def submit_pipeline(
509511 }
510512 )
511513
514+ orchestrator_pod_labels = {
515+ "pipeline" : kube_utils .sanitize_label (pipeline_name ),
516+ }
517+
518+ if placeholder_run :
519+ orchestrator_pod_labels ["run_id" ] = kube_utils .sanitize_label (
520+ str (placeholder_run .id )
521+ )
522+ orchestrator_pod_labels ["run_name" ] = kube_utils .sanitize_label (
523+ str (placeholder_run .name )
524+ )
525+
512526 # Schedule as CRON job if CRON schedule is given.
513527 if deployment .schedule :
514528 if not deployment .schedule .cron_expression :
@@ -526,9 +540,7 @@ def submit_pipeline(
526540
527541 cron_job_manifest , secret_manifests = build_cron_job_manifest (
528542 cron_expression = cron_expression ,
529- run_name = orchestrator_run_name ,
530543 pod_name = pod_name ,
531- pipeline_name = pipeline_name ,
532544 image_name = image ,
533545 command = command ,
534546 args = args ,
@@ -545,6 +557,7 @@ def submit_pipeline(
545557 core_api = self ._k8s_core_api ,
546558 registry_credentials = registry_credentials ,
547559 local_stores_path = local_stores_path ,
560+ labels = orchestrator_pod_labels ,
548561 )
549562
550563 # Create imagePullSecrets first
@@ -569,14 +582,14 @@ def submit_pipeline(
569582 pod_manifest , secret_manifests = build_pod_manifest (
570583 run_name = orchestrator_run_name ,
571584 pod_name = pod_name ,
572- pipeline_name = pipeline_name ,
573585 image_name = image ,
574586 command = command ,
575587 args = args ,
576588 privileged = False ,
577589 pod_settings = orchestrator_pod_settings ,
578590 service_account_name = service_account_name ,
579591 env = environment ,
592+ labels = orchestrator_pod_labels ,
580593 mount_local_stores = self .config .is_local ,
581594 namespace = self .config .kubernetes_namespace ,
582595 auto_generate_image_pull_secrets = settings .auto_generate_image_pull_secrets ,
@@ -600,6 +613,11 @@ def submit_pipeline(
600613 startup_timeout = settings .pod_startup_timeout ,
601614 )
602615
616+ metadata : Dict [str , MetadataType ] = {
617+ METADATA_ORCHESTRATOR_RUN_ID : pod_name ,
618+ }
619+
620+ # Wait for the orchestrator pod to finish and stream logs.
603621 if settings .synchronous :
604622
605623 def _wait_for_run_to_finish () -> None :
@@ -616,7 +634,8 @@ def _wait_for_run_to_finish() -> None:
616634 )
617635
618636 return SubmissionResult (
619- wait_for_completion = _wait_for_run_to_finish
637+ metadata = metadata ,
638+ wait_for_completion = _wait_for_run_to_finish ,
620639 )
621640 else :
622641 logger .info (
@@ -625,7 +644,9 @@ def _wait_for_run_to_finish() -> None:
625644 f"Run the following command to inspect the logs: "
626645 f"`kubectl logs { pod_name } -n { self .config .kubernetes_namespace } `."
627646 )
628- return None
647+ return SubmissionResult (
648+ metadata = metadata ,
649+ )
629650
630651 def _get_service_account_name (
631652 self , settings : KubernetesOrchestratorSettings
@@ -770,3 +791,18 @@ def get_kubernetes_image_pull_secret_data(
770791 f"Returning ImagePullSecret data: registry='{ registry_uri } ', username='{ username [:3 ]} ***'"
771792 )
772793 return registry_uri , username , password
794+
795+ def get_pipeline_run_metadata (
796+ self , run_id : UUID
797+ ) -> Dict [str , "MetadataType" ]:
798+ """Get general component-specific metadata for a pipeline run.
799+
800+ Args:
801+ run_id: The ID of the pipeline run.
802+
803+ Returns:
804+ A dictionary of metadata.
805+ """
806+ return {
807+ METADATA_ORCHESTRATOR_RUN_ID : self .get_orchestrator_run_id (),
808+ }
0 commit comments