Skip to content

Commit a7167ba

Browse files
committed
Add wait_for_deployment_ready()
1 parent 7df858a commit a7167ba

File tree

4 files changed

+193
-53
lines changed

4 files changed

+193
-53
lines changed

src/xpk/commands/cluster.py

Lines changed: 181 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
from ..utils.templates import get_templates_absolute_path
8282
import shutil
8383
import os
84+
import time
8485

8586
CLUSTER_PREHEAT_JINJA_FILE = 'cluster_preheat.yaml.j2'
8687

@@ -308,8 +309,6 @@ def cluster_create(args) -> None:
308309
if update_coredns_command_code != 0:
309310
xpk_exit(update_cluster_command_code)
310311

311-
install_diagon_prerequisites()
312-
313312
if not is_dry_run():
314313
k8s_client = setup_k8s_env(args)
315314
install_storage_crd(k8s_client)
@@ -409,6 +408,8 @@ def cluster_create(args) -> None:
409408
# pylint: disable=line-too-long
410409
f' https://console.cloud.google.com/kubernetes/clusters/details/{get_cluster_location(args.project, args.cluster, args.zone)}/{args.cluster}/details?project={args.project}'
411410
)
411+
if args.managed_mldiagnostics:
412+
install_diagon_prerequisites()
412413
xpk_exit(0)
413414

414415

@@ -1404,10 +1405,10 @@ def install_mldiagnostics_yaml(artifact_filename: str):
14041405
0 if successful and 1 otherwise.
14051406
"""
14061407

1407-
command = f'kubectl apply -f {artifact_filename}'
1408+
command = f'kubectl apply -f {artifact_filename} -n gke-diagon'
14081409

14091410
return_code = run_command_with_updates(
1410-
command, f'Starting kubectl apply -f {artifact_filename} ...'
1411+
command, f'Starting kubectl apply -f {artifact_filename} -n gke-diagon...'
14111412
)
14121413

14131414
if return_code != 0:
@@ -1460,44 +1461,188 @@ def install_diagon_prerequisites():
14601461
Returns:
14611462
0 if successful and 1 otherwise.
14621463
"""
1464+
deployment_name = 'kueue-controller-manager'
1465+
namespace_name = 'kueue-system'
1466+
# is_running = wait_for_cluster_running(args)
1467+
is_running = wait_for_deployment_ready(deployment_name, namespace_name)
1468+
if is_running:
1469+
return_code = install_cert_manager()
1470+
if return_code != 0:
1471+
return return_code
14631472

1464-
return_code = install_cert_manager()
1465-
if return_code != 0:
1466-
return return_code
1467-
1473+
cert_webhook_ready = check_cert_manager_webhook_status()
1474+
if cert_webhook_ready:
14681475

1469-
webhook_package = "mldiagnostics-injection-webhook"
1470-
webhook_version = "v0.3.0"
1471-
webhook_filename = f"{webhook_package}-{webhook_version}.yaml"
1476+
webhook_package = "mldiagnostics-injection-webhook"
1477+
webhook_version = "v0.3.0"
1478+
webhook_filename = f"{webhook_package}-{webhook_version}.yaml"
14721479

1473-
return_code = download_mldiagnostics_yaml(package_name=webhook_package, version=webhook_version)
1474-
if return_code != 0:
1475-
return return_code
1476-
1477-
return_code = create_mldiagnostics_namespace()
1478-
if return_code != 0:
1479-
return return_code
1480+
return_code = download_mldiagnostics_yaml(package_name=webhook_package, version=webhook_version)
1481+
if return_code != 0:
1482+
return return_code
1483+
1484+
return_code = create_mldiagnostics_namespace()
1485+
if return_code != 0:
1486+
return return_code
14801487

