diff --git a/docs/book/component-guide/orchestrators/kubernetes.md b/docs/book/component-guide/orchestrators/kubernetes.md index 0c88c890464..b209e4b1502 100644 --- a/docs/book/component-guide/orchestrators/kubernetes.md +++ b/docs/book/component-guide/orchestrators/kubernetes.md @@ -130,6 +130,7 @@ Some configuration options for the Kubernetes orchestrator can only be set throu - **`skip_local_validations`** (default: False): If `True`, skips the local validations that would otherwise be performed when `local` is set. - **`parallel_step_startup_waiting_period`**: How long (in seconds) to wait between starting parallel steps, useful for distributing server load in highly parallel pipelines. - **`pass_zenml_token_as_secret`** (default: False): By default, the Kubernetes orchestrator will pass a short-lived API token to authenticate to the ZenML server as an environment variable as part of the Pod manifest. If you want this token to be stored in a Kubernetes secret instead, set `pass_zenml_token_as_secret=True` when registering your orchestrator. If you do so, make sure the service connector that you configure for your orchestrator has permissions to create Kubernetes secrets. Additionally, the service account used for the Pods running your pipeline must have permissions to delete secrets, otherwise the cleanup will fail and you'll be left with orphaned secrets. +- **`auto_generate_image_pull_secrets`** (default: True): If `True`, automatically generates imagePullSecrets from container registry credentials in the stack. If `False`, relies on manually configured imagePullSecrets. When enabled, the service account used for running pipelines must have permissions to create, get, list, patch and delete secrets in the target namespace. 
The following configuration options can be set either through the orchestrator config or overridden using `KubernetesOrchestratorSettings` (at the pipeline or step level): @@ -464,6 +465,38 @@ kubectl logs job/ -n zenml Common issues include incorrect cron expressions, insufficient permissions for the service account, or resource constraints. +#### Required Kubernetes RBAC Permissions + +When using the automatic imagePullSecret generation feature (`auto_generate_image_pull_secrets=True`), the service account used by ZenML must have the following additional permissions in the target namespace: + +```yaml +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + namespace: zenml # or your configured namespace + name: zenml-image-pull-secret-manager +rules: +- apiGroups: [""] + resources: ["secrets"] + verbs: ["create", "get", "list", "update", "patch", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: zenml-image-pull-secret-binding + namespace: zenml # or your configured namespace +subjects: +- kind: ServiceAccount + name: zenml-service-account # or your configured service account + namespace: zenml +roleRef: + kind: Role + name: zenml-image-pull-secret-manager + apiGroup: rbac.authorization.k8s.io +``` + +If you're using a custom service account (via `service_account_name`), ensure it has these permissions. The default `edit` role granted to `zenml-service-account` includes these permissions. 
+ For a tutorial on how to work with schedules in ZenML, check out our ['Managing Scheduled Pipelines'](https://docs.zenml.io/user-guides/tutorial/managing-scheduled-pipelines) diff --git a/docs/book/component-guide/service-connectors/connector-types/docker-service-connector.md b/docs/book/component-guide/service-connectors/connector-types/docker-service-connector.md index 77172f989b0..cefd76fc490 100644 --- a/docs/book/component-guide/service-connectors/connector-types/docker-service-connector.md +++ b/docs/book/component-guide/service-connectors/connector-types/docker-service-connector.md @@ -92,8 +92,7 @@ The 'dockerhub' Docker Service Connector connector was used to successfully conf The Docker Service Connector can be used by all Container Registry stack component flavors to authenticate to a remote Docker/OCI container registry. This allows container images to be built and published to private container registries without the need to configure explicit Docker credentials in the target environment or the Stack Component. {% hint style="warning" %} -ZenML does not yet support automatically configuring Docker credentials in container runtimes such as Kubernetes clusters (i.e. via imagePullSecrets) to allow container images to be pulled from the private container registries. This will be added in a future release. +ZenML does not yet support automatically configuring Docker credentials in some container runtimes to allow container images to be pulled from private container registries. This will be added in a future release. In the meantime, please make sure your orchestration environment is configured appropriately to pull from the Docker registry. {% endhint %} -
ZenML Scarf
diff --git a/docs/book/component-guide/step-operators/kubernetes.md b/docs/book/component-guide/step-operators/kubernetes.md index 43c83e0bd6f..82ff957666c 100644 --- a/docs/book/component-guide/step-operators/kubernetes.md +++ b/docs/book/component-guide/step-operators/kubernetes.md @@ -126,6 +126,7 @@ The following configuration options can be set either through the step operator - **`pod_failure_max_retries`** (default: 3): The maximum number of times to retry a step pod if it fails to start. - **`pod_failure_retry_delay`** (default: 10): The delay (in seconds) between pod failure retries and pod startup retries. - **`pod_failure_backoff`** (default: 1.0): The backoff factor for pod failure retries and pod startup retries. +- **`auto_generate_image_pull_secrets`** (default: True): If `True`, automatically generates imagePullSecrets from container registry credentials in the stack. If `False`, relies on manually configured imagePullSecrets. When enabled, the service account used for running steps must have permissions to create and list secrets in the target namespace. ```python from zenml.integrations.kubernetes.flavors import KubernetesStepOperatorSettings diff --git a/src/zenml/container_registries/base_container_registry.py b/src/zenml/container_registries/base_container_registry.py index e7676b31b7b..1b92bf9aee5 100644 --- a/src/zenml/container_registries/base_container_registry.py +++ b/src/zenml/container_registries/base_container_registry.py @@ -94,16 +94,13 @@ def requires_authentication(self) -> bool: def credentials(self) -> Optional[Tuple[str, str]]: """Username and password to authenticate with this container registry. + Service connector credentials take precedence over direct authentication secrets. + Returns: Tuple with username and password if this container registry requires authentication, `None` otherwise. 
""" - secret = self.get_typed_authentication_secret( - expected_schema_type=BasicAuthSecretSchema - ) - if secret: - return secret.username, secret.password - + # Check service connector credentials first as they take precedence connector = self.get_connector() if connector: from zenml.service_connectors.docker_service_connector import ( @@ -116,6 +113,13 @@ def credentials(self) -> Optional[Tuple[str, str]]: connector.config.password.get_secret_value(), ) + # Fall back to direct authentication secrets + secret = self.get_typed_authentication_secret( + expected_schema_type=BasicAuthSecretSchema + ) + if secret: + return secret.username, secret.password + return None @property @@ -232,6 +236,46 @@ def get_image_repo_digest(self, image_name: str) -> Optional[str]: return cast(str, metadata.id.split(":")[-1]) + @property + def registry_server_uri(self) -> str: + """Get the normalized registry server URI. + + This property returns a normalized registry server URI suitable for + authentication and API access. Subclasses can override this property + to provide registry-specific normalization logic. + + Returns: + The normalized registry server URI. 
+ """ + registry_uri = self.config.uri + + # Check if there's a service connector with a different registry setting + connector = self.get_connector() + if connector: + from zenml.service_connectors.docker_service_connector import ( + DockerServiceConnector, + ) + + if ( + isinstance(connector, DockerServiceConnector) + and connector.config.registry + ): + # Use the service connector's registry setting + registry_uri = connector.config.registry + + # Normalize registry URI for consistency + if registry_uri.startswith("https://"): + registry_uri = registry_uri[8:] + elif registry_uri.startswith("http://"): + registry_uri = registry_uri[7:] + + # For generic registries, extract just the domain part for better image matching + if not registry_uri.startswith(("http://", "https://")): + domain = registry_uri.split("/")[0] + return f"https://{domain}" + + return registry_uri + class BaseContainerRegistryFlavor(Flavor): """Base flavor for container registries.""" diff --git a/src/zenml/container_registries/dockerhub_container_registry.py b/src/zenml/container_registries/dockerhub_container_registry.py index ddffacbdf26..3213d608313 100644 --- a/src/zenml/container_registries/dockerhub_container_registry.py +++ b/src/zenml/container_registries/dockerhub_container_registry.py @@ -13,16 +13,34 @@ # permissions and limitations under the License. """Implementation of a DockerHub Container Registry class.""" -from typing import Optional +from typing import Optional, Type from zenml.constants import DOCKER_REGISTRY_RESOURCE_TYPE from zenml.container_registries.base_container_registry import ( + BaseContainerRegistry, BaseContainerRegistryFlavor, ) from zenml.enums import ContainerRegistryFlavor from zenml.models import ServiceConnectorRequirements +class DockerHubContainerRegistry(BaseContainerRegistry): + """Container registry implementation for DockerHub.""" + + @property + def registry_server_uri(self) -> str: + """Get the DockerHub registry server URI. 
+ + DockerHub requires authentication against the specific + 'https://index.docker.io/v1/' endpoint regardless of how + the registry URI is configured. + + Returns: + The DockerHub registry server URI. + """ + return "https://index.docker.io/v1/" + + class DockerHubContainerRegistryFlavor(BaseContainerRegistryFlavor): """Class for DockerHub Container Registry.""" @@ -80,3 +98,12 @@ def logo_url(self) -> str: The flavor logo. """ return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/container_registry/docker.png" + + @property + def implementation_class(self) -> Type[DockerHubContainerRegistry]: + """Implementation class for DockerHub container registry. + + Returns: + The DockerHub container registry implementation class. + """ + return DockerHubContainerRegistry diff --git a/src/zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py b/src/zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py index 7f2bd57f155..2b4e9b3cfeb 100644 --- a/src/zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py +++ b/src/zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py @@ -69,6 +69,9 @@ class KubernetesOrchestratorSettings(BaseSettings): scheduling a pipeline. prevent_orchestrator_pod_caching: If `True`, the orchestrator pod will not try to compute cached steps before starting the step pods. + auto_generate_image_pull_secrets: If `True`, automatically generates + imagePullSecrets from container registry credentials in the stack. + If `False`, relies on manually configured imagePullSecrets. 
""" synchronous: bool = True @@ -88,6 +91,7 @@ class KubernetesOrchestratorSettings(BaseSettings): failed_jobs_history_limit: Optional[NonNegativeInt] = None ttl_seconds_after_finished: Optional[NonNegativeInt] = None prevent_orchestrator_pod_caching: bool = False + auto_generate_image_pull_secrets: bool = True class KubernetesOrchestratorConfig( diff --git a/src/zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py b/src/zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py index 7e28ad1d40a..a6cbe6fe589 100644 --- a/src/zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py +++ b/src/zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py @@ -43,6 +43,9 @@ class KubernetesStepOperatorSettings(BaseSettings): failure retries and pod startup retries (in seconds) pod_failure_backoff: The backoff factor for pod failure retries and pod startup retries. + auto_generate_image_pull_secrets: If `True`, automatically generates + imagePullSecrets from container registry credentials in the stack. + If `False`, relies on manually configured imagePullSecrets. """ pod_settings: Optional[KubernetesPodSettings] = None @@ -52,6 +55,7 @@ class KubernetesStepOperatorSettings(BaseSettings): pod_failure_max_retries: int = 3 pod_failure_retry_delay: int = 10 pod_failure_backoff: float = 1.0 + auto_generate_image_pull_secrets: bool = True class KubernetesStepOperatorConfig( diff --git a/src/zenml/integrations/kubernetes/orchestrators/kube_utils.py b/src/zenml/integrations/kubernetes/orchestrators/kube_utils.py index ca3193c5af0..541325e623e 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kube_utils.py +++ b/src/zenml/integrations/kubernetes/orchestrators/kube_utils.py @@ -386,6 +386,7 @@ def create_secret( namespace: str, secret_name: str, data: Dict[str, Optional[str]], + secret_type: str = "Opaque", ) -> None: """Create a Kubernetes secret. 
@@ -394,10 +395,13 @@ def create_secret( namespace: The namespace in which to create the secret. secret_name: The name of the secret to create. data: The secret data. + secret_type: The secret type. """ core_api.create_namespaced_secret( namespace=namespace, - body=build_secret_manifest(name=secret_name, data=data), + body=build_secret_manifest( + name=secret_name, data=data, secret_type=secret_type + ), ) @@ -416,10 +420,20 @@ def update_secret( data: The secret data. If the value is None, the key will be removed from the secret. """ + # For updates, we only patch the data field, not the type (which is immutable) + import base64 + + encoded_data = { + key: base64.b64encode(value.encode()).decode() if value else None + for key, value in data.items() + } + + patch_body = {"data": encoded_data} + core_api.patch_namespaced_secret( namespace=namespace, name=secret_name, - body=build_secret_manifest(name=secret_name, data=data), + body=patch_body, ) @@ -428,6 +442,7 @@ def create_or_update_secret( namespace: str, secret_name: str, data: Dict[str, Optional[str]], + secret_type: str = "Opaque", ) -> None: """Create a Kubernetes secret if it doesn't exist, or update it if it does. @@ -437,17 +452,85 @@ def create_or_update_secret( secret_name: The name of the secret to create or update. data: The secret data. If the value is None, the key will be removed from the secret. + secret_type: The secret type. Raises: ApiException: If the secret creation failed for any reason other than the secret already existing. 
""" try: - create_secret(core_api, namespace, secret_name, data) + create_secret(core_api, namespace, secret_name, data, secret_type) except ApiException as e: if e.status != 409: raise - update_secret(core_api, namespace, secret_name, data) + + # Check if the existing secret has a different type + try: + existing_secret = core_api.read_namespaced_secret( + name=secret_name, namespace=namespace + ) + if existing_secret.type != secret_type: + # Delete the secret if the type is different (type is immutable) + delete_secret(core_api, namespace, secret_name) + create_secret( + core_api, namespace, secret_name, data, secret_type + ) + else: + # Same type, just update the data + update_secret(core_api, namespace, secret_name, data) + except ApiException: + # If we can't read the existing secret, try to update anyway + update_secret(core_api, namespace, secret_name, data) + + +def create_or_update_secret_from_manifest( + core_api: k8s_client.CoreV1Api, + secret_manifest: Dict[str, Any], +) -> None: + """Create or update a Kubernetes secret from a complete manifest. + + Args: + core_api: Client of Core V1 API of Kubernetes API. + secret_manifest: Complete Kubernetes secret manifest dict. + + Raises: + ApiException: If the secret creation failed for any reason other than + the secret already existing. 
+ """ + namespace = secret_manifest["metadata"]["namespace"] + secret_name = secret_manifest["metadata"]["name"] + + # Extract data from manifest - handle both 'data' and 'stringData' fields + secret_data = {} + + # Handle base64-encoded 'data' field + if "data" in secret_manifest: + import base64 + + for key, encoded_value in secret_manifest["data"].items(): + if encoded_value is not None: + # Decode base64 data back to string + secret_data[key] = base64.b64decode(encoded_value).decode( + "utf-8" + ) + else: + secret_data[key] = None + + # Handle plain text 'stringData' field + if "stringData" in secret_manifest: + secret_data.update(secret_manifest["stringData"]) + + # Extract secret type from manifest, default to "Opaque" if not specified + secret_type = secret_manifest.get("type", "Opaque") + + # Use the existing create_or_update_secret function + create_or_update_secret( + core_api=core_api, + namespace=namespace, + secret_name=secret_name, + data=secret_data, + secret_type=secret_type, + ) def delete_secret( diff --git a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py index c3e008fbe1b..28d5672f603 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py +++ b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py @@ -61,6 +61,7 @@ from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( build_cron_job_manifest, build_pod_manifest, + create_image_pull_secrets_from_manifests, ) from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings from zenml.logger import get_logger @@ -531,7 +532,13 @@ def submit_pipeline( "schedule. Use `Schedule(cron_expression=...)` instead." 
) cron_expression = deployment.schedule.cron_expression - cron_job_manifest = build_cron_job_manifest( + + # Prepare dependencies for pod creation + registry_credentials, local_stores_path = ( + self._prepare_pod_dependencies(settings, stack) + ) + + cron_job_manifest, secret_manifests = build_cron_job_manifest( cron_expression=cron_expression, pod_name=pod_name, image_name=image, @@ -545,9 +552,17 @@ def submit_pipeline( successful_jobs_history_limit=settings.successful_jobs_history_limit, failed_jobs_history_limit=settings.failed_jobs_history_limit, ttl_seconds_after_finished=settings.ttl_seconds_after_finished, + namespace=self.config.kubernetes_namespace, + auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets, + core_api=self._k8s_core_api, + registry_credentials=registry_credentials, + local_stores_path=local_stores_path, labels=orchestrator_pod_labels, ) + # Create imagePullSecrets first + self._create_image_pull_secrets(secret_manifests) + self._k8s_batch_api.create_namespaced_cron_job( body=cron_job_manifest, namespace=self.config.kubernetes_namespace, @@ -558,8 +573,13 @@ def submit_pipeline( ) return None else: + # Prepare dependencies for pod creation + registry_credentials, local_stores_path = ( + self._prepare_pod_dependencies(settings, stack) + ) + # Create and run the orchestrator pod. 
- pod_manifest = build_pod_manifest( + pod_manifest, secret_manifests = build_pod_manifest( pod_name=pod_name, image_name=image, command=command, @@ -570,8 +590,16 @@ def submit_pipeline( env=environment, labels=orchestrator_pod_labels, mount_local_stores=self.config.is_local, + namespace=self.config.kubernetes_namespace, + auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets, + core_api=self._k8s_core_api, + registry_credentials=registry_credentials, + local_stores_path=local_stores_path, ) + # Create imagePullSecrets first + self._create_image_pull_secrets(secret_manifests) + kube_utils.create_and_wait_for_pod_to_start( core_api=self._k8s_core_api, pod_display_name="Kubernetes orchestrator pod", @@ -645,6 +673,49 @@ def _get_service_account_name( ) return service_account_name + def _prepare_pod_dependencies( + self, settings: KubernetesOrchestratorSettings, stack: "Stack" + ) -> Tuple[Optional[Tuple[str, str, str]], Optional[str]]: + """Prepare dependencies needed for pod manifest creation. + + Args: + settings: The orchestrator settings. + stack: The stack the pipeline will run on. + + Returns: + Tuple of (registry_credentials, local_stores_path). + """ + # Get registry credentials if auto-generation is enabled + registry_credentials = None + if settings.auto_generate_image_pull_secrets: + registry_credentials = self.get_kubernetes_image_pull_secret_data( + stack.container_registry + ) + + # Get local stores path if mounting local stores + local_stores_path = None + if self.config.is_local: + from zenml.config.global_config import GlobalConfiguration + + stack.check_local_paths() + local_stores_path = GlobalConfiguration().local_stores_path + + return registry_credentials, local_stores_path + + def _create_image_pull_secrets( + self, + secret_manifests: List[Dict[str, str]], + ) -> None: + """Create imagePullSecrets in the cluster. + + Args: + secret_manifests: List of secret manifests for imagePullSecrets. 
+ """ + create_image_pull_secrets_from_manifests( + secret_manifests=secret_manifests, + core_api=self._k8s_core_api, + ) + def get_orchestrator_run_id(self) -> str: """Returns the active orchestrator run id. @@ -663,6 +734,61 @@ def get_orchestrator_run_id(self) -> str: f"{ENV_ZENML_KUBERNETES_RUN_ID}." ) + def get_kubernetes_image_pull_secret_data( + self, container_registry + ) -> Optional[Tuple[str, str, str]]: + """Get container registry credentials for Kubernetes imagePullSecrets. + + This method extracts credentials from the container registry for use + in Kubernetes imagePullSecrets. + + Args: + container_registry: The container registry to get credentials from. + + Returns: + Tuple of (registry_uri, username, password) if credentials are available, + None otherwise. The registry_uri is normalized for use in Kubernetes + imagePullSecrets. + """ + from zenml.logger import get_logger + + logger = get_logger(__name__) + + # Check if this is a local Kubernetes orchestrator + if self.config.is_local: + logger.debug( + "Skipping ImagePullSecret generation for local Kubernetes orchestrator" + ) + return None + + logger.debug( + f"Getting ImagePullSecret data for registry: {container_registry.config.uri}" + ) + + credentials = container_registry.credentials + if not credentials: + logger.debug("No credentials found for container registry") + return None + + username, password = credentials + registry_uri = container_registry.registry_server_uri + + logger.debug( + f"Found credentials - username: {username[:3]}***, registry_uri: {registry_uri}" + ) + + # Validate the final result + if not registry_uri or not username or not password: + logger.warning( + f"Invalid ImagePullSecret data: registry_uri='{registry_uri}', username='{username}', password_length={len(password) if password else 0}" + ) + return None + + logger.debug( + f"Returning ImagePullSecret data: registry='{registry_uri}', username='{username[:3]}***'" + ) + return registry_uri, username, password + def 
get_pipeline_run_metadata( self, run_id: UUID ) -> Dict[str, "MetadataType"]: diff --git a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py index a6f6b99594c..408cfc5e1d8 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py +++ b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py @@ -36,6 +36,8 @@ ) from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( build_pod_manifest, + cleanup_old_image_pull_secrets, + create_image_pull_secrets_from_manifests, ) from zenml.logger import get_logger from zenml.orchestrators import publish_utils @@ -235,8 +237,19 @@ def run_step_on_kubernetes(step_name: str) -> None: } ) - # Define Kubernetes pod manifest. - pod_manifest = build_pod_manifest( + logger.info(f"Container registry: {active_stack.container_registry}") + + # Get registry credentials for imagePullSecrets + registry_credentials = None + if pipeline_settings.auto_generate_image_pull_secrets: + registry_credentials = ( + orchestrator.get_kubernetes_image_pull_secret_data( + active_stack.container_registry + ) + ) + + # Define Kubernetes pod manifest and any required secrets. 
+ pod_manifest, secret_manifests = build_pod_manifest( pod_name=pod_name, image_name=image, command=step_command, @@ -248,9 +261,20 @@ def run_step_on_kubernetes(step_name: str) -> None: or settings.service_account_name, mount_local_stores=mount_local_stores, owner_references=owner_references, + namespace=namespace, + auto_generate_image_pull_secrets=pipeline_settings.auto_generate_image_pull_secrets, + registry_credentials=registry_credentials, + core_api=core_api, labels=step_pod_labels, ) + # Step pods should reuse secrets created by the orchestrator pod + # Only create secrets if they don't already exist + create_image_pull_secrets_from_manifests( + secret_manifests=secret_manifests, + core_api=core_api, + ) + kube_utils.create_and_wait_for_pod_to_start( core_api=core_api, pod_display_name=f"pod for step `{step_name}`", @@ -363,6 +387,16 @@ def finalize_run(node_states: Dict[str, NodeStatus]) -> None: except k8s_client.rest.ApiException as e: logger.error(f"Error cleaning up secret {secret_name}: {e}") + # Clean up old imagePullSecrets to prevent accumulation + # Only clean up for non-scheduled runs to avoid interfering with running schedules + if deployment.schedule is None: + logger.info("Cleaning up old imagePullSecrets...") + cleanup_old_image_pull_secrets( + core_api=core_api, + namespace=namespace, + max_age_hours=24, # Keep secrets for 24 hours + ) + if __name__ == "__main__": main() diff --git a/src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py b/src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py index b33be6a17d6..6dcfdbb36d9 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py +++ b/src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py @@ -14,44 +14,317 @@ """Utility functions for building manifests for k8s pods.""" import base64 +import json import os import sys -from typing import Any, Dict, List, Mapping, Optional +import time +from typing import Any, Dict, List, Mapping, 
Optional, Tuple from kubernetes import client as k8s_client -from zenml.client import Client -from zenml.config.global_config import GlobalConfiguration from zenml.constants import ENV_ZENML_ENABLE_REPO_INIT_WARNINGS from zenml.integrations.airflow.orchestrators.dag_generator import ( ENV_ZENML_LOCAL_STORES_PATH, ) +from zenml.integrations.kubernetes.orchestrators import kube_utils from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings from zenml.logger import get_logger logger = get_logger(__name__) +def _create_image_pull_secret_data( + registry_uri: str, username: str, password: str +) -> Dict[str, str]: + """Create Docker registry authentication data for imagePullSecrets. + + Args: + registry_uri: The normalized registry server URI. + username: The registry username. + password: The registry password or token. + + Returns: + Dictionary containing the base64-encoded .dockerconfigjson data. + """ + # Create Docker config JSON - credentials are safely encoded in base64 + # This handles special characters that might cause issues in CLI usage + docker_config = { + "auths": { + registry_uri: { + "username": username, + "password": password, + "auth": base64.b64encode( + f"{username}:{password}".encode() + ).decode(), + } + } + } + + return { + ".dockerconfigjson": base64.b64encode( + json.dumps(docker_config).encode() + ).decode() + } + + +def _should_refresh_image_pull_secret( + secret_name: str, namespace: str, core_api +) -> bool: + """Check if an existing imagePullSecret needs to be refreshed. + + Args: + secret_name: Name of the secret to check. + namespace: Kubernetes namespace. + core_api: Kubernetes Core API client. + + Returns: + True if the secret should be refreshed, False otherwise. 
+ """ + try: + secret = core_api.read_namespaced_secret( + name=secret_name, namespace=namespace + ) + + # Check if secret has refresh annotations + annotations = secret.metadata.annotations or {} + refresh_after_str = annotations.get("zenml.io/refresh-after") + + if not refresh_after_str: + # No refresh annotation, assume it needs refresh + return True + + refresh_after = int(refresh_after_str) + current_time = int(time.time()) + + # Refresh if current time is past the refresh time + return current_time >= refresh_after + + except Exception: + # Secret doesn't exist or can't be read, needs creation + return True + + +def _generate_image_pull_secrets( + namespace: str = "default", + registry_credentials: Optional[Tuple[str, str, str]] = None, + force_refresh: bool = False, + core_api=None, +) -> Tuple[List[Dict[str, Any]], List[k8s_client.V1LocalObjectReference]]: + """Generate Kubernetes secrets and references for container registry credentials. + + Args: + namespace: The Kubernetes namespace to create secrets in. + registry_credentials: Tuple of (registry_uri, username, password). + force_refresh: If True, forces regeneration of secrets even if they exist. + core_api: Optional Kubernetes Core API client for checking existing secrets. 
+ + Returns: + Tuple of (secret_manifests, local_object_references) where: + - secret_manifests: List of Kubernetes secret manifests to create + - local_object_references: List of V1LocalObjectReference objects for imagePullSecrets + """ + if not registry_credentials: + return [], [] + + credentials = [registry_credentials] + + secret_manifests = [] + local_object_references = [] + + # Check if credentials need refresh (for service connectors) + needs_refresh = force_refresh + + for i, (registry_uri, username, password) in enumerate(credentials): + # Create a unique secret name for this registry + # Use registry URI to make it more descriptive and avoid conflicts + # Remove protocol and normalize for Kubernetes naming rules + safe_registry_name = registry_uri + # Remove protocol prefixes + if safe_registry_name.startswith("https://"): + safe_registry_name = safe_registry_name[8:] + elif safe_registry_name.startswith("http://"): + safe_registry_name = safe_registry_name[7:] + + # Replace invalid characters for Kubernetes names + safe_registry_name = ( + safe_registry_name.replace(".", "-") + .replace("/", "-") + .replace(":", "-") + .replace("_", "-") + .lower() + ) + + # Ensure it starts and ends with alphanumeric character + safe_registry_name = safe_registry_name.strip("-") + if not safe_registry_name: + safe_registry_name = "registry" + + secret_name = f"zenml-registry-{safe_registry_name}-{i}"[ + :63 + ] # K8s name limit + + # Final validation: ensure name is valid for Kubernetes + # Must start and end with alphanumeric character + if not secret_name[0].isalnum(): + secret_name = f"r{secret_name[1:]}" + if not secret_name[-1].isalnum(): + secret_name = f"{secret_name[:-1]}r" + + # Check if secret needs refresh + should_refresh = needs_refresh or ( + core_api + and _should_refresh_image_pull_secret( + secret_name, namespace, core_api + ) + ) + + # Always add to local object references for pod spec + local_object_references.append( + 
k8s_client.V1LocalObjectReference(name=secret_name) + ) + + # Only include in manifests if it needs to be created/updated + if not should_refresh: + continue + + # Create the secret data + secret_data = _create_image_pull_secret_data( + registry_uri, username, password + ) + + # Add metadata for credential refresh tracking + current_time = int(time.time()) + # Default refresh interval: 1 hour (3600 seconds) + # This is conservative for most cloud providers that have longer-lived tokens + refresh_interval = 3600 + + # Create the secret manifest + secret_manifest = { + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": secret_name, + "namespace": namespace, + "labels": { + "app.kubernetes.io/name": "zenml", + "app.kubernetes.io/component": "image-pull-secret", + "app.kubernetes.io/managed-by": "zenml", + }, + "annotations": { + "zenml.io/created-at": str(current_time), + "zenml.io/refresh-after": str( + current_time + refresh_interval + ), + "zenml.io/registry-uri": registry_uri, + }, + }, + "type": "kubernetes.io/dockerconfigjson", + "data": secret_data, + } + + secret_manifests.append(secret_manifest) + + return secret_manifests, local_object_references + + +def create_image_pull_secrets_from_manifests( + secret_manifests: List[Dict[str, Any]], + core_api, +) -> None: + """Create imagePullSecrets from manifests. + + Args: + secret_manifests: List of secret manifests to create. + core_api: Kubernetes Core API client. 
+ """ + for secret_manifest in secret_manifests: + secret_name = secret_manifest["metadata"]["name"] + + try: + kube_utils.create_or_update_secret_from_manifest( + core_api=core_api, + secret_manifest=secret_manifest, + ) + logger.debug( + f"Successfully created/updated imagePullSecret {secret_name}" + ) + except Exception as e: + logger.error( + f"Failed to create imagePullSecret {secret_name}: {e}" + ) + raise + + +def cleanup_old_image_pull_secrets( + core_api, + namespace: str = "default", + max_age_hours: int = 24, +) -> None: + """Clean up old ZenML-managed imagePullSecrets. + + Args: + core_api: Kubernetes Core API client. + namespace: The Kubernetes namespace to clean up secrets in. + max_age_hours: Maximum age in hours for secrets to be kept. + Secrets older than this will be deleted. + """ + try: + # List all secrets in the namespace with ZenML labels + secret_list = core_api.list_namespaced_secret( + namespace=namespace, + label_selector="app.kubernetes.io/managed-by=zenml,app.kubernetes.io/component=image-pull-secret", + ) + + current_time = int(time.time()) + max_age_seconds = max_age_hours * 3600 + + for secret in secret_list.items: + try: + # Check if secret has creation time annotation + annotations = secret.metadata.annotations or {} + created_at_str = annotations.get("zenml.io/created-at") + + if not created_at_str: + # No creation time annotation, skip + continue + + created_at = int(created_at_str) + age_seconds = current_time - created_at + + # Delete if older than max age + if age_seconds > max_age_seconds: + logger.info( + f"Cleaning up old imagePullSecret {secret.metadata.name} " + f"(age: {age_seconds // 3600}h)" + ) + core_api.delete_namespaced_secret( + name=secret.metadata.name, + namespace=namespace, + ) + except Exception as e: + logger.warning( + f"Failed to process secret {secret.metadata.name} for cleanup: {e}" + ) + + except Exception as e: + logger.warning(f"Failed to cleanup old imagePullSecrets: {e}") + + def 
add_local_stores_mount( pod_spec: k8s_client.V1PodSpec, + local_stores_path: str, ) -> None: """Makes changes in place to the configuration of the pod spec. - Configures mounted volumes for stack components that write to a local - path. + Configures mounted volumes for local storage paths. Args: pod_spec: The pod spec to update. + local_stores_path: The local stores path to mount. """ assert len(pod_spec.containers) == 1 container_spec: k8s_client.V1Container = pod_spec.containers[0] - stack = Client().active_stack - - stack.check_local_paths() - - local_stores_path = GlobalConfiguration().local_stores_path - host_path = k8s_client.V1HostPathVolumeSource( path=local_stores_path, type="Directory" ) @@ -106,7 +379,12 @@ def build_pod_manifest( labels: Optional[Dict[str, str]] = None, mount_local_stores: bool = False, owner_references: Optional[List[k8s_client.V1OwnerReference]] = None, -) -> k8s_client.V1Pod: + auto_generate_image_pull_secrets: bool = True, + namespace: str = "default", + core_api=None, + registry_credentials: Optional[Tuple[str, str, str]] = None, + local_stores_path: Optional[str] = None, +) -> Tuple[k8s_client.V1Pod, List[Dict[str, Any]]]: """Build a Kubernetes pod manifest for a ZenML run or step. Args: @@ -124,9 +402,18 @@ def build_pod_manifest( mount_local_stores: Whether to mount the local stores path inside the pod. owner_references: List of owner references for the pod. + auto_generate_image_pull_secrets: Whether to automatically generate + imagePullSecrets from registry credentials. + namespace: The Kubernetes namespace to create secrets in. + core_api: Optional Kubernetes Core API client for checking existing secrets. + registry_credentials: Optional tuple of (registry_uri, username, password) + for generating imagePullSecrets. + local_stores_path: Optional local stores path to mount when mount_local_stores is True. Returns: - Pod manifest. 
+ Tuple of (pod_manifest, secret_manifests) where: + - pod_manifest: The Kubernetes pod manifest + - secret_manifests: List of secret manifests for imagePullSecrets """ env = env.copy() if env else {} env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False") @@ -143,12 +430,37 @@ def build_pod_manifest( ], security_context=security_context, ) + # Handle imagePullSecrets - combine manual and auto-generated + # This maintains backward compatibility by preserving existing manual configurations + # while adding automatic registry authentication when available. image_pull_secrets = [] - if pod_settings: - image_pull_secrets = [ - k8s_client.V1LocalObjectReference(name=name) - for name in pod_settings.image_pull_secrets - ] + secret_manifests = [] + + # Add manually configured imagePullSecrets from pod_settings first + # This ensures existing configurations continue to work unchanged + if pod_settings and pod_settings.image_pull_secrets: + image_pull_secrets.extend( + [ + k8s_client.V1LocalObjectReference(name=name) + for name in pod_settings.image_pull_secrets + ] + ) + + # Auto-generate imagePullSecrets from container registry credentials + if auto_generate_image_pull_secrets and registry_credentials: + try: + generated_secrets, generated_refs = _generate_image_pull_secrets( + namespace=namespace, + registry_credentials=registry_credentials, + core_api=core_api, + ) + secret_manifests.extend(generated_secrets) + image_pull_secrets.extend(generated_refs) + except Exception as e: + logger.warning( + f"Failed to auto-generate imagePullSecrets from container " + f"registry credentials: {e}. Falling back to manual configuration." 
+ ) pod_spec = k8s_client.V1PodSpec( containers=[container_spec], @@ -184,10 +496,10 @@ def build_pod_manifest( spec=pod_spec, ) - if mount_local_stores: - add_local_stores_mount(pod_spec) + if mount_local_stores and local_stores_path: + add_local_stores_mount(pod_spec, local_stores_path) - return pod_manifest + return pod_manifest, secret_manifests def add_pod_settings( @@ -273,7 +585,12 @@ def build_cron_job_manifest( successful_jobs_history_limit: Optional[int] = None, failed_jobs_history_limit: Optional[int] = None, ttl_seconds_after_finished: Optional[int] = None, -) -> k8s_client.V1CronJob: + auto_generate_image_pull_secrets: bool = True, + namespace: str = "default", + core_api=None, + registry_credentials: Optional[Tuple[str, str, str]] = None, + local_stores_path: Optional[str] = None, +) -> Tuple[k8s_client.V1CronJob, List[Dict[str, Any]]]: """Create a manifest for launching a pod as scheduled CRON job. Args: @@ -295,11 +612,20 @@ def build_cron_job_manifest( failed_jobs_history_limit: The number of failed jobs to retain. ttl_seconds_after_finished: The amount of seconds to keep finished jobs before deleting them. + auto_generate_image_pull_secrets: Whether to automatically generate + imagePullSecrets from registry credentials. + namespace: The Kubernetes namespace to create secrets in. + core_api: Optional Kubernetes Core API client for checking existing secrets. + registry_credentials: Optional tuple of (registry_uri, username, password) + for generating imagePullSecrets. + local_stores_path: Optional local stores path to mount when mount_local_stores is True. Returns: - CRON job manifest. 
+ Tuple of (cron_job_manifest, secret_manifests) where: + - cron_job_manifest: The Kubernetes CronJob manifest + - secret_manifests: List of secret manifests for imagePullSecrets """ - pod_manifest = build_pod_manifest( + pod_manifest, secret_manifests = build_pod_manifest( pod_name=pod_name, image_name=image_name, command=command, @@ -310,6 +636,11 @@ def build_cron_job_manifest( env=env, labels=labels, mount_local_stores=mount_local_stores, + auto_generate_image_pull_secrets=auto_generate_image_pull_secrets, + namespace=namespace, + core_api=core_api, + registry_credentials=registry_credentials, + local_stores_path=local_stores_path, ) job_spec = k8s_client.V1CronJobSpec( @@ -335,7 +666,7 @@ def build_cron_job_manifest( spec=job_spec, ) - return job_manifest + return job_manifest, secret_manifests def build_role_binding_manifest_for_service_account( diff --git a/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py b/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py index 76c978aecf3..b48f362fd44 100644 --- a/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py +++ b/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py @@ -27,6 +27,7 @@ from zenml.integrations.kubernetes.orchestrators import kube_utils from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( build_pod_manifest, + create_image_pull_secrets_from_manifests, ) from zenml.logger import get_logger from zenml.stack import Stack, StackValidator @@ -204,7 +205,8 @@ def launch( args = entrypoint_command[3:] # Create and run the orchestrator pod. 
- pod_manifest = build_pod_manifest( + pod_manifest, secret_manifests = build_pod_manifest( + run_name=info.run_name, pod_name=pod_name, image_name=image_name, command=command, @@ -214,6 +216,17 @@ def launch( pod_settings=settings.pod_settings, env=environment, mount_local_stores=False, + namespace=self.config.kubernetes_namespace, + auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets, + core_api=self._k8s_core_api, + ) + + # Create imagePullSecrets, reusing existing ones + create_image_pull_secrets_from_manifests( + secret_manifests=secret_manifests, + core_api=self._k8s_core_api, + namespace=self.config.kubernetes_namespace, + reuse_existing=True, # Step operator reuses orchestrator-created secrets labels={ "run_id": kube_utils.sanitize_label(str(info.run_id)), "pipeline": kube_utils.sanitize_label(info.pipeline.name),