Skip to content

Commit 85f228d

Browse files
committed
Add wait_for_deployment_ready()
1 parent b4aaa13 commit 85f228d

File tree

4 files changed

+193
-53
lines changed

4 files changed

+193
-53
lines changed

src/xpk/commands/cluster.py

Lines changed: 181 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
from ..utils.templates import get_templates_absolute_path
8282
import shutil
8383
import os
84+
import time
8485

8586
CLUSTER_PREHEAT_JINJA_FILE = 'cluster_preheat.yaml.j2'
8687

@@ -308,8 +309,6 @@ def cluster_create(args) -> None:
308309
if update_coredns_command_code != 0:
309310
xpk_exit(update_cluster_command_code)
310311

311-
install_diagon_prerequisites()
312-
313312
if not is_dry_run():
314313
k8s_client = setup_k8s_env(args)
315314
install_storage_crd(k8s_client)
@@ -409,6 +408,8 @@ def cluster_create(args) -> None:
409408
# pylint: disable=line-too-long
410409
f' https://console.cloud.google.com/kubernetes/clusters/details/{get_cluster_location(args.project, args.cluster, args.zone)}/{args.cluster}/details?project={args.project}'
411410
)
411+
if args.managed_mldiagnostics:
412+
install_diagon_prerequisites()
412413
xpk_exit(0)
413414

414415

@@ -1405,10 +1406,10 @@ def install_mldiagnostics_yaml(artifact_filename: str):
14051406
0 if successful and 1 otherwise.
14061407
"""
14071408

1408-
command = f'kubectl apply -f {artifact_filename}'
1409+
command = f'kubectl apply -f {artifact_filename} -n gke-diagon'
14091410

14101411
return_code = run_command_with_updates(
1411-
command, f'Starting kubectl apply -f {artifact_filename} ...'
1412+
command, f'Starting kubectl apply -f {artifact_filename} -n gke-diagon...'
14121413
)
14131414

14141415
if return_code != 0:
@@ -1461,44 +1462,188 @@ def install_diagon_prerequisites():
14611462
Returns:
14621463
0 if successful and 1 otherwise.
14631464
"""
1465+
deployment_name = 'kueue-controller-manager'
1466+
namespace_name = 'kueue-system'
1467+
# is_running = wait_for_cluster_running(args)
1468+
is_running = wait_for_deployment_ready(deployment_name, namespace_name)
1469+
if is_running:
1470+
return_code = install_cert_manager()
1471+
if return_code != 0:
1472+
return return_code
14641473

1465-
return_code = install_cert_manager()
1466-
if return_code != 0:
1467-
return return_code
1468-
1474+
cert_webhook_ready = check_cert_manager_webhook_status()
1475+
if cert_webhook_ready:
14691476

1470-
webhook_package = "mldiagnostics-injection-webhook"
1471-
webhook_version = "v0.3.0"
1472-
webhook_filename = f"{webhook_package}-{webhook_version}.yaml"
1477+
webhook_package = "mldiagnostics-injection-webhook"
1478+
webhook_version = "v0.3.0"
1479+
webhook_filename = f"{webhook_package}-{webhook_version}.yaml"
14731480

1474-
return_code = download_mldiagnostics_yaml(package_name=webhook_package, version=webhook_version)
1475-
if return_code != 0:
1476-
return return_code
1477-
1478-
return_code = create_mldiagnostics_namespace()
1479-
if return_code != 0:
1480-
return return_code
1481+
return_code = download_mldiagnostics_yaml(package_name=webhook_package, version=webhook_version)
1482+
if return_code != 0:
1483+
return return_code
1484+
1485+
return_code = create_mldiagnostics_namespace()
1486+
if return_code != 0:
1487+
return return_code
14811488