1481-
return_code = install_mldiagnostics_yaml(artifact_filename=webhook_filename)
1482-
if return_code != 0:
1483-
return return_code
1488+
return_code = install_mldiagnostics_yaml(artifact_filename=webhook_filename)
1489+
if return_code != 0:
1490+
return return_code
1491+
1492+
return_code = label_default_namespace_mldiagnostics()
1493+
if return_code != 0:
1494+
return return_code
1495+
1496+
# --- Install Operator ---
1497+
operator_package = "mldiagnostics-connection-operator"
1498+
operator_version = "v0.3.0"
1499+
operator_filename = f"{operator_package}-{operator_version}.yaml"
1500+
1501+
return_code = download_mldiagnostics_yaml(package_name=operator_package, version=operator_version)
1502+
if return_code != 0:
1503+
return return_code
1504+
1505+
return_code = install_mldiagnostics_yaml(artifact_filename=operator_filename)
1506+
if return_code != 0:
1507+
return return_code
1508+
1509+
xpk_print("All diagon installation and setup steps have been successfully completed!")
1510+
return return_code
1511+
else:
1512+
xpk_print("The cert-manager-webhook installation failed.")
1513+
xpk_exit(1)
1514+
else:
1515+
xpk_print(f"Application {deployment_name} failed to become ready within the timeout.")
1516+
xpk_exit(1)
1517+
1518+
def wait_for_deployment_ready(deployment_name: str, namespace: str, timeout_seconds: int = 300) -> bool:
1519+
"""
1520+
Polls the Kubernetes Deployment status using kubectl rollout status
1521+
until it successfully rolls out (all replicas are ready) or times out.
1522+
1523+
Args:
1524+
deployment_name: The name of the Kubernetes Deployment (e.g., 'kueue-controller-manager').
1525+
namespace: The namespace where the Deployment is located (e.g., 'kueue-system').
1526+
timeout_seconds: Timeout duration in seconds (default is 300s / 5 minutes).
1527+
1528+
Returns:
1529+
bool: True if the Deployment successfully rolled out, False otherwise (timeout or error).
1530+
"""
14841531

1485-
return_code = label_default_namespace_mldiagnostics()
1486-
if return_code != 0:
1487-
return return_code
1532+
command = (
1533+
f'kubectl rollout status deployment/{deployment_name} -n {namespace}'
1534+
f' --timeout={timeout_seconds}s'
1535+
)
1536+
1537+
print(f"Waiting for deployment {deployment_name} in namespace {namespace} to successfully roll out...")
1538+
1539+
try:
1540+
return_code, return_output = run_command_for_value(
1541+
command, f'Checking status of deployment {deployment_name}...'
1542+
)
1543+
1544+
if return_code != 0:
1545+
xpk_print(f"\nError: Deployment {deployment_name} failed to roll out.")
1546+
xpk_print(f"kubectl output: {return_output}")
1547+
return False
14881548

1489-
# --- Install Operator ---
1490-
operator_package = "mldiagnostics-connection-operator"
1491-
operator_version = "v0.3.0"
1492-
operator_filename = f"{operator_package}-{operator_version}.yaml"
1549+
xpk_print(f"Success: Deployment {deployment_name} successfully rolled out.")
1550+
return True
1551+
1552+
except Exception as e:
1553+
xpk_print(f"\nUnexpected API request error while checking deployment status: {e}")
1554+
time.sleep(10)
1555+
return False
1556+
1557+
1558+
def wait_for_cluster_running(args, timeout_minutes: int = 30) -> bool:
1559+
"""
1560+
Polls the GKE Cluster status using gcloud CLI until it enters the RUNNING state.
1561+
1562+
Args:
1563+
args: user provided arguments for running the command.
1564+
timeout_minutes: Timeout duration in minutes.
1565+
1566+
Returns:
1567+
bool: True if the Cluster successfully enters the RUNNING state, False otherwise.
1568+
"""
1569+
timeout_seconds = timeout_minutes * 60
1570+
start_time = time.time()
14931571

1494-
return_code = download_mldiagnostics_yaml(package_name=operator_package, version=operator_version)
1495-
if return_code != 0:
1496-
return return_code
1572+
# Construct gcloud command to describe the cluster status
1573+
command = (
1574+
'gcloud container clusters describe'
1575+
f' {args.cluster} --region={zone_to_region(args.zone)} --project={args.project}'
1576+
" --format='value(status)'"
1577+
)
1578+
1579+
print(f"Waiting for cluster {args.cluster} ({args.zone}) to enter RUNNING state (using gcloud CLI)...")
1580+
1581+
while time.time() - start_time < timeout_seconds:
1582+
try:
1583+
# Execute the gcloud command
1584+
return_code, return_output = run_command_for_value(
1585+
command, f'Get the status of cluster...'
1586+
)
1587+
# Check if gcloud command itself returned an error
1588+
if return_code != 0:
1589+
# If Not found error, the cluster does not exist
1590+
if "Not found" in return_output:
1591+
xpk_print(f"\nError: Cluster {args.cluster} does not exist in {args.zone}.")
1592+
return False
1593+
1594+
# Other execution errors, wait and retry
1595+
xpk_print(f"\nError: gcloud command failed. {return_output}")
1596+
time.sleep(10)
1597+
continue
1598+
1599+
# Check cluster status returned by gcloud
1600+
if "RUNNING" in return_output:
1601+
xpk_print(f"Success: Cluster {args.cluster} status is RUNNING.")
1602+
return True
1603+
1604+
elif "ERROR" in return_output or "DEGRADED" in return_output:
1605+
xpk_print(f"Error: Cluster status is {return_output}, creation failed.")
1606+
return False
1607+
else:
1608+
elapsed_time = int(time.time() - start_time)
1609+
xpk_print(f"Current status: {return_output}. Elapsed time: {elapsed_time} seconds. Checking again...")
1610+
except Exception as e:
1611+
xpk_print(f"\nUnexpected API request error: {e}")
1612+
time.sleep(10) # Wait longer on unexpected errors
14971613

