diff --git a/docs/book/component-guide/deployers/README.md b/docs/book/component-guide/deployers/README.md index 173423301b3..073f5be9fe7 100644 --- a/docs/book/component-guide/deployers/README.md +++ b/docs/book/component-guide/deployers/README.md @@ -31,8 +31,9 @@ Out of the box, ZenML comes with a `local` deployer already part of the default |------------------------------------|-----------|---------------|------------------------------------------------------------------------------| | [Local](local.md) | `local` | _built-in_ | This is the default Deployer. It deploys pipelines on your local machine in the form of background processes. Should be used only for running ZenML locally. | | [Docker](docker.md) | `docker` | Built-in | Deploys pipelines as locally running Docker containers | -| [GCP Cloud Run](gcp-cloud-run.md) | `gcp` | `gcp` | Deploys pipelines to Google Cloud Run for serverless execution | -| [AWS App Runner](aws-app-runner.md) | `aws` | `aws` | Deploys pipelines to AWS App Runner for serverless execution | +| [Kubernetes](kubernetes.md) | `kubernetes` | `kubernetes` | Deploys pipelines to a Kubernetes cluster with customizable services, replicas, and pod settings | +| [GCP Cloud Run](gcp-cloud-run.md) | `gcp` | `gcp` | Deploys pipelines to Google Cloud Run for serverless execution | +| [AWS App Runner](aws-app-runner.md)| `aws` | `aws` | Deploys pipelines to AWS App Runner for serverless execution | If you would like to see the available flavors of deployers, you can use the command: diff --git a/docs/book/component-guide/deployers/kubernetes.md b/docs/book/component-guide/deployers/kubernetes.md new file mode 100644 index 00000000000..55ec7caae43 --- /dev/null +++ b/docs/book/component-guide/deployers/kubernetes.md @@ -0,0 +1,433 @@ +--- +description: Deploy ZenML pipeline services on Kubernetes clusters. +--- + +# Kubernetes Deployer + +The Kubernetes deployer is a [deployer](./) flavor provided by the Kubernetes +integration. 
It provisions ZenML pipeline deployments
+inside a Kubernetes cluster.
+
+{% hint style="warning" %}
+This component is only meant to be used within the context of a [remote ZenML installation](https://docs.zenml.io/getting-started/deploying-zenml/). Usage with a local ZenML setup may lead to unexpected behavior!
+{% endhint %}
+
+## When to use it
+
+Reach for the Kubernetes deployer when you need:
+
+* **Native Kubernetes deployments** because you're already using Kubernetes.
+* **Production-grade serving** with managed Kubernetes clusters (EKS, GKE, AKS,
+  etc.) or self-hosted clusters.
+* **Multi-replica deployments** and load-balanced access to your pipeline
+  service.
+* **Fine-grained pod customization** (resources, tolerations, secrets,
+  affinity, custom command/args, probes).
+* **Cluster networking integrations** (e.g., exposing services via
+  LoadBalancer, NodePort, or internal ClusterIP plus your own Ingress/mesh).
+
+If you only need local development or single-node deployments, consider the
+[local](local.md) or [docker](docker.md) deployer flavors instead.
+
+## How to deploy it
+
+1. Install the Kubernetes integration:
+
+   ```bash
+   zenml integration install kubernetes
+   ```
+
+2. Ensure your stack contains a container registry and an image builder that the
+   Kubernetes cluster can access. Remote clusters require remote registries.
+
+3. Configure access to your cluster. You can either:
+
+   - Provide a kubeconfig context via `kubernetes_context`.
+   - Run the client inside the cluster (`incluster=True`).
+   - Link a [Kubernetes service connector](../../service-connectors/connector-types/kubernetes-cluster.md).
+
+4. 
Register and activate the deployer:
+
+   ```bash
+   zenml deployer register k8s-deployer \
+     --flavor=kubernetes \
+     --kubernetes_namespace=zenml-deployments
+
+   zenml stack register prod-stack \
+     -o default -a default \
+     -c dockerhub \
+     -D k8s-deployer \
+     --set
+   ```
+
+   If you use a service connector:
+
+   ```bash
+   zenml service-connector register k8s-connector \
+     --type kubernetes-cluster \
+     --use-kubeconfig
+
+   zenml deployer connect k8s-deployer --connector k8s-connector
+   ```
+
+## How to use it
+
+Once the deployer is part of your active stack you can deploy pipelines or
+snapshots exactly like with other flavors:
+
+```bash
+zenml pipeline deploy my_module.my_pipeline --deployment k8s-example
+zenml deployment invoke k8s-example --json '{"parameters": {"name": "Ada"}}'
+```
+
+You can also configure deployer-specific settings directly on the pipeline:
+
+```python
+from zenml import pipeline, step
+from zenml.integrations.kubernetes.deployers import KubernetesDeployerSettings
+
+
+@step
+def greet(name: str) -> str:
+    return f"Hello {name}!"
+
+
+@pipeline(
+    settings={
+        "deployer": KubernetesDeployerSettings(
+            namespace="team-namespace",
+            replicas=2,
+            service_type="ClusterIP",
+            service_annotations={"alb.ingress.kubernetes.io/scheme": "internet-facing"},
+        )
+    }
+)
+def greeting_pipeline(name: str = "ZenML") -> str:
+    return greet(name=name)
+```
+
+## Configuration reference
+
+The deployer combines two configuration layers:
+
+- `KubernetesDeployerConfig` (component-level configuration):
+  - `kubernetes_context`: kubeconfig context to use when no connector is
+    linked. Required for out-of-cluster clients.
+  - `incluster`: set to `True` if the client runs inside the target cluster.
+  - `kubernetes_namespace`: default namespace for deployments (default
+    `zenml-deployments`).
+- `KubernetesDeployerSettings` (pipeline/deployment-level overrides):
+  - **Networking**
+    - `namespace`: override namespace per deployment. 
+ - `service_type`: `LoadBalancer`, `NodePort`, or `ClusterIP`. + - `service_port`: exposed container port (default `8000`). + - `node_port`: explicit NodePort (30000–32767) when using `NodePort`. + - `session_affinity`: set to `ClientIP` for sticky sessions. + - `load_balancer_ip`: pre-allocated LoadBalancer IP. + - `load_balancer_source_ranges`: CIDR ranges allowed to reach the service. + - `service_annotations`: attach provider-specific annotations (e.g. ALB, + firewall rules). + - **Ingress** (for production HTTP/HTTPS access) + - `ingress_enabled`: create an Ingress resource (default `False`). + - `ingress_class`: ingress controller class name (e.g., `nginx`, `traefik`). + - `ingress_host`: hostname for the Ingress (e.g., `app.example.com`). + - `ingress_path`: path prefix (default `/`). + - `ingress_path_type`: `Prefix`, `Exact`, or `ImplementationSpecific`. + - `ingress_tls_enabled`: enable TLS/HTTPS (default `False`). + - `ingress_tls_secret_name`: Kubernetes Secret containing TLS certificate. + - `ingress_annotations`: controller-specific annotations (rewrite rules, rate limits, etc.). + - **Image & command** + - `image_pull_policy`: `IfNotPresent`, `Always`, or `Never`. + - `image_pull_secrets`: reference Kubernetes image pull secrets. + - `command` / `args`: override container entrypoint/arguments. + - **Health probes** + - `readiness_probe_*` and `liveness_probe_*`: tune probe timings, thresholds, + and timeouts. + - **Authorization & customization** + - `service_account_name`: run pods under a specific service account. + - `labels` / `annotations`: attach metadata to all managed resources. + - `pod_settings`: use + [`KubernetesPodSettings`](../../orchestrators/kubernetes.md#customize-pod-specs) + to mount volumes, set node selectors, tolerations, affinity rules, etc. + - `hpa_manifest`: optional HorizontalPodAutoscaler manifest (dict) for automatic scaling. + See [Horizontal Pod Autoscaling](#horizontal-pod-autoscaling-hpa). 
+ +## Resource Configuration + +You can specify the resource and scaling requirements for your pipeline deployment using the `ResourceSettings` class at the pipeline level: + +```python +from zenml import pipeline, step +from zenml.config import ResourceSettings + + +@step +def greet(name: str) -> str: + return f"Hello {name}!" + + +resource_settings = ResourceSettings( + cpu_count=2, # 2 CPU cores + memory="4GB", # 4 GB RAM + min_replicas=1, # Minimum 1 pod + max_replicas=5, # Maximum 5 pods (for autoscaling) +) + +@pipeline(settings={"resources": resource_settings}) +def greeting_pipeline(name: str = "ZenML") -> str: + return greet(name=name) +``` + +If resource settings are not specified, the default values are: +* `cpu_count` defaults to 1 CPU core +* `memory` defaults to 2 GiB +* `min_replicas` defaults to 1 +* `max_replicas` defaults to 1 (fixed scaling) + +### Resource Mapping + +The Kubernetes deployer converts `ResourceSettings` to Kubernetes resource requests and limits: + +- **CPU**: The `cpu_count` value is used for both requests and limits. For values < 1, it's converted to millicores (e.g., 0.5 = "500m"). For values >= 1, integer values are used directly. +- **Memory**: The `memory` value is used for both requests and limits. ResourceSettings accepts formats like "2GB", "512Mi", etc. The deployer converts these to Kubernetes-native formats (Mi, Gi). 
+- **Replicas**: + - If `min_replicas` == `max_replicas`, a fixed number of pods is deployed + - If they differ, `min_replicas` is used as the baseline (Horizontal Pod Autoscaler would be needed for actual autoscaling) + - If only `max_replicas` is specified, it's used as a fixed value + +### Additional Resource Settings + +ResourceSettings also supports autoscaling configuration: + +- `autoscaling_metric`: Metric to scale on ("cpu", "memory", "concurrency", or "rps") +- `autoscaling_target`: Target value for the metric (e.g., 70.0 for 70% CPU) +- `max_concurrency`: Maximum concurrent requests per pod + +To enable actual autoscaling, see the [Horizontal Pod Autoscaling](#horizontal-pod-autoscaling-hpa) section below. + +## RBAC requirements + +The deployer (either via the service connector or the client credentials) must +be able to: + +- Read, create, patch, and delete `Deployments`, `Services`, and `Pods` in the + target namespace. +- Create, patch, and delete `Secrets` (used for environment variables and + auth keys). +- Create, patch, and delete `Ingresses` when `ingress_enabled=True` + (requires permissions on `networking.k8s.io/v1/Ingress` resources). +- Create, patch, and delete `HorizontalPodAutoscalers` when `hpa_manifest` is provided + (requires permissions on `autoscaling/v2/HorizontalPodAutoscaler` resources). +- Create namespaces when they do not exist, unless you pre-create them. +- If you rely on automatic service-account provisioning, create service + accounts and role bindings (`create`, `patch`, `get`, `list` on + `ServiceAccount` and `RoleBinding`). +- Read cluster nodes when using the `NodePort` service type (to expose IPs). + +For production environments we recommend creating a dedicated service account +with minimal permissions scoped to the deployer namespace. 
+
+## Using Ingress Controllers
+
+For production deployments, you can configure an Ingress resource to provide
+HTTP/HTTPS access with custom domains, TLS termination, and advanced routing.
+The Kubernetes deployer supports standard Kubernetes Ingress resources and works
+with popular ingress controllers like nginx, Traefik, and cloud provider solutions.
+
+### Basic Ingress Configuration
+
+Enable ingress and specify your domain:
+
+```python
+from zenml import pipeline, step
+from zenml.integrations.kubernetes.deployers import KubernetesDeployerSettings
+
+
+@step
+def greet(name: str) -> str:
+    return f"Hello {name}!"
+
+
+@pipeline(
+    settings={
+        "deployer": KubernetesDeployerSettings(
+            service_type="ClusterIP",  # Use ClusterIP with Ingress
+            ingress_enabled=True,
+            ingress_class="nginx",
+            ingress_host="my-app.example.com",
+        )
+    }
+)
+def greeting_pipeline(name: str = "ZenML") -> str:
+    return greet(name=name)
+```
+
+### TLS/HTTPS Configuration
+
+Enable TLS for secure HTTPS access:
+
+```python
+settings={
+    "deployer": KubernetesDeployerSettings(
+        service_type="ClusterIP",
+        ingress_enabled=True,
+        ingress_class="nginx",
+        ingress_host="my-app.example.com",
+        ingress_tls_enabled=True,
+        ingress_tls_secret_name="my-app-tls",  # Must exist in namespace
+    )
+}
+```
+
+The TLS secret should be created separately:
+
+```bash
+kubectl create secret tls my-app-tls \
+  --cert=path/to/tls.crt \
+  --key=path/to/tls.key \
+  -n <namespace>
+```
+
+Or use cert-manager for automatic certificate provisioning:
+
+```python
+settings={
+    "deployer": KubernetesDeployerSettings(
+        service_type="ClusterIP",
+        ingress_enabled=True,
+        ingress_class="nginx",
+        ingress_host="my-app.example.com",
+        ingress_tls_enabled=True,
+        ingress_tls_secret_name="my-app-tls",
+        ingress_annotations={
+            "cert-manager.io/cluster-issuer": "letsencrypt-prod",
+        },
+    )
+}
+```
+
+### Controller-Specific Annotations
+
+Different ingress controllers support specific annotations for advanced features:
+
+**nginx-ingress**: +```python +ingress_annotations={ + "nginx.ingress.kubernetes.io/rewrite-target": "/", + "nginx.ingress.kubernetes.io/rate-limit": "100", + "nginx.ingress.kubernetes.io/ssl-redirect": "true", +} +``` + +**Traefik**: +```python +ingress_annotations={ + "traefik.ingress.kubernetes.io/router.entrypoints": "websecure", + "traefik.ingress.kubernetes.io/router.middlewares": "default-ratelimit@kubernetescrd", +} +``` + +**AWS ALB**: +```python +ingress_class="alb" +ingress_annotations={ + "alb.ingress.kubernetes.io/scheme": "internet-facing", + "alb.ingress.kubernetes.io/target-type": "ip", + "alb.ingress.kubernetes.io/certificate-arn": "arn:aws:acm:...", +} +``` + +### Path-Based Routing + +Configure path prefixes for multi-service deployments: + +```python +settings={ + "deployer": KubernetesDeployerSettings( + ingress_enabled=True, + ingress_host="api.example.com", + ingress_path="/weather", # Access at api.example.com/weather + ingress_path_type="Prefix", + ) +} +``` + +### Limitations + +The deployer currently creates standard Kubernetes Ingress resources +(networking.k8s.io/v1). For service mesh solutions like Istio that use different +APIs (Gateway/VirtualService), you'll need to create those resources separately +and use `service_type="ClusterIP"` to expose the service internally. + +## Horizontal Pod Autoscaling (HPA) + +The Kubernetes deployer supports HPA by passing autoscaling/v2 manifests via the `hpa_manifest` setting. The deployer handles lifecycle management (create/update/delete). 
+
+### Example
+
+```python
+from zenml import pipeline
+from zenml.integrations.kubernetes.flavors import KubernetesDeployerSettings
+
+# Define your HPA manifest
+hpa_manifest = {
+    "apiVersion": "autoscaling/v2",
+    "kind": "HorizontalPodAutoscaler",
+    "metadata": {"name": "my-pipeline-hpa"},
+    "spec": {
+        "scaleTargetRef": {
+            "apiVersion": "apps/v1",
+            "kind": "Deployment",
+            "name": "zenml-deployment-<deployment-id>",
+        },
+        "minReplicas": 2,
+        "maxReplicas": 10,
+        "metrics": [
+            # CPU-based scaling
+            {
+                "type": "Resource",
+                "resource": {
+                    "name": "cpu",
+                    "target": {"type": "Utilization", "averageUtilization": 75},
+                },
+            },
+            # Memory-based scaling (optional)
+            {
+                "type": "Resource",
+                "resource": {
+                    "name": "memory",
+                    "target": {"type": "Utilization", "averageUtilization": 80},
+                },
+            },
+        ],
+        # Optional: scaling behavior policies
+        "behavior": {
+            "scaleDown": {"stabilizationWindowSeconds": 300},
+            "scaleUp": {"stabilizationWindowSeconds": 0},
+        },
+    },
+}
+
+@pipeline(settings={"deployer": KubernetesDeployerSettings(hpa_manifest=hpa_manifest)})
+def my_pipeline():
+    ...
+```
+
+### Requirements
+
+- **Metrics Server**: Required for CPU/memory metrics
+  ```bash
+  kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml
+  ```
+- **Resource requests**: Set via `ResourceSettings` (ZenML does this automatically)
+- **Custom Metrics API**: Optional, for application-specific metrics (Prometheus Adapter, cloud provider solutions)
+
+### Monitoring
+
+```bash
+kubectl get hpa -n zenml-deployments --watch
+kubectl describe hpa -n zenml-deployments
+```
+
+For end-to-end deployment workflows, see the
+[deployment user guide](../../user-guide/production-guide/deployment.md).
diff --git a/docs/book/component-guide/toc.md b/docs/book/component-guide/toc.md index 6ee235b169c..e5829078253 100644 --- a/docs/book/component-guide/toc.md +++ b/docs/book/component-guide/toc.md @@ -23,6 +23,7 @@ * [Deployers](deployers/README.md) * [Local Deployer](deployers/local.md) * [Docker Deployer](deployers/docker.md) + * [Kubernetes Deployer](deployers/kubernetes.md) * [AWS App Runner Deployer](deployers/aws-app-runner.md) * [GCP Cloud Run Deployer](deployers/gcp-cloud-run.md) * [Artifact Stores](artifact-stores/README.md) diff --git a/examples/weather_agent/README.md b/examples/weather_agent/README.md index dc7d4e0c503..4161c07af95 100644 --- a/examples/weather_agent/README.md +++ b/examples/weather_agent/README.md @@ -28,6 +28,20 @@ Deploy the agent pipeline as a real-time service ([see code](pipelines/weather_a zenml pipeline deploy pipelines.weather_agent.weather_agent ``` +For Kubernetes deployments, use the provided configuration files: + +```bash +# Simple Kubernetes deployment with basic settings +zenml pipeline deploy pipelines.weather_agent.weather_agent \ + --name weather-simple \ + --config k8s_deploy_simple.yaml + +# Advanced Kubernetes deployment with production-ready configuration +zenml pipeline deploy pipelines.weather_agent.weather_agent \ + --name weather-advanced \ + --config k8s_deploy_advanced.yaml +``` + **Make predictions via API** ```bash @@ -66,6 +80,8 @@ weather_agent/ │ └── weather_agent.py - Weather data fetching and analysis steps ├── ui/ │ └── index.html - Interactive web interface (optional) +├── k8s_deploy_simple.yaml - Simple Kubernetes deployment config +├── k8s_deploy_advanced.yaml - Advanced Kubernetes deployment config ├── run.py - CLI for batch and real-time execution └── requirements.txt - Dependencies ``` @@ -123,6 +139,28 @@ def weather_agent_pipeline(city: str) -> Dict[str, Any]: This makes agents reproducible, versionable, and observable—the same as classical ML pipelines. 
+### **Kubernetes Deployment Configurations** + +Two deployment configurations are provided for Kubernetes: + +**Simple Configuration** (`k8s_deploy_simple.yaml`): +- Single replica deployment +- ClusterIP service (internal access only) +- Minimal resource allocation +- Ideal for development and testing + +**Advanced Configuration** (`k8s_deploy_advanced.yaml`): +- High availability with 3 replicas +- LoadBalancer service for external access +- Resource requests and limits (500m-1000m CPU, 512Mi-2Gi memory) +- Health check configuration (readiness and liveness probes) +- Node selectors and tolerations for targeted scheduling +- Service annotations for cloud provider integrations (e.g., AWS NLB) +- Pod labels and annotations for monitoring and organization +- CORS configuration for cross-origin requests + +These configurations demonstrate how to deploy the same pipeline with different operational characteristics based on your environment and requirements. + ### **Graceful Fallback Logic** Handle external API failures without breaking the service by falling back to deterministic rules: diff --git a/examples/weather_agent/k8s_deploy_advanced.yaml b/examples/weather_agent/k8s_deploy_advanced.yaml new file mode 100644 index 00000000000..3e0fe075946 --- /dev/null +++ b/examples/weather_agent/k8s_deploy_advanced.yaml @@ -0,0 +1,45 @@ +# Advanced Kubernetes Deployment Configuration for Weather Agent +# This configuration demonstrates advanced Kubernetes deployment options including +# resource management, scaling, health checks, node placement, service exposure, and ingress. 
+ +# Override default pipeline parameters +parameters: + city: "San Francisco" + +settings: + docker: + parent_image: safoinme/zenml:deployer-0.4 + # For local testing: force code inclusion in image + allow_download_from_artifact_store: false + allow_including_files_in_images: true + # Resource configuration with high availability + resources: + cpu_count: 1 + memory: "2GB" + min_replicas: 3 # Run 3 replicas for redundancy + max_replicas: 3 # Fixed scaling (no autoscaling) + # Advanced Kubernetes deployer configuration + deployer: + # Namespace for isolation and organization + namespace: weather-prod + + # Use LoadBalancer for external access + service_type: LoadBalancer + + # Health check configuration + readiness_probe_initial_delay: 5 + readiness_probe_period: 10 + liveness_probe_initial_delay: 20 + liveness_probe_period: 15 + + # Advanced pod placement and scheduling + pod_settings: + + # Pod labels for organization and filtering + labels: + app: weather-agent + environment: production + version: "1.0" + + # Image pull policy + image_pull_policy: IfNotPresent \ No newline at end of file diff --git a/examples/weather_agent/k8s_deploy_simple.yaml b/examples/weather_agent/k8s_deploy_simple.yaml new file mode 100644 index 00000000000..121d6d21fa2 --- /dev/null +++ b/examples/weather_agent/k8s_deploy_simple.yaml @@ -0,0 +1,23 @@ +# Simple Kubernetes Deployment Configuration for Weather Agent +# This configuration provides a basic deployment setup with minimal settings. 
+ +# Override default pipeline parameters +parameters: + city: "London" + +# Basic Kubernetes deployer configuration +settings: + docker: + parent_image: safoinme/zenml:deployer-0.4 + # For local testing: force code inclusion in image + allow_download_from_artifact_store: false + allow_including_files_in_images: true + # Resource configuration + resources: + cpu_count: 1 + memory: "2GB" + min_replicas: 1 + max_replicas: 1 + deployer: + namespace: weather-demo + service_type: LoadBalancer diff --git a/src/zenml/enums.py b/src/zenml/enums.py index 85aae8b6f48..1010e144675 100644 --- a/src/zenml/enums.py +++ b/src/zenml/enums.py @@ -540,6 +540,14 @@ class DeploymentStatus(StrEnum): ERROR = "error" +class KubernetesServiceType(StrEnum): + """Kubernetes Service types for the Kubernetes deployer.""" + + LOAD_BALANCER = "LoadBalancer" + NODE_PORT = "NodePort" + CLUSTER_IP = "ClusterIP" + + class PipelineRunTriggeredByType(StrEnum): """All possible types that can trigger a pipeline run.""" diff --git a/src/zenml/integrations/kubernetes/__init__.py b/src/zenml/integrations/kubernetes/__init__.py index 6a9d9bd07e3..54b26057db4 100644 --- a/src/zenml/integrations/kubernetes/__init__.py +++ b/src/zenml/integrations/kubernetes/__init__.py @@ -25,6 +25,7 @@ KUBERNETES_ORCHESTRATOR_FLAVOR = "kubernetes" KUBERNETES_STEP_OPERATOR_FLAVOR = "kubernetes" +KUBERNETES_DEPLOYER_FLAVOR = "kubernetes" class KubernetesIntegration(Integration): @@ -43,8 +44,14 @@ def flavors(cls) -> List[Type[Flavor]]: List of new stack component flavors. 
""" from zenml.integrations.kubernetes.flavors import ( - KubernetesOrchestratorFlavor, KubernetesStepOperatorFlavor + KubernetesDeployerFlavor, + KubernetesOrchestratorFlavor, + KubernetesStepOperatorFlavor, ) - return [KubernetesOrchestratorFlavor, KubernetesStepOperatorFlavor] + return [ + KubernetesDeployerFlavor, + KubernetesOrchestratorFlavor, + KubernetesStepOperatorFlavor, + ] diff --git a/src/zenml/integrations/kubernetes/deployers/__init__.py b/src/zenml/integrations/kubernetes/deployers/__init__.py new file mode 100644 index 00000000000..e203c085e84 --- /dev/null +++ b/src/zenml/integrations/kubernetes/deployers/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Kubernetes deployer implementation.""" + +from zenml.integrations.kubernetes.deployers.kubernetes_deployer import ( # noqa + KubernetesDeployer, +) + +__all__ = [ + "KubernetesDeployer", +] + diff --git a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py new file mode 100644 index 00000000000..3f14905ec15 --- /dev/null +++ b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py @@ -0,0 +1,1696 @@ +# Copyright (c) ZenML GmbH 2021. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Implementation of the ZenML Kubernetes deployer.""" + +import re +from datetime import datetime, timezone +from typing import ( + TYPE_CHECKING, + Dict, + Generator, + List, + Optional, + Tuple, + Type, + cast, +) + +from kubernetes import client as k8s_client +from kubernetes import config as k8s_config +from kubernetes import watch as k8s_watch +from kubernetes.client.rest import ApiException +from pydantic import BaseModel, Field + +from zenml.deployers.containerized_deployer import ( + ContainerizedDeployer, +) +from zenml.deployers.exceptions import ( + DeployerError, + DeploymentDeprovisionError, + DeploymentLogsNotFoundError, + DeploymentNotFoundError, + DeploymentProvisionError, +) +from zenml.deployers.server.entrypoint_configuration import ( + DEPLOYMENT_ID_OPTION, +) +from zenml.enums import DeploymentStatus, StackComponentType +from zenml.integrations.kubernetes import kube_utils +from zenml.integrations.kubernetes.flavors.kubernetes_deployer_flavor import ( + KubernetesDeployerConfig, + KubernetesDeployerSettings, +) +from zenml.integrations.kubernetes.manifest_utils import ( + build_deployment_manifest, + build_service_manifest, +) +from zenml.logger import get_logger +from zenml.models import ( + DeploymentOperationalState, + DeploymentResponse, +) +from zenml.stack import StackValidator + +if TYPE_CHECKING: + from zenml.stack import Stack + +logger = get_logger(__name__) + +# Resource constants +MAX_K8S_NAME_LENGTH = 63 + +# Minimum datetime for pod creation timestamp comparisons +MIN_DATETIME = 
datetime.min.replace(tzinfo=timezone.utc) + + +class KubernetesDeploymentMetadata(BaseModel): + """Metadata for a Kubernetes deployment. + + Captures runtime state and actual deployment details. Configuration settings + are stored separately in the deployment snapshot and can be viewed with + `zenml deployment describe --show-schema`. + + Attributes: + deployment_name: The name of the Kubernetes Deployment resource. + namespace: The namespace where the deployment is running. + service_name: The name of the Kubernetes Service resource. + pod_name: The name of a running pod (if available). + port: The service port exposed by the deployment. + service_type: The type of Kubernetes Service (LoadBalancer, NodePort, ClusterIP). + external_ip: The external IP or hostname (for LoadBalancer services). + node_port: The assigned node port (for NodePort services). + replicas: Number of replicas desired by the deployment. + ready_replicas: Number of pods that are ready and serving traffic. + available_replicas: Number of pods available for use. + cpu: CPU resources allocated to the deployment (e.g., "1000m", "2"). + memory: Memory resources allocated to the deployment (e.g., "2Gi"). + image: The container image actually deployed. + labels: Labels applied to the deployment resources. + """ + + deployment_name: str + namespace: str + service_name: str + pod_name: Optional[str] = None + port: int + service_type: str = "LoadBalancer" + external_ip: Optional[str] = None + node_port: Optional[int] = None + replicas: Optional[int] = None + ready_replicas: Optional[int] = None + available_replicas: Optional[int] = None + cpu: Optional[str] = None + memory: Optional[str] = None + image: Optional[str] = None + labels: Dict[str, str] = Field(default_factory=dict) + + @classmethod + def from_deployment( + cls, deployment: DeploymentResponse + ) -> "KubernetesDeploymentMetadata": + """Create KubernetesDeploymentMetadata from a deployment response. 
+ + Args: + deployment: The deployment to get the metadata for. + + Returns: + The metadata for the Kubernetes deployment. + + Raises: + DeployerError: If the deployment metadata is invalid. + """ + if not deployment.deployment_metadata: + raise DeployerError( + f"Deployment '{deployment.name}' has no metadata." + ) + + try: + return cls.model_validate(deployment.deployment_metadata) + except Exception as e: + raise DeployerError( + f"Failed to parse deployment metadata for deployment " + f"'{deployment.name}': {e}" + ) + + +class KubernetesDeployer(ContainerizedDeployer): + """Deployer for running pipelines in Kubernetes.""" + + _k8s_client: Optional[k8s_client.ApiClient] = None + + # ======================================================================== + # Client and API Management + # ======================================================================== + + def get_kube_client( + self, incluster: Optional[bool] = None + ) -> k8s_client.ApiClient: + """Get authenticated Kubernetes client. + + This method handles: + - In-cluster authentication + - Service connector authentication + - Local kubeconfig authentication + - Client caching and expiration + + Args: + incluster: Whether to use in-cluster config. Overrides + the config setting if provided. + + Returns: + Authenticated Kubernetes API client. + + Raises: + RuntimeError: If connector behaves unexpectedly. 
+ """ + if incluster is None: + incluster = self.config.incluster + + if incluster: + kube_utils.load_kube_config( + incluster=incluster, + context=self.config.kubernetes_context, + ) + self._k8s_client = k8s_client.ApiClient() + return self._k8s_client + + connector_has_expired = self.connector_has_expired() + if self._k8s_client and not connector_has_expired: + return self._k8s_client + + connector = self.get_connector() + if connector: + client = connector.connect() + if not isinstance(client, k8s_client.ApiClient): + raise RuntimeError( + f"Expected a k8s_client.ApiClient while trying to use the " + f"linked connector, but got {type(client)}." + ) + self._k8s_client = client + else: + kube_utils.load_kube_config( + incluster=incluster, + context=self.config.kubernetes_context, + ) + self._k8s_client = k8s_client.ApiClient() + + return self._k8s_client + + @property + def k8s_core_api(self) -> k8s_client.CoreV1Api: + """Get Kubernetes Core V1 API client. + + Returns: + Kubernetes Core V1 API client. + """ + return k8s_client.CoreV1Api(self.get_kube_client()) + + @property + def k8s_apps_api(self) -> k8s_client.AppsV1Api: + """Get Kubernetes Apps V1 API client. + + Returns: + Kubernetes Apps V1 API client. + """ + return k8s_client.AppsV1Api(self.get_kube_client()) + + @property + def k8s_rbac_api(self) -> k8s_client.RbacAuthorizationV1Api: + """Get Kubernetes RBAC Authorization V1 API client. + + Returns: + Kubernetes RBAC Authorization V1 API client. + """ + return k8s_client.RbacAuthorizationV1Api(self.get_kube_client()) + + @property + def k8s_networking_api(self) -> k8s_client.NetworkingV1Api: + """Get Kubernetes Networking V1 API client. + + Returns: + Kubernetes Networking V1 API client. + """ + return k8s_client.NetworkingV1Api(self.get_kube_client()) + + @property + def k8s_autoscaling_api(self) -> k8s_client.AutoscalingV2Api: + """Get Kubernetes Autoscaling V2 API client. + + Returns: + Kubernetes Autoscaling V2 API client. 
+ """ + return k8s_client.AutoscalingV2Api(self.get_kube_client()) + + # ======================================================================== + # Cluster and Namespace Management + # ======================================================================== + + def get_kubernetes_contexts(self) -> Tuple[List[str], str]: + """Get list of configured Kubernetes contexts and the active context. + + Returns: + Tuple of (context_names, active_context_name). + + Raises: + RuntimeError: If Kubernetes configuration cannot be loaded. + """ + try: + contexts, active_context = k8s_config.list_kube_config_contexts() + except k8s_config.config_exception.ConfigException as e: + raise RuntimeError( + "Could not load the Kubernetes configuration" + ) from e + + context_names = [c["name"] for c in contexts] + active_context_name = active_context["name"] + return context_names, active_context_name + + def ensure_namespace_exists(self, namespace: str) -> None: + """Ensure a Kubernetes namespace exists. + + Args: + namespace: The namespace name. + + Raises: + RuntimeError: If namespace creation fails due to permissions + or other non-conflict errors. + """ + try: + kube_utils.create_namespace( + core_api=self.k8s_core_api, + namespace=namespace, + ) + logger.debug(f"Created namespace '{namespace}'.") + except ApiException as e: + if e.status == 409: + logger.debug(f"Namespace '{namespace}' already exists.") + else: + raise RuntimeError( + f"Failed to ensure namespace '{namespace}' exists: {e}. " + f"This may be due to insufficient permissions (RBAC) or " + f"cluster configuration issues." + ) from e + + def create_or_get_service_account( + self, + service_account_name: str, + namespace: str, + role_binding_name: str = "zenml-edit", + ) -> str: + """Create or get a Kubernetes service account with edit permissions. + + Args: + service_account_name: Name of the service account. + namespace: Kubernetes namespace. + role_binding_name: Name of the role binding. 
+ + Returns: + The service account name. + """ + kube_utils.create_edit_service_account( + core_api=self.k8s_core_api, + rbac_api=self.k8s_rbac_api, + service_account_name=service_account_name, + namespace=namespace, + role_binding_name=role_binding_name, + ) + return service_account_name + + # ======================================================================== + # Validation and Configuration + # ======================================================================== + + def validate_kubernetes_context( + self, stack: "Stack", component_type: str + ) -> Tuple[bool, str]: + """Validate Kubernetes context configuration. + + Args: + stack: The stack to validate. + component_type: Type of component (e.g., "orchestrator", "deployer"). + + Returns: + Tuple of (is_valid, error_message). + """ + container_registry = stack.container_registry + assert container_registry is not None + + kubernetes_context = self.config.kubernetes_context + msg = f"'{self.name}' Kubernetes {component_type} error: " + + if not self.connector: + if kubernetes_context: + try: + contexts, active_context = self.get_kubernetes_contexts() + + if kubernetes_context not in contexts: + return False, ( + f"{msg}could not find a Kubernetes context named " + f"'{kubernetes_context}' in the local " + "Kubernetes configuration. Please make sure that " + "the Kubernetes cluster is running and that the " + "kubeconfig file is configured correctly. To list " + "all configured contexts, run:\n\n" + " `kubectl config get-contexts`\n" + ) + if kubernetes_context != active_context: + logger.warning( + f"{msg}the Kubernetes context '{kubernetes_context}' " + f"configured for the Kubernetes {component_type} is not " + f"the same as the active context in the local Kubernetes " + f"configuration. 
To set the active context, run:\n\n" + f" `kubectl config use-context {kubernetes_context}`\n" + ) + except RuntimeError as e: + logger.debug( + f"Could not validate Kubernetes context " + f"'{kubernetes_context}': {e}" + ) + except Exception as e: + logger.warning( + f"Unexpected error while validating Kubernetes context " + f"'{kubernetes_context}': {e}" + ) + elif self.config.incluster: + pass + else: + return False, ( + f"{msg}you must either link this {component_type} to a " + "Kubernetes service connector (see the 'zenml " + f"{component_type} connect' CLI command), explicitly set " + "the `kubernetes_context` attribute to the name of the " + "Kubernetes config context pointing to the cluster " + "where you would like to run operations, or set the " + "`incluster` attribute to `True`." + ) + + if not self.config.is_local and container_registry.config.is_local: + return False, ( + f"{msg}the Kubernetes {component_type} is configured to " + "run in a remote Kubernetes cluster but the " + f"'{container_registry.name}' container registry URI " + f"'{container_registry.config.uri}' points to a local " + f"container registry. Please ensure that you use a remote " + f"container registry with a remote Kubernetes {component_type}." + ) + + return True, "" + + @property + def config(self) -> KubernetesDeployerConfig: + """Get the Kubernetes deployer config. + + Returns: + The Kubernetes deployer config. + """ + return cast(KubernetesDeployerConfig, self._config) + + @property + def settings_class(self) -> Optional[Type[KubernetesDeployerSettings]]: + """Return the settings class for the Kubernetes deployer. + + Returns: + The settings class. + """ + return KubernetesDeployerSettings + + @property + def validator(self) -> Optional[StackValidator]: + """Validator for the Kubernetes deployer. + + Returns: + Stack validator. + """ + + def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]: + """Validates that the stack is compatible with Kubernetes deployer. 
+ + Args: + stack: The stack. + + Returns: + Whether the stack is valid and an explanation if not. + """ + return self.validate_kubernetes_context(stack, "deployer") + + return StackValidator( + required_components={ + StackComponentType.IMAGE_BUILDER, + StackComponentType.CONTAINER_REGISTRY, + }, + custom_validation_function=_validate_local_requirements, + ) + + # ======================================================================== + # Resource Naming and Labels + # ======================================================================== + + def _get_namespace(self, deployment: DeploymentResponse) -> str: + """Get the namespace for a deployment. + + Attempts to retrieve namespace from cached metadata first for performance, + then falls back to parsing settings if metadata is unavailable. + + Args: + deployment: The deployment. + + Returns: + Namespace name. + + Raises: + DeployerError: If the deployment has no snapshot. + """ + if deployment.deployment_metadata: + try: + metadata = KubernetesDeploymentMetadata.from_deployment( + deployment + ) + return metadata.namespace + except Exception: + logger.debug( + f"Could not retrieve namespace from metadata for " + f"deployment '{deployment.name}', parsing settings instead." + ) + + snapshot = deployment.snapshot + if not snapshot: + raise DeployerError( + f"Deployment '{deployment.name}' has no snapshot." + ) + + settings = cast( + KubernetesDeployerSettings, + self.get_settings(snapshot), + ) + return settings.namespace or self.config.kubernetes_namespace + + def _get_resource_base_name(self, deployment: DeploymentResponse) -> str: + """Get the base name used for all Kubernetes resources for this deployment. + + Args: + deployment: The deployment. + + Returns: + Sanitized base name used for Deployment, Service, and Ingress resources. 
+        """
+        name = f"zenml-deployment-{deployment.id}"
+        return kube_utils.sanitize_label(name)[:MAX_K8S_NAME_LENGTH]
+
+    def _get_secret_name(self, deployment: DeploymentResponse) -> str:
+        """Generate Kubernetes secret name for deployment secrets.
+
+        Args:
+            deployment: The deployment.
+
+        Returns:
+            Secret name.
+        """
+        return f"zenml-secrets-{deployment.id}"
+
+    def _get_ingress_name(
+        self, deployment: DeploymentResponse, settings: KubernetesDeployerSettings
+    ) -> Optional[str]:
+        """Get the Kubernetes ingress name for a deployment.
+
+        Args:
+            deployment: The deployment.
+            settings: Deployer settings.
+
+        Returns:
+            The name from the ingress manifest's metadata, a generated
+            default if the manifest omits a name, or None if no ingress
+            manifest is configured.
+        """
+        if not settings.ingress_manifest:
+            return None
+
+        manifest_name = settings.ingress_manifest.get("metadata", {}).get("name")
+        if manifest_name:
+            return str(manifest_name)
+
+        deployment_name = self._get_resource_base_name(deployment)
+        return f"{deployment_name}-ingress"
+
+    def _get_hpa_name(self, deployment: DeploymentResponse) -> str:
+        """Generate Kubernetes HorizontalPodAutoscaler name.
+
+        Args:
+            deployment: The deployment.
+
+        Returns:
+            HPA name.
+        """
+        deployment_name = self._get_resource_base_name(deployment)
+        return f"{deployment_name}-hpa"
+
+    def _get_deployment_labels(
+        self,
+        deployment: DeploymentResponse,
+        settings: KubernetesDeployerSettings,
+    ) -> Dict[str, str]:
+        """Get labels for Kubernetes resources.
+
+        Args:
+            deployment: The deployment.
+            settings: Deployer settings.
+
+        Returns:
+            Labels dictionary.
+ """ + labels = { + "zenml-deployment-id": str(deployment.id), + "zenml-deployment-name": kube_utils.sanitize_label( + deployment.name + ), + "zenml-deployer-id": str(self.id), + "managed-by": "zenml", + } + + if settings.labels: + labels.update(settings.labels) + + return labels + + # ======================================================================== + # Security and Secret Management + # ======================================================================== + + def _sanitize_secret_key(self, key: str) -> str: + """Sanitize a secret key to be a valid Kubernetes environment variable name. + + Kubernetes environment variable names must be valid C identifiers: + - Consist of alphanumeric characters and underscores only: [A-Za-z0-9_] + - Start with a letter or underscore (not a digit): [A-Za-z_] + - Cannot contain hyphens, dots, or other special characters + + Args: + key: The secret key to sanitize. + + Returns: + Sanitized key that is valid as a Kubernetes env var name. + """ + original_key = key + + sanitized = re.sub(r"[^A-Za-z0-9_]", "_", original_key) + + if not sanitized or not re.match(r"^[A-Za-z_]", sanitized): + sanitized = f"_{sanitized}" if sanitized else "_VAR" + + if sanitized != original_key: + logger.warning( + f"Secret key '{original_key}' was sanitized to '{sanitized}' " + f"to meet Kubernetes environment variable name requirements. " + f"The environment variable will be available as '{sanitized}'." + ) + + return sanitized + + def _sanitize_secrets(self, secrets: Dict[str, str]) -> Dict[str, str]: + """Sanitize secret keys to valid Kubernetes environment variable names. + + Args: + secrets: Dictionary of secret keys and values. + + Returns: + Dictionary mapping sanitized keys to values. + + Raises: + DeployerError: If sanitization causes key collisions. 
+ """ + sanitized_secrets: Dict[str, str] = {} + collision_map: Dict[str, str] = {} + + for key, value in secrets.items(): + sanitized_key = self._sanitize_secret_key(key) + + if sanitized_key in collision_map: + raise DeployerError( + f"Secret key collision detected: keys '{collision_map[sanitized_key]}' " + f"and '{key}' both sanitize to '{sanitized_key}'. " + f"Please rename one of them to avoid conflicts." + ) + + collision_map[sanitized_key] = key + sanitized_secrets[sanitized_key] = value + + return sanitized_secrets + + def _prepare_environment( + self, + deployment: DeploymentResponse, + environment: Dict[str, str], + sanitized_secrets: Dict[str, str], + ) -> List[k8s_client.V1EnvVar]: + """Prepare environment variables for the container. + + Args: + deployment: The deployment. + environment: Environment variables. + sanitized_secrets: Pre-sanitized secret environment variables. + + Returns: + List of Kubernetes environment variables. + + Note: + Secrets are stored as Kubernetes Secret resources and referenced + via secretKeyRef for better security. Secret keys must be + pre-sanitized to valid Kubernetes environment variable names. 
+ """ + env_vars = [] + + for key, value in environment.items(): + env_vars.append(k8s_client.V1EnvVar(name=key, value=value)) + + if sanitized_secrets: + secret_name = self._get_secret_name(deployment) + for key in sanitized_secrets.keys(): + env_vars.append( + k8s_client.V1EnvVar( + name=key, + value_from=k8s_client.V1EnvVarSource( + secret_key_ref=k8s_client.V1SecretKeySelector( + name=secret_name, + key=key, + ) + ), + ) + ) + + return env_vars + + # ======================================================================== + # Manifest Building + # ======================================================================== + + def _build_deployment_manifest( + self, + deployment: DeploymentResponse, + namespace: str, + labels: Dict[str, str], + image: str, + environment: Dict[str, str], + sanitized_secrets: Dict[str, str], + settings: KubernetesDeployerSettings, + resource_requests: Dict[str, str], + resource_limits: Dict[str, str], + replicas: int, + ) -> k8s_client.V1Deployment: + """Build Kubernetes Deployment manifest. + + Args: + deployment: The deployment. + namespace: Kubernetes namespace. + labels: Labels to apply to the deployment resources. + image: Container image URI. + environment: Environment variables. + sanitized_secrets: Pre-sanitized secret environment variables. + settings: Deployer settings. + resource_requests: Resource requests (cpu, memory, gpu). + resource_limits: Resource limits (cpu, memory, gpu). + replicas: Number of pod replicas. + + Returns: + Kubernetes Deployment manifest. 
+ """ + deployment_name = self._get_resource_base_name(deployment) + + env_vars = self._prepare_environment( + deployment, environment, sanitized_secrets + ) + + command = settings.command or [ + "python", + "-m", + "zenml.deployers.server.app", + ] + args = settings.args or [ + f"--{DEPLOYMENT_ID_OPTION}", + str(deployment.id), + ] + + liveness_probe_config = { + "initial_delay_seconds": settings.liveness_probe_initial_delay, + "period_seconds": settings.liveness_probe_period, + "timeout_seconds": settings.liveness_probe_timeout, + "failure_threshold": settings.liveness_probe_failure_threshold, + } + readiness_probe_config = { + "initial_delay_seconds": settings.readiness_probe_initial_delay, + "period_seconds": settings.readiness_probe_period, + "timeout_seconds": settings.readiness_probe_timeout, + "failure_threshold": settings.readiness_probe_failure_threshold, + } + + return build_deployment_manifest( + deployment_name=deployment_name, + namespace=namespace, + labels=labels, + annotations=settings.annotations, + replicas=replicas, + image=image, + command=command, + args=args, + env_vars=env_vars, + service_port=settings.service_port, + resource_requests=resource_requests, + resource_limits=resource_limits, + image_pull_policy=settings.image_pull_policy, + image_pull_secrets=settings.image_pull_secrets, + service_account_name=settings.service_account_name, + liveness_probe_config=liveness_probe_config, + readiness_probe_config=readiness_probe_config, + liveness_probe_path=settings.liveness_probe_path, + readiness_probe_path=settings.readiness_probe_path, + pod_settings=settings.pod_settings, + ) + + def _build_service_manifest( + self, + deployment_name: str, + service_name: str, + namespace: str, + labels: Dict[str, str], + settings: KubernetesDeployerSettings, + ) -> k8s_client.V1Service: + """Build Kubernetes Service manifest. + + Args: + deployment_name: Name of the deployment. + service_name: Name of the service. + namespace: Kubernetes namespace. 
+ labels: Labels to apply to the service resources. + settings: Deployer settings. + + Returns: + Kubernetes Service manifest. + """ + return build_service_manifest( + service_name=service_name, + namespace=namespace, + labels=labels, + annotations=settings.service_annotations, + service_type=settings.service_type, + service_port=settings.service_port, + node_port=settings.node_port, + session_affinity=settings.session_affinity, + load_balancer_ip=settings.load_balancer_ip, + load_balancer_source_ranges=settings.load_balancer_source_ranges, + deployment_name=deployment_name, + ) + + + # ======================================================================== + # Resource Lifecycle Management + # ======================================================================== + + def _get_pod_for_deployment( + self, deployment: DeploymentResponse + ) -> Optional[k8s_client.V1Pod]: + """Get a pod for the deployment. + + Prefers running pods over pending or terminating ones. + + Args: + deployment: The deployment. + + Returns: + Pod or None if not found. + """ + namespace = self._get_namespace(deployment) + label_selector = f"zenml-deployment-id={deployment.id}" + + try: + pods = kube_utils.list_pods( + core_api=self.k8s_core_api, + namespace=namespace, + label_selector=label_selector, + ) + if not pods.items: + return None + + running_pods = [ + p + for p in pods.items + if p.status and p.status.phase == "Running" + ] + if running_pods: + return max( + running_pods, + key=lambda p: p.metadata.creation_timestamp + or MIN_DATETIME, + ) + + return max( + pods.items, + key=lambda p: p.metadata.creation_timestamp or MIN_DATETIME, + ) + except ApiException: + pass + + return None + + def _manage_deployment_secrets( + self, + deployment: DeploymentResponse, + namespace: str, + secrets: Dict[str, str], + ) -> Dict[str, str]: + """Manage Kubernetes secrets for deployment. + + Args: + deployment: The deployment. + namespace: Kubernetes namespace. 
+ secrets: Secret environment variables. + + Returns: + Sanitized secrets dictionary. + """ + sanitized_secrets = self._sanitize_secrets(secrets) if secrets else {} + secret_name = self._get_secret_name(deployment) + + if sanitized_secrets: + logger.info( + f"Creating/updating Kubernetes Secret '{secret_name}' " + f"in namespace '{namespace}'." + ) + kube_utils.create_or_update_secret( + core_api=self.k8s_core_api, + namespace=namespace, + secret_name=secret_name, + data=cast(Dict[str, Optional[str]], sanitized_secrets), + ) + else: + # Clean up Secret if all secrets were removed + try: + kube_utils.delete_secret( + core_api=self.k8s_core_api, + namespace=namespace, + secret_name=secret_name, + ) + logger.debug( + f"Deleted empty Kubernetes Secret '{secret_name}' " + f"in namespace '{namespace}'." + ) + except Exception: + pass + + return sanitized_secrets + + def _manage_deployment_resource( + self, + namespace: str, + deployment_name: str, + deployment_manifest: k8s_client.V1Deployment, + existing_deployment: Optional[k8s_client.V1Deployment], + ) -> None: + """Create or update Kubernetes Deployment resource. + + Args: + namespace: Kubernetes namespace. + deployment_name: Name of the Kubernetes Deployment. + deployment_manifest: Deployment manifest. + existing_deployment: Existing deployment if updating. + """ + if existing_deployment: + logger.info( + f"Updating Kubernetes Deployment '{deployment_name}' " + f"in namespace '{namespace}'." + ) + kube_utils.update_deployment( + apps_api=self.k8s_apps_api, + name=deployment_name, + namespace=namespace, + deployment_manifest=deployment_manifest, + ) + else: + logger.info( + f"Creating Kubernetes Deployment '{deployment_name}' " + f"in namespace '{namespace}'." 
+ ) + kube_utils.create_deployment( + apps_api=self.k8s_apps_api, + namespace=namespace, + deployment_manifest=deployment_manifest, + ) + + def _manage_service_resource( + self, + namespace: str, + service_name: str, + service_manifest: k8s_client.V1Service, + existing_service: Optional[k8s_client.V1Service], + settings: KubernetesDeployerSettings, + ) -> None: + """Create or update Kubernetes Service resource. + + Args: + namespace: Kubernetes namespace. + service_name: Name of the Kubernetes Service. + service_manifest: Service manifest. + existing_service: Existing service if updating. + settings: Deployer settings. + + Raises: + DeploymentProvisionError: If service deletion times out. + """ + if existing_service: + needs_recreate = kube_utils.service_needs_recreate( + existing_service, service_manifest + ) + + if needs_recreate: + logger.info( + f"Service '{service_name}' has immutable field changes. " + f"Deleting and recreating..." + ) + kube_utils.delete_service( + core_api=self.k8s_core_api, + name=service_name, + namespace=namespace, + ) + try: + kube_utils.wait_for_service_deletion( + core_api=self.k8s_core_api, + service_name=service_name, + namespace=namespace, + timeout=settings.service_deletion_timeout, + ) + except RuntimeError as e: + raise DeploymentProvisionError(str(e)) from e + + kube_utils.create_service( + core_api=self.k8s_core_api, + namespace=namespace, + service_manifest=service_manifest, + ) + else: + logger.info( + f"Updating Kubernetes Service '{service_name}' " + f"in namespace '{namespace}'." + ) + kube_utils.update_service( + core_api=self.k8s_core_api, + name=service_name, + namespace=namespace, + service_manifest=service_manifest, + ) + else: + logger.info( + f"Creating Kubernetes Service '{service_name}' " + f"in namespace '{namespace}'." 
+ ) + kube_utils.create_service( + core_api=self.k8s_core_api, + namespace=namespace, + service_manifest=service_manifest, + ) + + def _manage_ingress_resource( + self, + deployment: DeploymentResponse, + namespace: str, + settings: KubernetesDeployerSettings, + ) -> None: + """Manage Kubernetes Ingress resource. + + Args: + deployment: The deployment. + namespace: Kubernetes namespace. + settings: Deployer settings. + """ + ingress_name = self._get_ingress_name(deployment, settings) + + if settings.ingress_manifest: + logger.info( + f"Creating/updating Ingress " + f"in namespace '{namespace}'." + ) + kube_utils.create_or_update_ingress( + networking_api=self.k8s_networking_api, + namespace=namespace, + ingress_manifest=settings.ingress_manifest, + ) + else: + # Clean up ingress if manifest was removed + if ingress_name: + existing_ingress = kube_utils.get_ingress( + networking_api=self.k8s_networking_api, + name=ingress_name, + namespace=namespace, + ) + if existing_ingress: + logger.info( + f"Ingress manifest removed, deleting existing Ingress '{ingress_name}' " + f"in namespace '{namespace}'." + ) + kube_utils.delete_ingress( + networking_api=self.k8s_networking_api, + name=ingress_name, + namespace=namespace, + ) + + def _manage_hpa_resource( + self, + deployment: DeploymentResponse, + namespace: str, + settings: KubernetesDeployerSettings, + ) -> None: + """Manage HorizontalPodAutoscaler resource. + + Args: + deployment: The deployment. + namespace: Kubernetes namespace. + settings: Deployer settings. + """ + hpa_name = self._get_hpa_name(deployment) + + if settings.hpa_manifest: + logger.info( + f"Creating/updating HorizontalPodAutoscaler " + f"in namespace '{namespace}'." 
+ ) + kube_utils.create_or_update_hpa( + autoscaling_api=self.k8s_autoscaling_api, + namespace=namespace, + hpa_manifest=settings.hpa_manifest, + ) + else: + existing_hpa = kube_utils.get_hpa( + autoscaling_api=self.k8s_autoscaling_api, + name=hpa_name, + namespace=namespace, + ) + if existing_hpa: + logger.info( + f"HPA manifest removed, deleting existing HorizontalPodAutoscaler '{hpa_name}' " + f"in namespace '{namespace}'." + ) + kube_utils.delete_hpa( + autoscaling_api=self.k8s_autoscaling_api, + name=hpa_name, + namespace=namespace, + ) + + # ======================================================================== + # Deployment Operations (Core Methods) + # ======================================================================== + + def do_provision_deployment( + self, + deployment: DeploymentResponse, + stack: "Stack", + environment: Dict[str, str], + secrets: Dict[str, str], + timeout: int, + ) -> DeploymentOperationalState: + """Provision a Kubernetes deployment. + + Args: + deployment: The deployment to provision. + stack: The stack to use for provisioning. + environment: Environment variables. + secrets: Secret environment variables. + timeout: Timeout in seconds. + + Returns: + The operational state of the deployment. + + Raises: + DeploymentProvisionError: If provisioning fails. + """ + snapshot = deployment.snapshot + if not snapshot: + raise DeploymentProvisionError( + f"Deployment '{deployment.name}' has no snapshot." 
+ ) + + settings = cast( + KubernetesDeployerSettings, + self.get_settings(snapshot), + ) + + # Convert resource settings to Kubernetes format + try: + resource_requests, resource_limits, replicas = ( + kube_utils.convert_resource_settings_to_k8s_format( + snapshot.pipeline_configuration.resource_settings + ) + ) + except ValueError as e: + raise DeploymentProvisionError(str(e)) from e + + namespace = self._get_namespace(deployment) + deployment_name = service_name = self._get_resource_base_name( + deployment + ) + labels = self._get_deployment_labels(deployment, settings) + + existing_deployment = kube_utils.get_deployment( + apps_api=self.k8s_apps_api, + name=deployment_name, + namespace=namespace, + ) + is_new_deployment = existing_deployment is None + + try: + image = self.get_image(snapshot) + self.ensure_namespace_exists(namespace) + + # Create/update Kubernetes Secret resource and get sanitized keys + sanitized_secrets = self._manage_deployment_secrets( + deployment, namespace, secrets + ) + + # Build Kubernetes Deployment and Service manifests + deployment_manifest = self._build_deployment_manifest( + deployment, + namespace, + labels, + image, + environment, + sanitized_secrets, + settings, + resource_requests, + resource_limits, + replicas, + ) + service_manifest = self._build_service_manifest( + deployment_name, service_name, namespace, labels, settings + ) + + # Create or update Kubernetes Deployment resource + self._manage_deployment_resource( + namespace, + deployment_name, + deployment_manifest, + existing_deployment, + ) + + existing_service = kube_utils.get_service( + core_api=self.k8s_core_api, + name=service_name, + namespace=namespace, + ) + self._manage_service_resource( + namespace, + service_name, + service_manifest, + existing_service, + settings, + ) + + self._manage_ingress_resource( + deployment, namespace, settings + ) + self._manage_hpa_resource(deployment, namespace, settings) + + if timeout > 0: + try: + 
kube_utils.wait_for_deployment_ready( + apps_api=self.k8s_apps_api, + deployment_name=deployment_name, + namespace=namespace, + timeout=timeout, + check_interval=settings.deployment_ready_check_interval, + ) + except RuntimeError as e: + raise DeploymentProvisionError(str(e)) from e + + if settings.service_type == "LoadBalancer": + lb_timeout = min(timeout, 150) + kube_utils.wait_for_loadbalancer_ip( + core_api=self.k8s_core_api, + service_name=service_name, + namespace=namespace, + timeout=lb_timeout, + check_interval=settings.deployment_ready_check_interval, + ) + else: + logger.info( + f"Deployment '{deployment_name}' created. " + f"No timeout specified, not waiting for readiness. " + f"Poll deployment state to check readiness." + ) + + return self.do_get_deployment_state(deployment) + + except DeploymentProvisionError: + # Re-raise deployment errors without cleanup (user may want to inspect state) + raise + except Exception as e: + # For new deployments that failed, clean up to avoid orphaned resources + if is_new_deployment: + logger.error( + f"Provisioning failed for new deployment '{deployment.name}'. " + f"Attempting cleanup of partial resources..." + ) + try: + self.do_deprovision_deployment( + deployment, timeout=settings.service_deletion_timeout + ) + logger.info( + f"Successfully cleaned up partial resources for deployment '{deployment.name}'." + ) + except Exception as cleanup_error: + logger.warning( + f"Failed to clean up partial resources for deployment '{deployment.name}': " + f"{cleanup_error}. Manual cleanup may be required. " + f"Original provisioning error: {e}" + ) + else: + logger.error( + f"Provisioning update failed for deployment '{deployment.name}'. " + f"Previous deployment state may still be active." 
+ ) + + raise DeploymentProvisionError( + f"Failed to provision Kubernetes deployment " + f"'{deployment.name}': {e}" + ) from e + + # ======================================================================== + # State and Status Management + # ======================================================================== + + def _determine_deployment_status( + self, + k8s_deployment: k8s_client.V1Deployment, + deployment: DeploymentResponse, + settings: KubernetesDeployerSettings, + ) -> DeploymentStatus: + """Determine deployment status from Kubernetes Deployment. + + Args: + k8s_deployment: Kubernetes Deployment resource. + deployment: ZenML deployment. + settings: Deployer settings. + + Returns: + Deployment status. + """ + status = DeploymentStatus.PENDING + + if k8s_deployment.status: + available_replicas = k8s_deployment.status.available_replicas or 0 + replicas = k8s_deployment.spec.replicas or 0 + + if available_replicas == replicas and replicas > 0: + status = DeploymentStatus.RUNNING + elif k8s_deployment.status.ready_replicas: + status = DeploymentStatus.PENDING + elif k8s_deployment.status.conditions: + for condition in k8s_deployment.status.conditions: + if ( + condition.type == "Progressing" + and condition.status == "False" + ): + status = DeploymentStatus.ERROR + break + + # Check pod-level failures + if status != DeploymentStatus.RUNNING: + pod = self._get_pod_for_deployment(deployment) + if pod: + error_reason = kube_utils.check_pod_failure_status( + pod, + restart_error_threshold=settings.pod_restart_error_threshold, + ) + if error_reason: + logger.warning( + f"Deployment '{deployment.name}' pod failure detected: {error_reason}" + ) + status = DeploymentStatus.ERROR + + return status + + def _extract_resource_info( + self, k8s_deployment: k8s_client.V1Deployment + ) -> Tuple[Optional[str], Optional[str], Optional[str]]: + """Extract resource information from Kubernetes Deployment. + + Args: + k8s_deployment: Kubernetes Deployment resource. 
+ + Returns: + Tuple of (cpu, memory, image) as strings or None. + """ + cpu_str = None + memory_str = None + image_str = None + + if ( + k8s_deployment.spec.template.spec.containers + and k8s_deployment.spec.template.spec.containers[0] + ): + container = k8s_deployment.spec.template.spec.containers[0] + + if container.resources: + if container.resources.requests: + cpu_str = container.resources.requests.get("cpu") + memory_str = container.resources.requests.get("memory") + # Fall back to limits if requests not set + if not cpu_str and container.resources.limits: + cpu_str = container.resources.limits.get("cpu") + if not memory_str and container.resources.limits: + memory_str = container.resources.limits.get("memory") + + if container.image: + image_str = container.image + + return cpu_str, memory_str, image_str + + def _enrich_metadata_with_service_info( + self, + metadata: KubernetesDeploymentMetadata, + k8s_service: k8s_client.V1Service, + settings: KubernetesDeployerSettings, + ) -> None: + """Enrich metadata with service-specific information. + + Args: + metadata: Deployment metadata to enrich. + k8s_service: Kubernetes Service resource. + settings: Deployer settings. + """ + if settings.service_type == "LoadBalancer": + if ( + k8s_service.status.load_balancer + and k8s_service.status.load_balancer.ingress + ): + lb_ingress = k8s_service.status.load_balancer.ingress[0] + metadata.external_ip = lb_ingress.ip or lb_ingress.hostname + elif settings.service_type == "NodePort": + if k8s_service.spec.ports: + metadata.node_port = k8s_service.spec.ports[0].node_port + + def do_get_deployment_state( + self, + deployment: DeploymentResponse, + ) -> DeploymentOperationalState: + """Get the state of a Kubernetes deployment. + + Args: + deployment: The deployment. + + Returns: + The operational state of the deployment. + + Raises: + DeploymentNotFoundError: If deployment is not found. + DeployerError: If the deployment has no snapshot. 
+ """ + snapshot = deployment.snapshot + if not snapshot: + raise DeployerError( + f"Deployment '{deployment.name}' has no snapshot." + ) + + settings = cast( + KubernetesDeployerSettings, + self.get_settings(snapshot), + ) + namespace = self._get_namespace(deployment) + deployment_name = service_name = self._get_resource_base_name( + deployment + ) + ingress_name = self._get_ingress_name(deployment, settings) + labels = self._get_deployment_labels(deployment, settings) + + try: + k8s_deployment = kube_utils.get_deployment( + apps_api=self.k8s_apps_api, + name=deployment_name, + namespace=namespace, + ) + k8s_service = kube_utils.get_service( + core_api=self.k8s_core_api, + name=service_name, + namespace=namespace, + ) + k8s_ingress = None + if ingress_name: + k8s_ingress = kube_utils.get_ingress( + networking_api=self.k8s_networking_api, + name=ingress_name, + namespace=namespace, + ) + + if not k8s_deployment or not k8s_service: + raise DeploymentNotFoundError( + f"Kubernetes resources for deployment '{deployment.name}' " + "not found" + ) + + status = self._determine_deployment_status( + k8s_deployment, deployment, settings + ) + + url = kube_utils.build_service_url( + core_api=self.k8s_core_api, + service=k8s_service, + namespace=namespace, + ingress=k8s_ingress, + ) + + cpu_str, memory_str, image_str = self._extract_resource_info( + k8s_deployment + ) + + pod = self._get_pod_for_deployment(deployment) + pod_name = pod.metadata.name if pod else None + + # Build metadata + metadata = KubernetesDeploymentMetadata( + deployment_name=deployment_name, + namespace=namespace, + service_name=service_name, + pod_name=pod_name, + port=settings.service_port, + service_type=settings.service_type, + replicas=k8s_deployment.spec.replicas or 0, + ready_replicas=k8s_deployment.status.ready_replicas or 0, + available_replicas=k8s_deployment.status.available_replicas + or 0, + cpu=cpu_str, + memory=memory_str, + image=image_str, + labels=labels, + ) + + 
self._enrich_metadata_with_service_info( + metadata, k8s_service, settings + ) + + return DeploymentOperationalState( + status=status, + url=url, + metadata=metadata.model_dump(), + ) + + except ApiException as e: + if e.status == 404: + raise DeploymentNotFoundError( + f"Kubernetes resources for deployment '{deployment.name}' " + "not found" + ) + raise DeployerError( + f"Failed to get state for deployment '{deployment.name}': {e}" + ) + + # ======================================================================== + # Logging + # ======================================================================== + + def do_get_deployment_state_logs( + self, + deployment: DeploymentResponse, + follow: bool = False, + tail: Optional[int] = None, + ) -> Generator[str, bool, None]: + """Get logs from a Kubernetes deployment. + + Args: + deployment: The deployment. + follow: Whether to follow the logs. + tail: Number of lines to tail. + + Yields: + Log lines. + + Raises: + DeploymentLogsNotFoundError: If logs cannot be retrieved. + + Note: + The Generator type signature includes a bool send type for + compatibility with the base class, though this implementation + does not currently use sent values. 
+ """ + namespace = self._get_namespace(deployment) + pod = self._get_pod_for_deployment(deployment) + + if not pod: + raise DeploymentLogsNotFoundError( + f"No pod found for deployment '{deployment.name}'" + ) + + pod_name = pod.metadata.name + + try: + if follow: + w = k8s_watch.Watch() + for line in w.stream( + self.k8s_core_api.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + follow=True, + tail_lines=tail, + ): + yield line + else: + logs = self.k8s_core_api.read_namespaced_pod_log( + name=pod_name, + namespace=namespace, + tail_lines=tail, + ) + for line in logs.split("\n"): + if line: + yield line + + except ApiException as e: + raise DeploymentLogsNotFoundError( + f"Failed to retrieve logs for deployment " + f"'{deployment.name}': {e}" + ) + + # ======================================================================== + # Deprovisioning + # ======================================================================== + + def do_deprovision_deployment( + self, + deployment: DeploymentResponse, + timeout: int, + ) -> Optional[DeploymentOperationalState]: + """Deprovision a Kubernetes deployment. + + Args: + deployment: The deployment to deprovision. + timeout: Timeout in seconds. + + Returns: + None to indicate immediate deletion. + + Raises: + DeploymentNotFoundError: If deployment is not found. + DeploymentDeprovisionError: If deprovisioning fails. + """ + snapshot = deployment.snapshot + if not snapshot: + raise DeploymentDeprovisionError( + f"Deployment '{deployment.name}' has no snapshot." 
+ ) + + settings = cast( + KubernetesDeployerSettings, + self.get_settings(snapshot), + ) + namespace = self._get_namespace(deployment) + deployment_name = service_name = self._get_resource_base_name( + deployment + ) + ingress_name = self._get_ingress_name(deployment, settings) + hpa_name = self._get_hpa_name(deployment) + + try: + kube_utils.delete_hpa( + autoscaling_api=self.k8s_autoscaling_api, + name=hpa_name, + namespace=namespace, + ) + logger.info( + f"Deleted HorizontalPodAutoscaler '{hpa_name}' " + f"in namespace '{namespace}'." + ) + + if ingress_name: + kube_utils.delete_ingress( + networking_api=self.k8s_networking_api, + name=ingress_name, + namespace=namespace, + ) + logger.info( + f"Deleted Kubernetes Ingress '{ingress_name}' " + f"in namespace '{namespace}'." + ) + + kube_utils.delete_service( + core_api=self.k8s_core_api, + name=service_name, + namespace=namespace, + ) + logger.info( + f"Deleted Kubernetes Service '{service_name}' " + f"in namespace '{namespace}'." + ) + + kube_utils.delete_deployment( + apps_api=self.k8s_apps_api, + name=deployment_name, + namespace=namespace, + propagation_policy="Foreground", # Wait for pods to be deleted + ) + logger.info( + f"Deleted Kubernetes Deployment '{deployment_name}' " + f"in namespace '{namespace}'." + ) + + try: + secret_name = self._get_secret_name(deployment) + kube_utils.delete_secret( + core_api=self.k8s_core_api, + namespace=namespace, + secret_name=secret_name, + ) + logger.info( + f"Deleted Kubernetes Secret '{secret_name}' " + f"in namespace '{namespace}'." + ) + except ApiException as e: + if e.status == 404: + logger.debug( + f"Secret '{secret_name}' not found in namespace '{namespace}' (already deleted or never existed)." + ) + elif e.status == 403: + logger.error( + f"Permission denied when deleting Secret '{secret_name}' in namespace '{namespace}': {e}. " + f"The Secret may remain and require manual cleanup. " + f"Check RBAC permissions for the service account." 
+ ) + else: + logger.error( + f"Failed to delete Secret '{secret_name}' in namespace '{namespace}': {e}. " + f"The Secret may remain and require manual cleanup." + ) + + return None + + except ApiException as e: + if e.status == 404: + raise DeploymentNotFoundError( + f"Kubernetes resources for deployment '{deployment.name}' " + "not found" + ) + else: + raise DeploymentDeprovisionError( + f"Failed to deprovision deployment '{deployment.name}': {e}" + ) diff --git a/src/zenml/integrations/kubernetes/flavors/__init__.py b/src/zenml/integrations/kubernetes/flavors/__init__.py index 17d67ff4202..5b7e003afa9 100644 --- a/src/zenml/integrations/kubernetes/flavors/__init__.py +++ b/src/zenml/integrations/kubernetes/flavors/__init__.py @@ -13,6 +13,11 @@ # permissions and limitations under the License. """Kubernetes integration flavors.""" +from zenml.integrations.kubernetes.flavors.kubernetes_deployer_flavor import ( + KubernetesDeployerConfig, + KubernetesDeployerFlavor, + KubernetesDeployerSettings, +) from zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor import ( KubernetesOrchestratorConfig, KubernetesOrchestratorFlavor, @@ -25,6 +30,9 @@ ) __all__ = [ + "KubernetesDeployerConfig", + "KubernetesDeployerFlavor", + "KubernetesDeployerSettings", "KubernetesOrchestratorFlavor", "KubernetesOrchestratorConfig", "KubernetesOrchestratorSettings", diff --git a/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py b/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py new file mode 100644 index 00000000000..dd6e3451816 --- /dev/null +++ b/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py @@ -0,0 +1,441 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Kubernetes deployer flavor.""" + +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type + +from pydantic import Field, PositiveInt + +from zenml.constants import KUBERNETES_CLUSTER_RESOURCE_TYPE +from zenml.deployers.base_deployer import ( + BaseDeployerConfig, + BaseDeployerFlavor, + BaseDeployerSettings, +) +from zenml.enums import KubernetesServiceType +from zenml.integrations.kubernetes import KUBERNETES_DEPLOYER_FLAVOR +from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings +from zenml.models import ServiceConnectorRequirements + +if TYPE_CHECKING: + from zenml.integrations.kubernetes.deployers import KubernetesDeployer + + +class KubernetesDeployerSettings(BaseDeployerSettings): + """Settings for the Kubernetes deployer. + + Attributes: + namespace: Kubernetes namespace for deployments. + service_type: Type of Kubernetes Service (LoadBalancer, NodePort, ClusterIP). + service_port: Port to expose on the service. + node_port: Specific NodePort (only for NodePort service type). + image_pull_policy: Kubernetes image pull policy. + labels: Labels to apply to all resources. + annotations: Annotations to apply to pod resources. + service_annotations: Annotations to apply to Service resources. + session_affinity: Session affinity for the Service (ClientIP or None). + load_balancer_ip: Static IP for LoadBalancer service type. + load_balancer_source_ranges: CIDR blocks allowed to access LoadBalancer. + command: Override container command (entrypoint). + args: Override container args. 
+ readiness_probe_initial_delay: Seconds before first readiness probe after container start.
+ readiness_probe_period: Seconds between readiness probe checks.
+ readiness_probe_timeout: Seconds before readiness probe times out.
+ readiness_probe_failure_threshold: Failed probes before marking container as not ready.
+ liveness_probe_initial_delay: Seconds before first liveness probe after container start.
+ liveness_probe_period: Seconds between liveness probe checks.
+ liveness_probe_timeout: Seconds before liveness probe times out.
+ liveness_probe_failure_threshold: Failed probes before restarting container.
+ service_account_name: Kubernetes service account for the deployment pods.
+ image_pull_secrets: Names of Kubernetes secrets for pulling private images.
+ pod_settings: Advanced pod configuration settings.
+ ingress_manifest: Optional Ingress manifest for external access routing.
+ hpa_manifest: Optional HorizontalPodAutoscaler manifest for autoscaling.
+ service_deletion_timeout: Timeout in seconds to wait for service deletion.
+ deployment_ready_check_interval: Interval in seconds between deployment readiness checks.
+ pod_restart_error_threshold: Number of pod restarts before marking a deployment as failed.
+ """
+
+ namespace: Optional[str] = Field(
+ default=None,
+ description="Kubernetes namespace for deployments. "
+ "If not provided, uses the `kubernetes_namespace` from the deployer "
+ "component configuration (defaults to 'zenml-deployments').",
+ )
+ service_type: KubernetesServiceType = Field(
+ default=KubernetesServiceType.LOAD_BALANCER,
+ description=(
+ "Type of Kubernetes Service: LoadBalancer, NodePort, or ClusterIP. "
+ "LoadBalancer is recommended for production (requires cloud provider support). "
+ "NodePort has limitations: nodes may lack external IPs or be behind firewalls, "
+ "making the service unreachable. For production with custom domains and TLS, "
+ "consider using ClusterIP with a manually configured Ingress resource."
+ ), + ) + service_port: PositiveInt = Field( + default=8000, + description="Port to expose on the service.", + ) + node_port: Optional[PositiveInt] = Field( + default=None, + description="Specific port on each node (NodePort type only). " + "Must be in range 30000-32767. If not specified, Kubernetes assigns one.", + ) + image_pull_policy: str = Field( + default="IfNotPresent", + description="Kubernetes image pull policy: Always, IfNotPresent, or Never.", + ) + labels: Dict[str, str] = Field( + default_factory=dict, + description="Additional labels to apply to all Kubernetes resources.", + ) + annotations: Dict[str, str] = Field( + default_factory=dict, + description="Annotations to apply to pod resources.", + ) + service_annotations: Dict[str, str] = Field( + default_factory=dict, + description="Annotations to apply to Service resources. " + "Useful for cloud provider-specific configurations.", + ) + session_affinity: Optional[str] = Field( + default=None, + description="Session affinity for the Service. Set to 'ClientIP' to enable.", + ) + load_balancer_ip: Optional[str] = Field( + default=None, + description="Static IP address for LoadBalancer type Services. " + "Cloud provider must support this feature.", + ) + load_balancer_source_ranges: List[str] = Field( + default_factory=list, + description="CIDR blocks allowed to access LoadBalancer. " + "Example: ['10.0.0.0/8', '192.168.0.0/16']", + ) + + # Container configuration + command: Optional[List[str]] = Field( + default=None, + description="Override container command (entrypoint). " + "If not set, uses the image's default CMD/ENTRYPOINT.", + ) + args: Optional[List[str]] = Field( + default=None, + description="Override container args. 
" + "If not set, uses the image's default CMD.", + ) + + # Probe configuration + readiness_probe_initial_delay: PositiveInt = Field( + default=10, + description="Seconds before first readiness probe after container start.", + ) + readiness_probe_period: PositiveInt = Field( + default=10, + description="Seconds between readiness probe checks.", + ) + readiness_probe_timeout: PositiveInt = Field( + default=5, + description="Seconds before readiness probe times out.", + ) + readiness_probe_failure_threshold: PositiveInt = Field( + default=3, + description="Failed probes before marking container as not ready.", + ) + liveness_probe_initial_delay: PositiveInt = Field( + default=30, + description="Seconds before first liveness probe after container start.", + ) + liveness_probe_period: PositiveInt = Field( + default=10, + description="Seconds between liveness probe checks.", + ) + liveness_probe_timeout: PositiveInt = Field( + default=5, + description="Seconds before liveness probe times out.", + ) + liveness_probe_failure_threshold: PositiveInt = Field( + default=3, + description="Failed probes before restarting container.", + ) + liveness_probe_path: str = Field( + default="/api/health", + description="HTTP path for liveness probe health checks.", + ) + readiness_probe_path: str = Field( + default="/api/health", + description="HTTP path for readiness probe health checks.", + ) + + # Security and access control + service_account_name: Optional[str] = Field( + default=None, + description="Kubernetes service account for the deployment pods. " + "If not set, uses the default service account in the namespace.", + ) + image_pull_secrets: List[str] = Field( + default_factory=list, + description="Names of Kubernetes secrets for pulling private images. 
" + "Example: ['my-registry-secret', 'dockerhub-secret']", + ) + + pod_settings: Optional[KubernetesPodSettings] = Field( + default=None, + description="Advanced pod configuration: volumes, affinity, tolerations, etc.", + ) + + # Ingress configuration + ingress_manifest: Optional[Dict[str, Any]] = Field( + default=None, + description="Optional Ingress manifest for external access routing. " + "If provided, ZenML will create/update this Ingress alongside the Deployment and Service. " + "The Ingress should route to the service created by this deployer. " + "Requires an Ingress Controller (nginx, traefik, etc.) in the cluster. " + "Example:\n" + " {\n" + ' "apiVersion": "networking.k8s.io/v1",\n' + ' "kind": "Ingress",\n' + ' "metadata": {\n' + ' "name": "my-ingress",\n' + ' "annotations": {\n' + ' "nginx.ingress.kubernetes.io/rewrite-target": "/"\n' + " }\n" + " },\n" + ' "spec": {\n' + ' "ingressClassName": "nginx",\n' + ' "rules": [\n' + " {\n" + ' "host": "my-app.example.com",\n' + ' "http": {\n' + ' "paths": [\n' + " {\n" + ' "path": "/",\n' + ' "pathType": "Prefix",\n' + ' "backend": {\n' + ' "service": {\n' + ' "name": "",\n' + ' "port": {"number": 8000}\n' + " }\n" + " }\n" + " }\n" + " ]\n" + " }\n" + " }\n" + " ],\n" + ' "tls": [\n' + ' {"hosts": ["my-app.example.com"], "secretName": "tls-secret"}\n' + " ]\n" + " }\n" + " }\n" + "See: https://kubernetes.io/docs/concepts/services-networking/ingress/", + ) + + # Autoscaling configuration + hpa_manifest: Optional[Dict[str, Any]] = Field( + default=None, + description="Optional HorizontalPodAutoscaler (HPA) manifest for autoscaling. " + "If provided, ZenML will create/update this HPA alongside the Deployment. " + "The HPA must target the deployment created by this deployer. " + "Use the autoscaling/v2 API. 
" + "Example:\n" + " {\n" + ' "apiVersion": "autoscaling/v2",\n' + ' "kind": "HorizontalPodAutoscaler",\n' + ' "metadata": {"name": "my-hpa"},\n' + ' "spec": {\n' + ' "scaleTargetRef": {\n' + ' "apiVersion": "apps/v1",\n' + ' "kind": "Deployment",\n' + ' "name": ""\n' + " },\n" + ' "minReplicas": 2,\n' + ' "maxReplicas": 10,\n' + ' "metrics": [\n' + ' {"type": "Resource", "resource": {"name": "cpu", ' + '"target": {"type": "Utilization", "averageUtilization": 75}}}\n' + " ]\n" + " }\n" + " }\n" + "See: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/", + ) + + # Operational timeouts and thresholds + service_deletion_timeout: PositiveInt = Field( + default=60, + description="Timeout in seconds to wait for service deletion when recreating services " + "with immutable field changes. Defaults to 60 seconds.", + ) + + deployment_ready_check_interval: PositiveInt = Field( + default=2, + description="Interval in seconds between deployment readiness checks. " + "Lower values provide faster feedback but increase API calls. Defaults to 2 seconds.", + ) + + pod_restart_error_threshold: PositiveInt = Field( + default=2, + description="Number of pod restarts before marking a deployment as failed. " + "Helps detect crashloop backoffs early. Defaults to 2 restarts.", + ) + + +class KubernetesDeployerConfig(BaseDeployerConfig, KubernetesDeployerSettings): + """Configuration for the Kubernetes deployer. + + This config combines deployer-specific settings with Kubernetes + component configuration (context, namespace, in-cluster mode). + + Attributes: + incluster: If `True`, the deployer will run inside the same cluster in which it itself is running. This requires the client to run in a Kubernetes pod itself. If set, the `kubernetes_context` config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored. + kubernetes_context: Name of a Kubernetes context to run deployments in. 
If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory. + kubernetes_namespace: Default Kubernetes namespace for deployments. Can be overridden per-deployment using the `namespace` setting. Defaults to 'zenml-deployments'. + local: If `True`, the deployer will assume it is connected to a local kubernetes cluster and will perform additional validations. + """ + + incluster: bool = Field( + False, + description="If `True`, the deployer will run inside the " + "same cluster in which it itself is running. This requires the client " + "to run in a Kubernetes pod itself. If set, the `kubernetes_context` " + "config option is ignored. If the stack component is linked to a " + "Kubernetes service connector, this field is ignored.", + ) + + kubernetes_context: Optional[str] = Field( + None, + description="Name of a Kubernetes context to run deployments in. " + "If the stack component is linked to a Kubernetes service connector, " + "this field is ignored. Otherwise, it is mandatory.", + ) + + kubernetes_namespace: str = Field( + "zenml-deployments", + description="Default Kubernetes namespace for deployments. " + "Can be overridden per-deployment using the `namespace` setting. " + "Defaults to 'zenml-deployments'.", + ) + + local: bool = Field( + False, + description="If `True`, the deployer will assume it is connected to a " + "local kubernetes cluster and will perform additional validations.", + ) + + @property + def is_local(self) -> bool: + """Checks if this is a local Kubernetes cluster. + + Returns: + True if using a local Kubernetes cluster, False otherwise. 
+ """ + # Check if context indicates a local cluster + if self.kubernetes_context: + local_context_indicators = [ + "k3d-", + "kind-", + "minikube", + "docker-desktop", + "colima", + "rancher-desktop", + ] + context_lower = self.kubernetes_context.lower() + return any( + indicator in context_lower + for indicator in local_context_indicators + ) + return self.local + + @property + def is_remote(self) -> bool: + """Checks if this stack component is running remotely. + + Returns: + True if this config is for a remote component, False otherwise. + """ + return not self.is_local + + +class KubernetesDeployerFlavor(BaseDeployerFlavor): + """Flavor for the Kubernetes deployer.""" + + @property + def name(self) -> str: + """The name of the flavor. + + Returns: + The flavor name. + """ + return KUBERNETES_DEPLOYER_FLAVOR + + @property + def service_connector_requirements( + self, + ) -> Optional[ServiceConnectorRequirements]: + """Service connector requirements for the Kubernetes deployer. + + Returns: + Service connector requirements. + """ + return ServiceConnectorRequirements( + resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE, + ) + + @property + def docs_url(self) -> Optional[str]: + """A URL to docs about this flavor. + + Returns: + The documentation URL. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A URL to SDK docs about this flavor. + + Returns: + The SDK documentation URL. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """The logo URL for the flavor. + + Returns: + The logo URL. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/kubernetes.png" + + @property + def config_class(self) -> Type[KubernetesDeployerConfig]: + """Returns `KubernetesDeployerConfig` config class. + + Returns: + The config class. 
+ """ + return KubernetesDeployerConfig + + @property + def implementation_class(self) -> Type["KubernetesDeployer"]: + """Returns the implementation class for this flavor. + + Returns: + The implementation class. + """ + from zenml.integrations.kubernetes.deployers import ( + KubernetesDeployer, + ) + + return KubernetesDeployer diff --git a/src/zenml/integrations/kubernetes/orchestrators/kube_utils.py b/src/zenml/integrations/kubernetes/kube_utils.py similarity index 51% rename from src/zenml/integrations/kubernetes/orchestrators/kube_utils.py rename to src/zenml/integrations/kubernetes/kube_utils.py index 0e3674c4a64..7e502189a6a 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kube_utils.py +++ b/src/zenml/integrations/kubernetes/kube_utils.py @@ -37,7 +37,17 @@ import re import time from collections import defaultdict -from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Tuple, + TypeVar, + cast, +) from kubernetes import client as k8s_client from kubernetes import config as k8s_config @@ -46,7 +56,7 @@ from zenml.integrations.kubernetes.constants import ( STEP_NAME_ANNOTATION_KEY, ) -from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( +from zenml.integrations.kubernetes.manifest_utils import ( build_namespace_manifest, build_role_binding_manifest_for_service_account, build_secret_manifest, @@ -56,10 +66,15 @@ from zenml.logger import get_logger from zenml.utils.time_utils import utc_now +if TYPE_CHECKING: + from zenml.config.resource_settings import ResourceSettings + logger = get_logger(__name__) R = TypeVar("R") +MIB_TO_GIB = 1024 # MiB to GiB conversion factor + # This is to fix a bug in the kubernetes client which has some wrong # client-side validations that means the `on_exit_codes` field is @@ -331,6 +346,9 @@ def _if_not_exists(create_fn: FuncT) -> FuncT: Returns: Wrapped Kubernetes function. 
+ + Raises: + ApiException: If an API error occurs. """ def create_if_not_exists(*args: Any, **kwargs: Any) -> None: @@ -1171,3 +1189,1078 @@ def apply_default_resource_requests( pod_settings.resources["requests"] = resources["requests"] return pod_settings + + +# ============================================================================ +# Waiting and Monitoring Functions +# ============================================================================ + + +def wait_for_service_deletion( + core_api: k8s_client.CoreV1Api, + service_name: str, + namespace: str, + timeout: int = 60, +) -> None: + """Wait for a Service to be fully deleted. + + Polls the Service until it returns 404, indicating deletion is complete. + This prevents race conditions when recreating Services with immutable + field changes. + + Args: + core_api: Kubernetes CoreV1Api client. + service_name: Name of the Service to wait for. + namespace: Namespace containing the Service. + timeout: Maximum time to wait in seconds. Default is 60. + + Raises: + RuntimeError: If Service is not deleted within timeout. + ApiException: If an API error occurs (other than 404). + """ + start_time = time.time() + backoff = 1.0 + max_backoff = 5.0 + + while time.time() - start_time < timeout: + try: + core_api.read_namespaced_service( + name=service_name, + namespace=namespace, + ) + logger.debug( + f"Waiting for Service '{service_name}' deletion to complete..." + ) + time.sleep(backoff) + backoff = min(backoff * 1.5, max_backoff) + except ApiException as e: + if e.status == 404: + logger.debug(f"Service '{service_name}' deletion confirmed.") + return + raise + + raise RuntimeError( + f"Timeout waiting for Service '{service_name}' to be deleted after " + f"{timeout} seconds. Service may have finalizers or the cluster may " + f"be slow. Check Service status with kubectl." 
+ ) + + +def wait_for_deployment_ready( + apps_api: k8s_client.AppsV1Api, + deployment_name: str, + namespace: str, + timeout: int, + check_interval: float = 2.0, +) -> None: + """Wait for a Deployment to become ready. + + Args: + apps_api: Kubernetes AppsV1Api client. + deployment_name: Name of the Deployment to wait for. + namespace: Namespace containing the Deployment. + timeout: Maximum time to wait in seconds. + check_interval: Seconds between status checks. Default is 2.0. + + Raises: + RuntimeError: If deployment doesn't become ready within timeout + or encounters a failure condition. + """ + logger.info( + f"Waiting up to {timeout}s for deployment '{deployment_name}' " + f"to become ready..." + ) + + start_time = time.time() + while time.time() - start_time < timeout: + try: + deployment = apps_api.read_namespaced_deployment( + name=deployment_name, + namespace=namespace, + ) + + if deployment.status: + available_replicas = deployment.status.available_replicas or 0 + replicas = deployment.spec.replicas or 0 + + if available_replicas == replicas and replicas > 0: + logger.info( + f"Deployment '{deployment_name}' is ready with " + f"{available_replicas}/{replicas} replicas available." 
+ ) + return + + if deployment.status.conditions: + for condition in deployment.status.conditions: + if ( + condition.type == "Progressing" + and condition.status == "False" + and condition.reason == "ProgressDeadlineExceeded" + ): + raise RuntimeError( + f"Deployment '{deployment_name}' failed to " + f"progress: {condition.message}" + ) + + logger.debug( + f"Deployment '{deployment_name}' status: " + f"{available_replicas}/{replicas} replicas available" + ) + + except ApiException as e: + if e.status != 404: + raise RuntimeError( + f"Error checking deployment status: {e}" + ) from e + + time.sleep(check_interval) + + raise RuntimeError( + f"Deployment '{deployment_name}' did not become ready " + f"within {timeout} seconds" + ) + + +def wait_for_loadbalancer_ip( + core_api: k8s_client.CoreV1Api, + service_name: str, + namespace: str, + timeout: int = 150, + check_interval: float = 2.0, +) -> Optional[str]: + """Wait for a LoadBalancer service to get an external IP. + + Args: + core_api: Kubernetes CoreV1Api client. + service_name: Name of the LoadBalancer Service. + namespace: Namespace containing the Service. + timeout: Maximum time to wait in seconds. Default is 150. + check_interval: Seconds between status checks. Default is 2.0. + + Returns: + The external IP/hostname if assigned, None if timeout reached. + Note: Returns None on timeout rather than raising to allow + deployment to continue (IP might be assigned later). + """ + logger.info( + f"Waiting up to {timeout}s for LoadBalancer service '{service_name}' " + f"to get an external IP..." 
+ ) + + start_time = time.time() + while time.time() - start_time < timeout: + try: + service = core_api.read_namespaced_service( + name=service_name, + namespace=namespace, + ) + + if ( + service.status + and service.status.load_balancer + and service.status.load_balancer.ingress + ): + ingress = service.status.load_balancer.ingress[0] + external_ip: Optional[str] = ingress.ip or ingress.hostname + if external_ip: + logger.info( + f"LoadBalancer service '{service_name}' received " + f"external IP/hostname: {external_ip}" + ) + return str(external_ip) + + logger.debug( + f"LoadBalancer service '{service_name}' is still waiting " + f"for external IP assignment..." + ) + + except ApiException as e: + if e.status != 404: + logger.error(f"Error checking service status: {e}") + return None + + time.sleep(check_interval) + + logger.warning( + f"LoadBalancer service '{service_name}' did not receive an " + f"external IP within {timeout} seconds. The deployment is still " + f"running, but you may need to check the service status later." + ) + return None + + +def check_pod_failure_status( + pod: k8s_client.V1Pod, + restart_error_threshold: int = 5, +) -> Optional[str]: + """Check if a pod has container failures indicating deployment errors. + + Args: + pod: The Kubernetes pod to inspect. + restart_error_threshold: Number of restarts to consider as an error. + Default is 5. + + Returns: + Error reason if pod has failures, None otherwise. 
+ """ + if not pod.status or not pod.status.container_statuses: + return None + + # Error reasons that indicate permanent or recurring failures + ERROR_REASONS = { + "CrashLoopBackOff", + "ErrImagePull", + "ImagePullBackOff", + "CreateContainerConfigError", + "InvalidImageName", + "CreateContainerError", + "RunContainerError", + } + + for container_status in pod.status.container_statuses: + if container_status.state and container_status.state.waiting: + reason = container_status.state.waiting.reason + if reason in ERROR_REASONS: + message = container_status.state.waiting.message or "" + return f"{reason}: {message}".strip(": ") + + if container_status.state and container_status.state.terminated: + reason = container_status.state.terminated.reason + exit_code = container_status.state.terminated.exit_code + if exit_code and exit_code != 0: + message = container_status.state.terminated.message or "" + return f"Container terminated with exit code {exit_code}: {reason} {message}".strip() + + if ( + container_status.last_state + and container_status.last_state.terminated + ): + restart_count = container_status.restart_count or 0 + if restart_count > restart_error_threshold: + reason = ( + container_status.last_state.terminated.reason or "Error" + ) + exit_code = container_status.last_state.terminated.exit_code + return f"Container restarting ({restart_count} restarts): {reason} (exit code {exit_code})" + + return None + + +# ============================================================================ +# Deployment Management Functions +# ============================================================================ + + +def get_deployment( + apps_api: k8s_client.AppsV1Api, + name: str, + namespace: str, +) -> Optional[k8s_client.V1Deployment]: + """Get a Kubernetes Deployment. + + Args: + apps_api: Kubernetes Apps V1 API client. + name: Name of the deployment. + namespace: Kubernetes namespace. + + Returns: + The Deployment object, or None if not found. 
+ + Raises: + ApiException: If an API error occurs. + """ + try: + return retry_on_api_exception( + apps_api.read_namespaced_deployment, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace) + except ApiException as e: + if e.status == 404: + return None + raise + + +def create_deployment( + apps_api: k8s_client.AppsV1Api, + namespace: str, + deployment_manifest: k8s_client.V1Deployment, +) -> k8s_client.V1Deployment: + """Create a Kubernetes Deployment. + + Args: + apps_api: Kubernetes Apps V1 API client. + namespace: Kubernetes namespace. + deployment_manifest: The Deployment manifest. + + Returns: + The created Deployment. + """ + return retry_on_api_exception( + apps_api.create_namespaced_deployment, + fail_on_status_codes=(404, 409), + )(namespace=namespace, body=deployment_manifest) + + +def update_deployment( + apps_api: k8s_client.AppsV1Api, + name: str, + namespace: str, + deployment_manifest: k8s_client.V1Deployment, +) -> k8s_client.V1Deployment: + """Update a Kubernetes Deployment. + + Args: + apps_api: Kubernetes Apps V1 API client. + name: Name of the deployment. + namespace: Kubernetes namespace. + deployment_manifest: The updated Deployment manifest. + + Returns: + The updated Deployment. + """ + return retry_on_api_exception( + apps_api.patch_namespaced_deployment, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace, body=deployment_manifest) + + +def delete_deployment( + apps_api: k8s_client.AppsV1Api, + name: str, + namespace: str, + propagation_policy: str = "Foreground", +) -> None: + """Delete a Kubernetes Deployment. + + Args: + apps_api: Kubernetes Apps V1 API client. + name: Name of the deployment. + namespace: Kubernetes namespace. + propagation_policy: Deletion propagation policy. + + Raises: + ApiException: If an API error occurs. 
+ """ + try: + retry_on_api_exception( + apps_api.delete_namespaced_deployment, + fail_on_status_codes=(404,), + )( + name=name, + namespace=namespace, + propagation_policy=propagation_policy, + ) + except ApiException as e: + if e.status != 404: + raise + + +# ============================================================================ +# Service Management Functions +# ============================================================================ + + +def get_service( + core_api: k8s_client.CoreV1Api, + name: str, + namespace: str, +) -> Optional[k8s_client.V1Service]: + """Get a Kubernetes Service. + + Args: + core_api: Kubernetes Core V1 API client. + name: Name of the service. + namespace: Kubernetes namespace. + + Returns: + The Service object, or None if not found. + + Raises: + ApiException: If an API error occurs. + """ + try: + return retry_on_api_exception( + core_api.read_namespaced_service, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace) + except ApiException as e: + if e.status == 404: + return None + raise + + +def create_service( + core_api: k8s_client.CoreV1Api, + namespace: str, + service_manifest: k8s_client.V1Service, +) -> k8s_client.V1Service: + """Create a Kubernetes Service. + + Args: + core_api: Kubernetes Core V1 API client. + namespace: Kubernetes namespace. + service_manifest: The Service manifest. + + Returns: + The created Service. + """ + return retry_on_api_exception( + core_api.create_namespaced_service, + fail_on_status_codes=(404, 409), + )(namespace=namespace, body=service_manifest) + + +def update_service( + core_api: k8s_client.CoreV1Api, + name: str, + namespace: str, + service_manifest: k8s_client.V1Service, +) -> k8s_client.V1Service: + """Update a Kubernetes Service. + + Args: + core_api: Kubernetes Core V1 API client. + name: Name of the service. + namespace: Kubernetes namespace. + service_manifest: The updated Service manifest. + + Returns: + The updated Service. 
+ """ + return retry_on_api_exception( + core_api.patch_namespaced_service, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace, body=service_manifest) + + +def delete_service( + core_api: k8s_client.CoreV1Api, + name: str, + namespace: str, +) -> None: + """Delete a Kubernetes Service. + + Args: + core_api: Kubernetes Core V1 API client. + name: Name of the service. + namespace: Kubernetes namespace. + + Raises: + ApiException: If an API error occurs. + """ + try: + retry_on_api_exception( + core_api.delete_namespaced_service, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace) + except ApiException as e: + if e.status != 404: + raise + + +def service_needs_recreate( + existing_service: k8s_client.V1Service, + new_manifest: k8s_client.V1Service, +) -> bool: + """Check if a Service needs to be recreated due to immutable field changes. + + Args: + existing_service: The existing Service from the cluster. + new_manifest: The new Service manifest to apply. + + Returns: + True if the Service needs to be deleted and recreated, False otherwise. 
+ """ + existing_type = existing_service.spec.type + new_type = new_manifest.spec.type + if existing_type != new_type: + logger.debug( + f"Service type changed from {existing_type} to {new_type}, " + f"requires recreate" + ) + return True + + # ClusterIP is immutable (except for "None" for headless services) + existing_cluster_ip = existing_service.spec.cluster_ip + new_cluster_ip = new_manifest.spec.cluster_ip + if ( + existing_cluster_ip + and new_cluster_ip + and existing_cluster_ip != new_cluster_ip + and existing_cluster_ip != "None" + and new_cluster_ip != "None" + ): + logger.debug( + f"Service clusterIP changed from {existing_cluster_ip} to " + f"{new_cluster_ip}, requires recreate" + ) + return True + + # NodePort values are immutable once assigned + if existing_type == "NodePort" or new_type == "NodePort": + existing_ports = existing_service.spec.ports or [] + new_ports = new_manifest.spec.ports or [] + + # Build maps keyed by (port name/number, target port) -> node port + existing_node_ports = { + (p.name or str(p.port), p.target_port): p.node_port + for p in existing_ports + if p.node_port + } + new_node_ports = { + (p.name or str(p.port), p.target_port): p.node_port + for p in new_ports + if p.node_port + } + + for key, existing_node_port in existing_node_ports.items(): + if ( + key in new_node_ports + and new_node_ports[key] != existing_node_port + ): + logger.debug( + f"Service nodePort changed for {key}, requires recreate" + ) + return True + + return False + + +# ============================================================================ +# Ingress Management Functions +# ============================================================================ + + +def get_ingress( + networking_api: k8s_client.NetworkingV1Api, + name: str, + namespace: str, +) -> Optional[k8s_client.V1Ingress]: + """Get a Kubernetes Ingress. + + Args: + networking_api: Kubernetes Networking V1 API client. + name: Name of the ingress. + namespace: Kubernetes namespace. 
+
+    Returns:
+        The Ingress object, or None if not found.
+
+    Raises:
+        ApiException: If an API error occurs.
+    """
+    try:
+        return retry_on_api_exception(
+            networking_api.read_namespaced_ingress,
+            fail_on_status_codes=(404,),
+        )(name=name, namespace=namespace)
+    except ApiException as e:
+        if e.status == 404:
+            return None
+        raise
+
+def delete_ingress(
+    networking_api: k8s_client.NetworkingV1Api,
+    name: str,
+    namespace: str,
+) -> None:
+    """Delete a Kubernetes Ingress.
+
+    Args:
+        networking_api: Kubernetes Networking V1 API client.
+        name: Name of the ingress.
+        namespace: Kubernetes namespace.
+
+    Raises:
+        ApiException: If an API error occurs.
+    """
+    try:
+        retry_on_api_exception(
+            networking_api.delete_namespaced_ingress,
+            fail_on_status_codes=(404,),
+        )(name=name, namespace=namespace)
+    except ApiException as e:
+        if e.status != 404:
+            raise
+
+
+def create_or_update_ingress(
+    networking_api: k8s_client.NetworkingV1Api,
+    namespace: str,
+    ingress_manifest: Dict[str, Any],
+) -> None:
+    """Create or update a Kubernetes Ingress.
+
+    Args:
+        networking_api: Kubernetes Networking V1 API client.
+        namespace: Kubernetes namespace.
+        ingress_manifest: The Ingress manifest as a dictionary.
+
+    Note:
+        API errors are logged as warnings instead of being raised.
+    """
+    ingress_name = ingress_manifest.get("metadata", {}).get("name")
+    if not ingress_name:
+        logger.warning(
+            "Ingress manifest is missing 'metadata.name'. Skipping Ingress creation."
+ ) + return + + try: + retry_on_api_exception( + networking_api.create_namespaced_ingress, + fail_on_status_codes=(404,), + )(namespace=namespace, body=ingress_manifest) + logger.info(f"Created Ingress '{ingress_name}'.") + except ApiException as e: + if e.status == 409: + # Ingress already exists, update it + try: + retry_on_api_exception( + networking_api.patch_namespaced_ingress, + fail_on_status_codes=(404,), + )(name=ingress_name, namespace=namespace, body=ingress_manifest) + logger.info(f"Updated Ingress '{ingress_name}'.") + except ApiException as patch_error: + logger.warning( + f"Failed to update Ingress '{ingress_name}': {patch_error}" + ) + else: + logger.warning(f"Failed to create Ingress '{ingress_name}': {e}") + + +# ============================================================================ +# HorizontalPodAutoscaler Management Functions +# ============================================================================ + + +def get_hpa( + autoscaling_api: k8s_client.AutoscalingV2Api, + name: str, + namespace: str, +) -> Optional[k8s_client.V1HorizontalPodAutoscaler]: + """Get a Kubernetes HorizontalPodAutoscaler. + + Args: + autoscaling_api: Kubernetes Autoscaling V2 API client. + name: Name of the HPA. + namespace: Kubernetes namespace. + + Returns: + The HPA object, or None if not found. + + Raises: + ApiException: If an API error occurs. + + """ + try: + return retry_on_api_exception( + autoscaling_api.read_namespaced_horizontal_pod_autoscaler, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace) + except ApiException as e: + if e.status == 404: + return None + raise + + +def create_or_update_hpa( + autoscaling_api: k8s_client.AutoscalingV2Api, + namespace: str, + hpa_manifest: Dict[str, Any], +) -> None: + """Create or update a Kubernetes HorizontalPodAutoscaler. + + Args: + autoscaling_api: Kubernetes Autoscaling V2 API client. + namespace: Kubernetes namespace. + hpa_manifest: The HPA manifest as a dictionary. 
+
+    Note:
+        API errors are logged as warnings instead of being raised.
+    """
+    hpa_name = hpa_manifest.get("metadata", {}).get("name")
+    if not hpa_name:
+        logger.warning(
+            "HPA manifest is missing 'metadata.name'. Skipping HPA creation."
+        )
+        return
+
+    try:
+        retry_on_api_exception(
+            autoscaling_api.create_namespaced_horizontal_pod_autoscaler,
+            fail_on_status_codes=(404,),
+        )(namespace=namespace, body=hpa_manifest)
+        logger.info(f"Created HorizontalPodAutoscaler '{hpa_name}'.")
+    except ApiException as e:
+        if e.status == 409:
+            # HPA already exists, update it
+            try:
+                retry_on_api_exception(
+                    autoscaling_api.patch_namespaced_horizontal_pod_autoscaler,
+                    fail_on_status_codes=(404,),
+                )(name=hpa_name, namespace=namespace, body=hpa_manifest)
+                logger.info(f"Updated HorizontalPodAutoscaler '{hpa_name}'.")
+            except ApiException as patch_error:
+                logger.warning(
+                    f"Failed to update HPA '{hpa_name}': {patch_error}"
+                )
+        else:
+            logger.warning(f"Failed to create HPA '{hpa_name}': {e}")
+
+
+def delete_hpa(
+    autoscaling_api: k8s_client.AutoscalingV2Api,
+    name: str,
+    namespace: str,
+) -> None:
+    """Delete a Kubernetes HorizontalPodAutoscaler.
+
+    Args:
+        autoscaling_api: Kubernetes Autoscaling V2 API client.
+        name: Name of the HPA.
+        namespace: Kubernetes namespace.
+
+    Note:
+        API errors are logged as warnings instead of being raised.
+ """ + try: + retry_on_api_exception( + autoscaling_api.delete_namespaced_horizontal_pod_autoscaler, + fail_on_status_codes=(404,), + )(name=name, namespace=namespace) + except ApiException as e: + if e.status == 404: + logger.debug( + f"HPA '{name}' not found (expected if not configured)" + ) + else: + logger.warning(f"Failed to delete HPA '{name}': {e}") + + +# ============================================================================ +# Pod Management Functions +# ============================================================================ + + +def list_pods( + core_api: k8s_client.CoreV1Api, + namespace: str, + label_selector: Optional[str] = None, +) -> k8s_client.V1PodList: + """List pods in a namespace. + + Args: + core_api: Kubernetes Core V1 API client. + namespace: Kubernetes namespace. + label_selector: Optional label selector to filter pods. + + Returns: + List of pods. + """ + return retry_on_api_exception( + core_api.list_namespaced_pod, + fail_on_status_codes=(404,), + )(namespace=namespace, label_selector=label_selector) + + +# ============================================================================ +# Resource Conversion Utilities +# ============================================================================ + + +def convert_resource_settings_to_k8s_format( + resource_settings: "ResourceSettings", +) -> Tuple[Dict[str, str], Dict[str, str], int]: + """Convert ZenML ResourceSettings to Kubernetes resource format. + + Args: + resource_settings: The resource settings from pipeline configuration. + + Returns: + Tuple of (requests, limits, replicas) in Kubernetes format. + - requests: Dict with 'cpu', 'memory', and optionally 'nvidia.com/gpu' keys + - limits: Dict with 'cpu', 'memory', and optionally 'nvidia.com/gpu' keys + - replicas: Number of replicas + + Raises: + ValueError: If replica configuration is invalid. 
+ """ + from zenml.config.resource_settings import ByteUnit + + requests: Dict[str, str] = {} + limits: Dict[str, str] = {} + + if resource_settings.cpu_count is not None: + cpu_value = resource_settings.cpu_count + # Kubernetes accepts CPU as whole numbers (e.g., "2") or millicores (e.g., "500m") + if cpu_value < 1: + # Fractional CPUs: 0.5 → "500m" + cpu_str = f"{int(cpu_value * 1000)}m" + else: + if cpu_value == int(cpu_value): + cpu_str = str(int(cpu_value)) # 2.0 → "2" + else: + cpu_str = f"{int(cpu_value * 1000)}m" # 1.5 → "1500m" + + requests["cpu"] = cpu_str + limits["cpu"] = cpu_str + + if resource_settings.memory is not None: + memory_value = resource_settings.get_memory(unit=ByteUnit.MIB) + if memory_value is not None: + # Use Gi only for clean conversions to avoid precision loss + if memory_value >= MIB_TO_GIB and memory_value % MIB_TO_GIB == 0: + memory_str = f"{int(memory_value / MIB_TO_GIB)}Gi" + else: + memory_str = f"{int(memory_value)}Mi" + + requests["memory"] = memory_str + limits["memory"] = memory_str + + # Determine replica count from min/max settings + # For standard K8s Deployments, we use min_replicas as the baseline + # (autoscaling requires a separate HPA resource) + min_r = resource_settings.min_replicas or 0 + max_r = resource_settings.max_replicas or 0 + + if max_r > 0 and min_r > max_r: + raise ValueError( + f"min_replicas ({min_r}) cannot be greater than max_replicas ({max_r})" + ) + + replicas = min_r if min_r > 0 else (max_r if max_r > 0 else 1) + + if ( + resource_settings.gpu_count is not None + and resource_settings.gpu_count > 0 + ): + # GPU requests must be integers; Kubernetes auto-sets requests=limits for GPUs + gpu_str = str(resource_settings.gpu_count) + requests["nvidia.com/gpu"] = gpu_str + limits["nvidia.com/gpu"] = gpu_str + logger.info( + f"Configured {resource_settings.gpu_count} GPU(s) per pod. " + f"Ensure your cluster has GPU nodes with the nvidia.com/gpu resource. 
" + f"You may need to install the NVIDIA device plugin: " + f"https://github.com/NVIDIA/k8s-device-plugin" + ) + + return requests, limits, replicas + + +# ============================================================================ +# URL Building Utilities +# ============================================================================ + + +def build_url_from_ingress(ingress: k8s_client.V1Ingress) -> Optional[str]: + """Extract URL from Kubernetes Ingress resource. + + Args: + ingress: Kubernetes Ingress resource. + + Returns: + URL from ingress rules or load balancer, or None if not available. + """ + if not ingress.spec: + logger.warning( + f"Ingress '{ingress.metadata.name if ingress.metadata else 'unknown'}' " + f"has no spec. Cannot build URL." + ) + return None + + protocol = "https" if ingress.spec.tls else "http" + + # Try to get URL from ingress rules + if ingress.spec.rules: + rule = ingress.spec.rules[0] + if rule.host and rule.http and rule.http.paths: + path = rule.http.paths[0].path or "/" + return f"{protocol}://{rule.host}{path}" + + # Try to get URL from load balancer status + if ( + ingress.status + and ingress.status.load_balancer + and ingress.status.load_balancer.ingress + ): + lb_ingress = ingress.status.load_balancer.ingress[0] + host = lb_ingress.ip or lb_ingress.hostname + if host: + path = "/" + if ( + ingress.spec.rules + and ingress.spec.rules[0].http + and ingress.spec.rules[0].http.paths + ): + path = ingress.spec.rules[0].http.paths[0].path or "/" + return f"{protocol}://{host}{path}" + + return None + + +def build_url_from_loadbalancer_service( + service: k8s_client.V1Service, +) -> Optional[str]: + """Get URL from LoadBalancer service. + + Args: + service: Kubernetes Service resource. + + Returns: + LoadBalancer URL or None if IP not yet assigned. 
+ """ + if not service.spec or not service.spec.ports: + return None + + service_port = service.spec.ports[0].port + + if ( + service.status + and service.status.load_balancer + and service.status.load_balancer.ingress + ): + lb_ingress = service.status.load_balancer.ingress[0] + host = lb_ingress.ip or lb_ingress.hostname + if host: + return f"http://{host}:{service_port}" + return None + + +def build_url_from_nodeport_service( + core_api: k8s_client.CoreV1Api, + service: k8s_client.V1Service, + namespace: str, +) -> Optional[str]: + """Get URL from NodePort service. + + Args: + core_api: Kubernetes Core V1 API client. + service: Kubernetes Service resource. + namespace: Kubernetes namespace. + + Returns: + NodePort URL or None if not accessible. + """ + if not service.spec or not service.spec.ports: + return None + + node_port = service.spec.ports[0].node_port + service_port = service.spec.ports[0].port + service_name = service.metadata.name if service.metadata else "unknown" + + if not node_port: + return None + + try: + nodes = core_api.list_node() + + # Try to find external IP first + for node in nodes.items: + if node.status and node.status.addresses: + for address in node.status.addresses: + if address.type == "ExternalIP": + logger.info( + f"NodePort service accessible at: http://{address.address}:{node_port}" + ) + return f"http://{address.address}:{node_port}" + + # Fall back to internal IP with warning + for node in nodes.items: + if node.status and node.status.addresses: + for address in node.status.addresses: + if address.type == "InternalIP": + logger.warning( + f"NodePort service '{service_name}' has no nodes with ExternalIP. " + f"The returned InternalIP URL is likely NOT accessible from outside the cluster. 
" + f"For local access, use: kubectl port-forward -n {namespace} " + f"service/{service_name} 8080:{service_port}" + ) + return f"http://{address.address}:{node_port}" + + logger.error( + f"NodePort service '{service_name}' deployed, but no node IPs available. " + f"Use: kubectl port-forward -n {namespace} service/{service_name} 8080:{service_port}" + ) + return None + + except Exception as e: + logger.error( + f"Failed to get node IPs for NodePort service: {e}. " + f"Use: kubectl port-forward -n {namespace} service/{service_name} 8080:{service_port}" + ) + return None + + +def build_url_from_clusterip_service( + service: k8s_client.V1Service, + namespace: str, +) -> str: + """Get internal DNS URL from ClusterIP service. + + Args: + service: Kubernetes Service resource. + namespace: Kubernetes namespace. + + Returns: + Internal cluster DNS URL. + """ + service_name = service.metadata.name if service.metadata else "unknown" + + if not service.spec or not service.spec.ports: + return f"http://{service_name}.{namespace}.svc.cluster.local" + + service_port = service.spec.ports[0].port + + logger.warning( + f"Service '{service_name}' uses ClusterIP, which is only " + f"accessible from within the Kubernetes cluster. " + f"For local access, use: kubectl port-forward -n {namespace} " + f"service/{service_name} 8080:{service_port}" + ) + return ( + f"http://{service_name}.{namespace}.svc.cluster.local:{service_port}" + ) + + +def build_service_url( + core_api: k8s_client.CoreV1Api, + service: k8s_client.V1Service, + namespace: str, + ingress: Optional[k8s_client.V1Ingress] = None, +) -> Optional[str]: + """Build URL for accessing a Kubernetes service. + + Args: + core_api: Kubernetes Core V1 API client. + service: Kubernetes Service resource. + namespace: Kubernetes namespace. + ingress: Optional Kubernetes Ingress resource. + + Returns: + Service URL or None if not yet available. 
+ """ + # If ingress is configured, use it + if ingress: + return build_url_from_ingress(ingress) + + if not service.spec or not service.spec.type: + service_name = service.metadata.name if service.metadata else "unknown" + logger.warning( + f"Service '{service_name}' has no type specified in spec. " + f"Cannot build service URL." + ) + return None + + # Otherwise, build URL based on service type + service_type = service.spec.type + + if service_type == "LoadBalancer": + return build_url_from_loadbalancer_service(service) + elif service_type == "NodePort": + return build_url_from_nodeport_service(core_api, service, namespace) + elif service_type == "ClusterIP": + return build_url_from_clusterip_service(service, namespace) + + return None diff --git a/src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py b/src/zenml/integrations/kubernetes/manifest_utils.py similarity index 53% rename from src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py rename to src/zenml/integrations/kubernetes/manifest_utils.py index c52ffb5e9fc..15b563bf6e5 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py +++ b/src/zenml/integrations/kubernetes/manifest_utils.py @@ -197,12 +197,15 @@ def build_pod_manifest( def add_pod_settings( pod_spec: k8s_client.V1PodSpec, settings: KubernetesPodSettings, + skip_resources: bool = False, ) -> None: """Updates pod `spec` fields in place if passed in orchestrator settings. Args: pod_spec: Pod spec to update. settings: Pod settings to apply. + skip_resources: If True, skip applying resource settings. This is useful + when resources are already set explicitly on containers. 
""" if settings.node_selectors: pod_spec.node_selector = settings.node_selectors @@ -215,7 +218,8 @@ def add_pod_settings( for container in pod_spec.containers: assert isinstance(container, k8s_client.V1Container) - container._resources = settings.resources + if not skip_resources and settings.resources: + container._resources = settings.resources if settings.volume_mounts: if container.volume_mounts: container.volume_mounts.extend(settings.volume_mounts) @@ -477,3 +481,394 @@ def build_cron_job_manifest( metadata=job_template.metadata, spec=spec, ) + + +def _build_container_spec( + image: str, + command: List[str], + args: List[str], + env_vars: List[k8s_client.V1EnvVar], + service_port: int, + resource_requests: Dict[str, str], + resource_limits: Dict[str, str], + image_pull_policy: str, + liveness_probe_config: Dict[str, Any], + readiness_probe_config: Dict[str, Any], + liveness_probe_path: str, + readiness_probe_path: str, +) -> k8s_client.V1Container: + """Build a Kubernetes container specification. + + Args: + image: Container image URI. + command: Container command. + args: Container arguments. + env_vars: Environment variables for the container. + service_port: Container port to expose. + resource_requests: Resource requests (cpu, memory, gpu). + resource_limits: Resource limits (cpu, memory, gpu). + image_pull_policy: Image pull policy. + liveness_probe_config: Liveness probe configuration. + readiness_probe_config: Readiness probe configuration. + liveness_probe_path: HTTP path for liveness probe health checks. + readiness_probe_path: HTTP path for readiness probe health checks. + + Returns: + Kubernetes container specification. 
+ """ + return k8s_client.V1Container( + name="deployment", + image=image, + command=command, + args=args, + env=env_vars, + ports=[ + k8s_client.V1ContainerPort( + container_port=service_port, + name="http", + ) + ], + resources=k8s_client.V1ResourceRequirements( + requests=resource_requests, + limits=resource_limits, + ), + liveness_probe=k8s_client.V1Probe( + http_get=k8s_client.V1HTTPGetAction( + path=liveness_probe_path, + port=service_port, + ), + **liveness_probe_config, + ), + readiness_probe=k8s_client.V1Probe( + http_get=k8s_client.V1HTTPGetAction( + path=readiness_probe_path, + port=service_port, + ), + **readiness_probe_config, + ), + image_pull_policy=image_pull_policy, + ) + + +def _build_pod_spec( + container: k8s_client.V1Container, + service_account_name: Optional[str], + image_pull_secrets: List[str], + pod_settings: Optional[KubernetesPodSettings], +) -> k8s_client.V1PodSpec: + """Build a Kubernetes pod specification. + + Args: + container: Container specification. + service_account_name: Service account name. + image_pull_secrets: Names of image pull secrets. + pod_settings: Optional pod settings to apply. + + Returns: + Kubernetes pod specification. + """ + pod_spec = k8s_client.V1PodSpec( + containers=[container], + service_account_name=service_account_name, + image_pull_secrets=[ + k8s_client.V1LocalObjectReference(name=secret_name) + for secret_name in image_pull_secrets + ] + if image_pull_secrets + else None, + ) + + if pod_settings: + add_pod_settings(pod_spec, pod_settings, skip_resources=True) + + return pod_spec + + +def _build_pod_template( + deployment_name: str, + labels: Dict[str, str], + annotations: Optional[Dict[str, str]], + pod_spec: k8s_client.V1PodSpec, +) -> k8s_client.V1PodTemplateSpec: + """Build a Kubernetes pod template. + + Args: + deployment_name: Name of the deployment. + labels: Labels to apply to resources. + annotations: Annotations to apply to pod metadata. + pod_spec: Pod specification. 
+ + Returns: + Kubernetes pod template specification. + """ + # Labels must include the selector label + pod_labels = {**labels, "app": deployment_name} + return k8s_client.V1PodTemplateSpec( + metadata=k8s_client.V1ObjectMeta( + labels=pod_labels, + annotations=annotations or None, + ), + spec=pod_spec, + ) + + +def build_deployment_manifest( + deployment_name: str, + namespace: str, + labels: Dict[str, str], + annotations: Optional[Dict[str, str]], + replicas: int, + image: str, + command: List[str], + args: List[str], + env_vars: List[k8s_client.V1EnvVar], + service_port: int, + resource_requests: Dict[str, str], + resource_limits: Dict[str, str], + image_pull_policy: str, + image_pull_secrets: List[str], + service_account_name: Optional[str], + liveness_probe_config: Dict[str, Any], + readiness_probe_config: Dict[str, Any], + liveness_probe_path: str = "/api/health", + readiness_probe_path: str = "/api/health", + pod_settings: Optional[KubernetesPodSettings] = None, +) -> k8s_client.V1Deployment: + """Build a Kubernetes Deployment manifest. + + Args: + deployment_name: Name of the deployment. + namespace: Kubernetes namespace. + labels: Labels to apply to resources. + annotations: Annotations to apply to pod metadata. + replicas: Number of pod replicas. + image: Container image URI. + command: Container command. + args: Container arguments. + env_vars: Environment variables for the container. + service_port: Container port to expose. + resource_requests: Resource requests (cpu, memory, gpu). + resource_limits: Resource limits (cpu, memory, gpu). + image_pull_policy: Image pull policy. + image_pull_secrets: Names of image pull secrets. + service_account_name: Service account name. + liveness_probe_config: Liveness probe configuration. + readiness_probe_config: Readiness probe configuration. + liveness_probe_path: HTTP path for liveness probe health checks. + readiness_probe_path: HTTP path for readiness probe health checks. 
+ pod_settings: Optional pod settings to apply. + + Returns: + Kubernetes Deployment manifest. + """ + # Build container specification + container = _build_container_spec( + image=image, + command=command, + args=args, + env_vars=env_vars, + service_port=service_port, + resource_requests=resource_requests, + resource_limits=resource_limits, + image_pull_policy=image_pull_policy, + liveness_probe_config=liveness_probe_config, + readiness_probe_config=readiness_probe_config, + liveness_probe_path=liveness_probe_path, + readiness_probe_path=readiness_probe_path, + ) + + # Build pod specification + pod_spec = _build_pod_spec( + container=container, + service_account_name=service_account_name, + image_pull_secrets=image_pull_secrets, + pod_settings=pod_settings, + ) + + # Build pod template + pod_template = _build_pod_template( + deployment_name=deployment_name, + labels=labels, + annotations=annotations, + pod_spec=pod_spec, + ) + + # Build deployment manifest + return k8s_client.V1Deployment( + api_version="apps/v1", + kind="Deployment", + metadata=k8s_client.V1ObjectMeta( + name=deployment_name, + namespace=namespace, + labels=labels, + ), + spec=k8s_client.V1DeploymentSpec( + replicas=replicas, + selector=k8s_client.V1LabelSelector( + match_labels={"app": deployment_name} + ), + template=pod_template, + ), + ) + + +def build_service_manifest( + service_name: str, + namespace: str, + labels: Dict[str, str], + annotations: Optional[Dict[str, str]], + service_type: str, + service_port: int, + node_port: Optional[int], + session_affinity: Optional[str], + load_balancer_ip: Optional[str], + load_balancer_source_ranges: Optional[List[str]], + deployment_name: Optional[str] = None, +) -> k8s_client.V1Service: + """Build a Kubernetes Service manifest. + + Args: + service_name: Name of the service. + namespace: Kubernetes namespace. + labels: Labels to apply to the service. + annotations: Annotations to apply to the service. 
+ service_type: Type of service (LoadBalancer, NodePort, ClusterIP). + service_port: Port to expose. + node_port: Node port (for NodePort service type). + session_affinity: Session affinity setting. + load_balancer_ip: Static IP for LoadBalancer. + load_balancer_source_ranges: CIDR blocks for LoadBalancer. + deployment_name: Name of the deployment to select pods from. If not provided, + uses service_name. The selector must match the deployment's pod labels. + + Returns: + Kubernetes Service manifest. + """ + service_port_obj = k8s_client.V1ServicePort( + port=service_port, + target_port=service_port, + protocol="TCP", + name="http", + ) + + if service_type == "NodePort" and node_port: + service_port_obj.node_port = node_port + + # Selector must match the deployment's pod labels + selector_name = deployment_name if deployment_name else service_name + + service_spec = k8s_client.V1ServiceSpec( + type=service_type, + selector={"app": selector_name}, + ports=[service_port_obj], + ) + + if session_affinity: + service_spec.session_affinity = session_affinity + + if service_type == "LoadBalancer": + if load_balancer_ip: + service_spec.load_balancer_ip = load_balancer_ip + if load_balancer_source_ranges: + service_spec.load_balancer_source_ranges = ( + load_balancer_source_ranges + ) + + return k8s_client.V1Service( + api_version="v1", + kind="Service", + metadata=k8s_client.V1ObjectMeta( + name=service_name, + namespace=namespace, + labels=labels, + annotations=annotations or None, + ), + spec=service_spec, + ) + + +def build_ingress_manifest( + ingress_name: str, + namespace: str, + labels: Dict[str, str], + annotations: Optional[Dict[str, str]], + service_name: str, + service_port: int, + ingress_class: Optional[str], + ingress_host: Optional[str], + ingress_path: str, + ingress_path_type: str, + tls_enabled: bool, + tls_secret_name: Optional[str], +) -> k8s_client.V1Ingress: + """Build a Kubernetes Ingress manifest. + + Args: + ingress_name: Name of the ingress. 
+ namespace: Kubernetes namespace. + labels: Labels to apply to the ingress. + annotations: Annotations to apply to the ingress. + service_name: Name of the backend service. + service_port: Port of the backend service. + ingress_class: Ingress class name. + ingress_host: Hostname for the ingress rule. + ingress_path: Path for the ingress rule. + ingress_path_type: Path type (Prefix, Exact, etc.). + tls_enabled: Whether TLS is enabled. + tls_secret_name: Name of the TLS secret. + + Returns: + Kubernetes Ingress manifest. + """ + backend = k8s_client.V1IngressBackend( + service=k8s_client.V1IngressServiceBackend( + name=service_name, + port=k8s_client.V1ServiceBackendPort(number=service_port), + ) + ) + + http_ingress_path = k8s_client.V1HTTPIngressPath( + path=ingress_path, + path_type=ingress_path_type, + backend=backend, + ) + + http_ingress_rule_value = k8s_client.V1HTTPIngressRuleValue( + paths=[http_ingress_path] + ) + + ingress_rule = k8s_client.V1IngressRule( + http=http_ingress_rule_value, + ) + + if ingress_host: + ingress_rule.host = ingress_host + + tls_configs = None + if tls_enabled and tls_secret_name: + tls_config = k8s_client.V1IngressTLS( + secret_name=tls_secret_name, + ) + if ingress_host: + tls_config.hosts = [ingress_host] + tls_configs = [tls_config] + + ingress_spec = k8s_client.V1IngressSpec( + rules=[ingress_rule], + tls=tls_configs, + ) + + if ingress_class: + ingress_spec.ingress_class_name = ingress_class + + return k8s_client.V1Ingress( + api_version="networking.k8s.io/v1", + kind="Ingress", + metadata=k8s_client.V1ObjectMeta( + name=ingress_name, + namespace=namespace, + labels=labels, + annotations=annotations or None, + ), + spec=ingress_spec, + ) diff --git a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py index 384a80bece6..e2ff1bd6401 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py +++ 
b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py @@ -51,6 +51,7 @@ METADATA_ORCHESTRATOR_RUN_ID, ) from zenml.enums import ExecutionMode, ExecutionStatus, StackComponentType +from zenml.integrations.kubernetes import kube_utils from zenml.integrations.kubernetes.constants import ( ENV_ZENML_KUBERNETES_RUN_ID, KUBERNETES_CRON_JOB_METADATA_KEY, @@ -62,17 +63,16 @@ KubernetesOrchestratorConfig, KubernetesOrchestratorSettings, ) -from zenml.integrations.kubernetes.orchestrators import kube_utils -from zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint_configuration import ( - KubernetesOrchestratorEntrypointConfiguration, -) -from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( +from zenml.integrations.kubernetes.manifest_utils import ( build_cron_job_manifest, build_job_manifest, build_pod_manifest, job_template_manifest_from_job, pod_template_manifest_from_pod, ) +from zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint_configuration import ( + KubernetesOrchestratorEntrypointConfiguration, +) from zenml.logger import get_logger from zenml.metadata.metadata_types import MetadataType from zenml.models.v2.core.schedule import ScheduleUpdate diff --git a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py index bfba199e407..361ce2a13ec 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py +++ b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py @@ -30,6 +30,7 @@ ) from zenml.enums import ExecutionMode, ExecutionStatus from zenml.exceptions import AuthorizationException +from zenml.integrations.kubernetes import kube_utils from zenml.integrations.kubernetes.constants import ( ENV_ZENML_KUBERNETES_RUN_ID, KUBERNETES_SECRET_TOKEN_KEY_NAME, @@ -40,7 +41,11 @@ from 
zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor import ( KubernetesOrchestratorSettings, ) -from zenml.integrations.kubernetes.orchestrators import kube_utils +from zenml.integrations.kubernetes.manifest_utils import ( + build_job_manifest, + build_pod_manifest, + pod_template_manifest_from_pod, +) from zenml.integrations.kubernetes.orchestrators.dag_runner import ( DagRunner, InterruptMode, @@ -50,11 +55,6 @@ from zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator import ( KubernetesOrchestrator, ) -from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( - build_job_manifest, - build_pod_manifest, - pod_template_manifest_from_pod, -) from zenml.logger import get_logger from zenml.logging.step_logging import setup_orchestrator_logging from zenml.models import ( diff --git a/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py b/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py index 5a22df4c558..85666f7c95e 100644 --- a/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py +++ b/src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py @@ -21,6 +21,7 @@ from zenml.config.base_settings import BaseSettings from zenml.config.build_configuration import BuildConfiguration from zenml.enums import StackComponentType +from zenml.integrations.kubernetes import kube_utils from zenml.integrations.kubernetes.constants import ( STEP_NAME_ANNOTATION_KEY, STEP_OPERATOR_ANNOTATION_KEY, @@ -29,10 +30,7 @@ KubernetesStepOperatorConfig, KubernetesStepOperatorSettings, ) -from zenml.integrations.kubernetes.orchestrators import ( - kube_utils, -) -from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( +from zenml.integrations.kubernetes.manifest_utils import ( build_job_manifest, build_pod_manifest, pod_template_manifest_from_pod, diff --git a/tests/integration/integrations/kubernetes/orchestrators/test_manifest_utils.py 
b/tests/integration/integrations/kubernetes/orchestrators/test_manifest_utils.py index 5db14255417..002089e5307 100644 --- a/tests/integration/integrations/kubernetes/orchestrators/test_manifest_utils.py +++ b/tests/integration/integrations/kubernetes/orchestrators/test_manifest_utils.py @@ -20,10 +20,10 @@ V1PodSpec, V1Toleration, ) - from zenml.integrations.kubernetes.orchestrators.manifest_utils import ( build_pod_manifest, ) + from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings