1+ # Copyright (c) ZenML GmbH 2024. All Rights Reserved.
2+ #
3+ # Licensed under the Apache License, Version 2.0 (the "License");
4+ # you may not use this file except in compliance with the License.
5+ # You may obtain a copy of the License at:
6+ #
7+ # https://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS,
11+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+ # or implied. See the License for the specific language governing
13+ # permissions and limitations under the License.
14+ from pathlib import Path
15+ from typing import Dict , Optional
16+
17+ import yaml
18+ from kubernetes import client , config
19+ from kubernetes .client .rest import ApiException
20+ from zenml import get_step_context , step
21+ from zenml .client import Client
22+ from zenml .logger import get_logger
23+
24+ logger = get_logger (__name__ )
25+
26+ def apply_kubernetes_configuration (k8s_configs : list ) -> None :
27+ """Apply Kubernetes configurations using the K8s Python client.
28+
29+ Args:
30+ k8s_configs: List of Kubernetes configuration dictionaries
31+ """
32+ # Load Kubernetes configuration
33+ try :
34+ config .load_kube_config ()
35+ except :
36+ config .load_incluster_config () # For in-cluster deployment
37+
38+ # Initialize API clients
39+ k8s_apps_v1 = client .AppsV1Api ()
40+ k8s_core_v1 = client .CoreV1Api ()
41+
42+ for k8s_config in k8s_configs :
43+ kind = k8s_config ["kind" ]
44+ name = k8s_config ["metadata" ]["name" ]
45+ namespace = k8s_config ["metadata" ].get ("namespace" , "default" )
46+
47+ try :
48+ if kind == "Deployment" :
49+ # Check if deployment exists
50+ try :
51+ k8s_apps_v1 .read_namespaced_deployment (name , namespace )
52+ # Update existing deployment
53+ k8s_apps_v1 .patch_namespaced_deployment (
54+ name = name ,
55+ namespace = namespace ,
56+ body = k8s_config
57+ )
58+ logger .info (f"Updated existing deployment: { name } " )
59+ except ApiException as e :
60+ if e .status == 404 :
61+ # Create new deployment
62+ k8s_apps_v1 .create_namespaced_deployment (
63+ namespace = namespace ,
64+ body = k8s_config
65+ )
66+ logger .info (f"Created new deployment: { name } " )
67+ else :
68+ raise e
69+
70+ elif kind == "Service" :
71+ # Check if service exists
72+ try :
73+ k8s_core_v1 .read_namespaced_service (name , namespace )
74+ # Update existing service
75+ k8s_core_v1 .patch_namespaced_service (
76+ name = name ,
77+ namespace = namespace ,
78+ body = k8s_config
79+ )
80+ logger .info (f"Updated existing service: { name } " )
81+ except ApiException as e :
82+ if e .status == 404 :
83+ # Create new service
84+ k8s_core_v1 .create_namespaced_service (
85+ namespace = namespace ,
86+ body = k8s_config
87+ )
88+ logger .info (f"Created new service: { name } " )
89+ else :
90+ raise e
91+
92+ except ApiException as e :
93+ logger .error (f"Error applying { kind } { name } : { e } " )
94+ raise e
95+
96+ @step
97+ def deploy_model_to_k8s (
98+ docker_image_tag : str ,
99+ namespace : str = "default"
100+ ) -> Dict :
101+ """Deploy a service to Kubernetes with the specified docker image and tag.
102+
103+ Args:
104+ docker_image: The full docker image name (e.g. "organization/image-name")
105+ docker_image_tag: The tag to use for the docker image
106+ namespace: Kubernetes namespace to deploy to (default: "default")
107+
108+ Returns:
109+ dict: Dictionary containing deployment information
110+ """
111+ # Get model name from context
112+ model_name = get_step_context ().model .name
113+
114+ # Read the K8s template
115+ template_path = Path (__file__ ).parent / "k8s_template.yaml"
116+ with open (template_path , "r" ) as f :
117+ # Load all documents in the YAML file
118+ k8s_configs = list (yaml .safe_load_all (f ))
119+
120+ # Update both Service and Deployment configurations
121+ for config in k8s_configs :
122+ # Add namespace
123+ config ["metadata" ]["namespace" ] = namespace
124+
125+ # Update metadata labels and name
126+ config ["metadata" ]["labels" ]["app" ] = model_name
127+ config ["metadata" ]["name" ] = model_name
128+
129+ if config ["kind" ] == "Service" :
130+ # Update service selector
131+ config ["spec" ]["selector" ]["app" ] = model_name
132+
133+ elif config ["kind" ] == "Deployment" :
134+ # Update deployment selector and template
135+ config ["spec" ]["selector" ]["matchLabels" ]["app" ] = model_name
136+ config ["spec" ]["template" ]["metadata" ]["labels" ]["app" ] = model_name
137+
138+ # Update the container image and name
139+ containers = config ["spec" ]["template" ]["spec" ]["containers" ]
140+ for container in containers :
141+ container ["name" ] = model_name
142+ container ["image" ] = docker_image_tag
143+
144+ # Apply the configurations
145+ try :
146+ apply_kubernetes_configuration (k8s_configs )
147+ deployment_status = "success"
148+ logger .info (f"Successfully deployed model { model_name } with image: { docker_image_tag } " )
149+ except Exception as e :
150+ deployment_status = "failed"
151+ logger .error (f"Failed to deploy model { model_name } : { str (e )} " )
152+ raise e
153+
154+ # Return deployment information
155+ deployment_info = {
156+ "model_name" : model_name ,
157+ "docker_image" : docker_image_tag ,
158+ "namespace" : namespace ,
159+ "status" : deployment_status ,
160+ "service_port" : 3000 ,
161+ "configurations" : k8s_configs
162+ }
163+
164+ return deployment_info
0 commit comments