1482-
return_code = install_mldiagnostics_yaml(artifact_filename=webhook_filename)
1483-
if return_code != 0:
1484-
return return_code
1489+
return_code = install_mldiagnostics_yaml(artifact_filename=webhook_filename)
1490+
if return_code != 0:
1491+
return return_code
1492+
1493+
return_code = label_default_namespace_mldiagnostics()
1494+
if return_code != 0:
1495+
return return_code
1496+
1497+
# --- Install Operator ---
1498+
operator_package = "mldiagnostics-connection-operator"
1499+
operator_version = "v0.3.0"
1500+
operator_filename = f"{operator_package}-{operator_version}.yaml"
1501+
1502+
return_code = download_mldiagnostics_yaml(package_name=operator_package, version=operator_version)
1503+
if return_code != 0:
1504+
return return_code
1505+
1506+
return_code = install_mldiagnostics_yaml(artifact_filename=operator_filename)
1507+
if return_code != 0:
1508+
return return_code
1509+
1510+
xpk_print("All diagon installation and setup steps have been successfully completed!")
1511+
return return_code
1512+
else:
1513+
xpk_print("The cert-manager-webhook installation failed.")
1514+
xpk_exit(1)
1515+
else:
1516+
xpk_print(f"Application {deployment_name} failed to become ready within the timeout.")
1517+
xpk_exit(1)
1518+
1519+
def wait_for_deployment_ready(deployment_name: str, namespace: str, timeout_seconds: int = 300) -> bool:
1520+
"""
1521+
Polls the Kubernetes Deployment status using kubectl rollout status
1522+
until it successfully rolls out (all replicas are ready) or times out.
1523+
1524+
Args:
1525+
deployment_name: The name of the Kubernetes Deployment (e.g., 'kueue-controller-manager').
1526+
namespace: The namespace where the Deployment is located (e.g., 'kueue-system').
1527+
timeout_seconds: Timeout duration in seconds (default is 300s / 5 minutes).
1528+
1529+
Returns:
1530+
bool: True if the Deployment successfully rolled out, False otherwise (timeout or error).
1531+
"""
14851532

1486-
return_code = label_default_namespace_mldiagnostics()
1487-
if return_code != 0:
1488-
return return_code
1533+
command = (
1534+
f'kubectl rollout status deployment/{deployment_name} -n {namespace}'
1535+
f' --timeout={timeout_seconds}s'
1536+
)
1537+
1538+
print(f"Waiting for deployment {deployment_name} in namespace {namespace} to successfully roll out...")
1539+
1540+
try:
1541+
return_code, return_output = run_command_for_value(
1542+
command, f'Checking status of deployment {deployment_name}...'
1543+
)
1544+
1545+
if return_code != 0:
1546+
xpk_print(f"\nError: Deployment {deployment_name} failed to roll out.")
1547+
xpk_print(f"kubectl output: {return_output}")
1548+
return False
14891549

1490-
# --- Install Operator ---
1491-
operator_package = "mldiagnostics-connection-operator"
1492-
operator_version = "v0.3.0"
1493-
operator_filename = f"{operator_package}-{operator_version}.yaml"
1550+
xpk_print(f"Success: Deployment {deployment_name} successfully rolled out.")
1551+
return True
1552+
1553+
except Exception as e:
1554+
xpk_print(f"\nUnexpected API request error while checking deployment status: {e}")
1555+
time.sleep(10)
1556+
return False
1557+
1558+
1559+
def wait_for_cluster_running(args, timeout_minutes: int = 30) -> bool:
1560+
"""
1561+
Polls the GKE Cluster status using gcloud CLI until it enters the RUNNING state.
1562+
1563+
Args:
1564+
args: user provided arguments for running the command.
1565+
timeout_minutes: Timeout duration in minutes.
1566+
1567+
Returns:
1568+
bool: True if the Cluster successfully enters the RUNNING state, False otherwise.
1569+
"""
1570+
timeout_seconds = timeout_minutes * 60
1571+
start_time = time.time()
14941572

1495-
return_code = download_mldiagnostics_yaml(package_name=operator_package, version=operator_version)
1496-
if return_code != 0:
1497-
return return_code
1573+
# Construct gcloud command to describe the cluster status
1574+
command = (
1575+
'gcloud container clusters describe'
1576+
f' {args.cluster} --region={zone_to_region(args.zone)} --project={args.project}'
1577+
" --format='value(status)'"
1578+
)
1579+
1580+
print(f"Waiting for cluster {args.cluster} ({args.zone}) to enter RUNNING state (using gcloud CLI)...")
1581+
1582+
while time.time() - start_time < timeout_seconds:
1583+
try:
1584+
# Execute the gcloud command
1585+
return_code, return_output = run_command_for_value(
1586+
command, f'Get the status of cluster...'
1587+
)
1588+
# Check if gcloud command itself returned an error
1589+
if return_code != 0:
1590+
# If Not found error, the cluster does not exist
1591+
if "Not found" in return_output:
1592+
xpk_print(f"\nError: Cluster {args.cluster} does not exist in {args.zone}.")
1593+
return False
1594+
1595+
# Other execution errors, wait and retry
1596+
xpk_print(f"\nError: gcloud command failed. {return_output}")
1597+
time.sleep(10)
1598+
continue
1599+
1600+
# Check cluster status returned by gcloud
1601+
if "RUNNING" in return_output:
1602+
xpk_print(f"Success: Cluster {args.cluster} status is RUNNING.")
1603+
return True
1604+
1605+
elif "ERROR" in return_output or "DEGRADED" in return_output:
1606+
xpk_print(f"Error: Cluster status is {return_output}, creation failed.")
1607+
return False
1608+
else:
1609+
elapsed_time = int(time.time() - start_time)
1610+
xpk_print(f"Current status: {return_output}. Elapsed time: {elapsed_time} seconds. Checking again...")
1611+
except Exception as e:
1612+
xpk_print(f"\nUnexpected API request error: {e}")
1613+
time.sleep(10) # Wait longer on unexpected errors
14981614