1498-
return_code = install_mldiagnostics_yaml(artifact_filename=operator_filename)
1499-
if return_code != 0:
1500-
return return_code
1614+
# Poll interval
1615+
time.sleep(30)
1616+
1617+
xpk_print(f"\nTimeout Error: Cluster did not reach RUNNING state within {timeout_minutes} minutes.")
1618+
return False
1619+
1620+
def check_cert_manager_webhook_status(timeout_seconds: int = 300) -> bool:
1621+
"""
1622+
Runs and checks the exit code of kubectl rollout status for a specific deployment.
1623+
1624+
Args:
1625+
timeout_seconds (int): The maximum time to wait for the rollout to complete.
1626+
1627+
Returns:
1628+
bool: True if the rollout status is successful (exit code 0), False otherwise.
1629+
"""
1630+
# Build the kubectl command
1631+
kubectl_command = (
1632+
'kubectl rollout status deployment/cert-manager-webhook -n cert-manager'
1633+
f' --timeout={timeout_seconds}s'
1634+
)
15011635

1502-
xpk_print("All diagon installation and setup steps have been successfully completed!")
1503-
return return_code
1636+
xpk_print(f"Running command to check deployment status: {kubectl_command}")
1637+
1638+
try:
1639+
return_code, return_output = run_command_for_value(
1640+
kubectl_command, f'check cert manager...'
1641+
)
1642+
if "successfully rolled out" in return_output:
1643+
xpk_print(f"SUCCESS: Deployment cert-manager-webhook rollout completed.")
1644+
return True
1645+
1646+
except Exception as e:
1647+
xpk_print(f"\nUnexpected error during kubectl execution: {e}")
1648+
return False

src/xpk/commands/workload.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@
111111
labels:
112112
kueue.x-k8s.io/queue-name: {local_queue_name} # Name of the LocalQueue
113113
xpk.google.com/workload: {args.workload}
114-
{mldiagnostics_labels}
115114
annotations:
116115
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment
117116
spec:
@@ -399,12 +398,6 @@ def workload_create(args) -> None:
399398
if return_code != 0:
400399
xpk_exit(return_code)
401400

402-
mldiagnostics_labels = ''
403-
404-
if args.managed_mldiagnostics:
405-
mldiagnostics_labels = """diagon-enabled: "true" """
406-
xpk_print('Managed ML Diagnostics injection enabled. Adding Pod Label.')
407-
408401
service_account = ''
409402
all_storages = []
410403
# Currently storage customization is not supported for Pathways workloads. b/408468941
@@ -606,7 +599,6 @@ def workload_create(args) -> None:
606599
""" if system.accelerator_type == AcceleratorType.TPU else '',
607600
failure_policy_rules=failure_policy_rules,
608601
pod_failure_policy=pod_failure_policy,
609-
mldiagnostics_labels=mldiagnostics_labels,
610602
)
611603
tmp = write_tmp_file(yml_string)
612604
command = f'kubectl apply -f {str(tmp)}'

src/xpk/parser/cluster.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ def set_cluster_create_parser(cluster_create_parser: ArgumentParser):
150150
' enable cluster to accept Pathways workloads.'
151151
),
152152
)
153+
154+
cluster_create_optional_arguments.add_argument(
155+
'--managed-mldiagnostics',
156+
action='store_true',
157+
default=False,
158+
help=(
159+
'[Optional] Enables the installation of required ML Diagnostics components: '
160+
'cert-manager, injection-webhook, and connection-operator. '
161+
'This feature is OFF by default.'
162+
),
163+
)
164+
153165
if FeatureFlags.SUB_SLICING_ENABLED:
154166
cluster_create_optional_arguments.add_argument(
155167
'--sub-slicing',

src/xpk/parser/workload.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,6 @@ def set_workload_create_parser(workload_create_parser: ArgumentParser):
224224
' conditions.'
225225
),
226226
)
227-
workload_create_parser_optional_arguments.add_argument(
228-
'--managed-mldiagnostics',
229-
action='store_true',
230-
default=False,
231-
help=(
232-
'[Optional] Enables injection of GKE managed ML Diagnostics'
233-
' webhook metadata.'
234-
),
235-
)
236227

237228
add_shared_workload_create_required_arguments([
238229
workload_create_parser_required_arguments,

0 commit comments

Comments
 (0)