1499-
return_code = install_mldiagnostics_yaml(artifact_filename=operator_filename)
1500-
if return_code != 0:
1501-
return return_code
1615+
# Poll interval
1616+
time.sleep(30)
1617+
1618+
xpk_print(f"\nTimeout Error: Cluster did not reach RUNNING state within {timeout_minutes} minutes.")
1619+
return False
1620+
1621+
def check_cert_manager_webhook_status(timeout_seconds: int = 300) -> bool:
1622+
"""
1623+
Runs and checks the exit code of kubectl rollout status for a specific deployment.
1624+
1625+
Args:
1626+
timeout_seconds (int): The maximum time to wait for the rollout to complete.
1627+
1628+
Returns:
1629+
bool: True if the rollout status is successful (exit code 0), False otherwise.
1630+
"""
1631+
# Build the kubectl command
1632+
kubectl_command = (
1633+
'kubectl rollout status deployment/cert-manager-webhook -n cert-manager'
1634+
f' --timeout={timeout_seconds}s'
1635+
)
15021636

1503-
xpk_print("All diagon installation and setup steps have been successfully completed!")
1504-
return return_code
1637+
xpk_print(f"Running command to check deployment status: {kubectl_command}")
1638+
1639+
try:
1640+
return_code, return_output = run_command_for_value(
1641+
kubectl_command, f'check cert manager...'
1642+
)
1643+
if "successfully rolled out" in return_output:
1644+
xpk_print(f"SUCCESS: Deployment cert-manager-webhook rollout completed.")
1645+
return True
1646+
1647+
except Exception as e:
1648+
xpk_print(f"\nUnexpected error during kubectl execution: {e}")
1649+
return False

src/xpk/commands/workload.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@
112112
labels:
113113
kueue.x-k8s.io/queue-name: {local_queue_name} # Name of the LocalQueue
114114
xpk.google.com/workload: {args.workload}
115-
{mldiagnostics_labels}
116115
annotations:
117116
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment
118117
spec:
@@ -400,12 +399,6 @@ def workload_create(args) -> None:
400399
if return_code != 0:
401400
xpk_exit(return_code)
402401

403-
mldiagnostics_labels = ''
404-
405-
if args.managed_mldiagnostics:
406-
mldiagnostics_labels = """diagon-enabled: "true" """
407-
xpk_print('Managed ML Diagnostics injection enabled. Adding Pod Label.')
408-
409402
service_account = ''
410403
all_storages = []
411404
# Currently storage customization is not supported for Pathways workloads. b/408468941
@@ -607,7 +600,6 @@ def workload_create(args) -> None:
607600
""" if system.accelerator_type == AcceleratorType.TPU else '',
608601
failure_policy_rules=failure_policy_rules,
609602
pod_failure_policy=pod_failure_policy,
610-
mldiagnostics_labels=mldiagnostics_labels,
611603
)
612604
tmp = write_tmp_file(yml_string)
613605
command = f'kubectl apply -f {str(tmp)}'

src/xpk/parser/cluster.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ def set_cluster_create_parser(cluster_create_parser: ArgumentParser):
150150
' enable cluster to accept Pathways workloads.'
151151
),
152152
)
153+
154+
cluster_create_optional_arguments.add_argument(
155+
'--managed-mldiagnostics',
156+
action='store_true',
157+
default=False,
158+
help=(
159+
'[Optional] Enables the installation of required ML Diagnostics components: '
160+
'cert-manager, injection-webhook, and connection-operator. '
161+
'This feature is OFF by default.'
162+
),
163+
)
164+
153165
if FeatureFlags.SUB_SLICING_ENABLED:
154166
cluster_create_optional_arguments.add_argument(
155167
'--sub-slicing',

src/xpk/parser/workload.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,6 @@ def set_workload_create_parser(workload_create_parser: ArgumentParser):
224224
' conditions.'
225225
),
226226
)
227-
workload_create_parser_optional_arguments.add_argument(
228-
'--managed-mldiagnostics',
229-
action='store_true',
230-
default=False,
231-
help=(
232-
'[Optional] Enables injection of GKE managed ML Diagnostics'
233-
' webhook metadata.'
234-
),
235-
)
236227

237228
add_shared_workload_create_required_arguments([
238229
workload_create_parser_required_arguments,

0 commit comments

Comments
 (0)