Skip to content

Commit c9a99cb

Browse files
authored
feat: remove args from run_command_with_updates_retry (#652)
1 parent d99f40f commit c9a99cb

File tree

19 files changed

+124
-189
lines changed

19 files changed

+124
-189
lines changed

src/xpk/commands/cluster.py

Lines changed: 47 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def cluster_adapt(args) -> None:
176176

177177
install_kjob(args)
178178
if system.accelerator_type == AcceleratorType['GPU']:
179-
prepare_gpus(args, system)
179+
prepare_gpus(system)
180180

181181
if args.enable_ray_cluster:
182182
return_code = install_ray_cluster(args, system)
@@ -249,7 +249,7 @@ def cluster_create(args) -> None:
249249

250250
get_cluster_credentials(args)
251251

252-
update_coredns_command_code = update_coredns_if_necessary(args)
252+
update_coredns_command_code = update_coredns_if_necessary()
253253
if update_coredns_command_code != 0:
254254
xpk_exit(update_cluster_command_code)
255255

@@ -317,7 +317,7 @@ def cluster_create(args) -> None:
317317
set_jobset_on_cluster_code = set_jobset_on_cluster(args)
318318
if set_jobset_on_cluster_code != 0:
319319
xpk_exit(set_jobset_on_cluster_code)
320-
update_jobset_resources_code = update_jobset_resources_if_necessary(args)
320+
update_jobset_resources_code = update_jobset_resources_if_necessary()
321321
if update_jobset_resources_code != 0:
322322
xpk_exit(update_jobset_resources_code)
323323

@@ -330,7 +330,7 @@ def cluster_create(args) -> None:
330330
install_kjob(args)
331331

332332
if system.accelerator_type == AcceleratorType['GPU']:
333-
prepare_gpus(args, system)
333+
prepare_gpus(system)
334334

335335
if args.enable_ray_cluster:
336336
return_code = install_ray_cluster(args, system)
@@ -416,15 +416,13 @@ def cluster_cacheimage(args) -> None:
416416
command_delete = f'kubectl delete -f {str(tmp)} --ignore-not-found=true'
417417

418418
return_code = run_command_with_updates(
419-
command_delete, 'Deleting Cached Image', args
419+
command_delete, 'Deleting Cached Image'
420420
)
421421
if return_code != 0:
422422
xpk_print(f'Delete Cached Image returned ERROR {return_code}')
423423
xpk_exit(return_code)
424424

425-
return_code = run_command_with_updates(
426-
command_apply, 'Creating Cached Image', args
427-
)
425+
return_code = run_command_with_updates(command_apply, 'Creating Cached Image')
428426
if return_code != 0:
429427
xpk_print(f'Create Cached Image returned ERROR {return_code}')
430428
xpk_exit(return_code)
@@ -704,20 +702,20 @@ def cluster_create_ray_cluster(args) -> None:
704702
cluster_create(args)
705703

706704

707-
def install_jq(args):
705+
def install_jq():
708706
"""Installs 'jq' utility."""
709707
if shutil.which('jq'):
710708
xpk_print("Task: 'Install jq' skipped, jq already installed.")
711709
return
712710
command_jq_install = 'sudo apt install jq -y'
713711
xpk_print("Task: 'Install jq' in progress.")
714-
return_code = run_command_with_updates(command_jq_install, 'Install jq', args)
712+
return_code = run_command_with_updates(command_jq_install, 'Install jq')
715713
if return_code != 0:
716714
xpk_print(f'Install jq error {return_code}')
717715
xpk_exit(return_code)
718716

719717

720-
def clone_coredns_deployment_repo(args, coredns_repo_full_path: str):
718+
def clone_coredns_deployment_repo(coredns_repo_full_path: str):
721719
"""Clones the CoreDNS deployment repository if it doesn't exist."""
722720
if os.path.exists(coredns_repo_full_path):
723721
xpk_print(
@@ -732,15 +730,13 @@ def clone_coredns_deployment_repo(args, coredns_repo_full_path: str):
732730
"Task: 'Clone deployment' in progress, Target"
733731
f' directory:{coredns_repo_full_path}.'
734732
)
735-
return_code = run_command_with_updates(
736-
command_git_clone, 'Clone deployment', args
737-
)
733+
return_code = run_command_with_updates(command_git_clone, 'Clone deployment')
738734
if return_code != 0:
739735
xpk_print(f'Clone deployment error {return_code}')
740736
xpk_exit(return_code)
741737

742738

743-
def deploy_coredns_manifests(args, coredns_k8s_path: str):
739+
def deploy_coredns_manifests(coredns_k8s_path: str):
744740
"""Deploys CoreDNS manifests to the cluster."""
745741
if not os.path.isdir(coredns_k8s_path):
746742
xpk_print(
@@ -758,7 +754,7 @@ def deploy_coredns_manifests(args, coredns_k8s_path: str):
758754
f"Task: 'Deploy CoreDNS' in progress, Located at '{coredns_k8s_path}'"
759755
)
760756
return_code = run_command_with_updates(
761-
command_deploy_coredns, 'Deploy CoreDNS', args
757+
command_deploy_coredns, 'Deploy CoreDNS'
762758
)
763759
if return_code != 0:
764760
xpk_print(f'Deploy CoreDNS error {return_code}')
@@ -770,57 +766,53 @@ def deploy_coredns_manifests(args, coredns_k8s_path: str):
770766
xpk_exit(return_code)
771767

772768

773-
def scale_down_deployment(
774-
args, deployment_name: str, namespace: str = 'kube-system'
775-
):
769+
def scale_down_deployment(deployment_name: str, namespace: str = 'kube-system'):
776770
"""Scales down a specified Kubernetes deployment to 0 replicas."""
777771
command = (
778772
f'kubectl scale deployment {deployment_name} --replicas=0'
779773
f' --namespace={namespace}'
780774
)
781775
xpk_print(f"Task: 'Scaling down {deployment_name}' in progress")
782776
return_code = run_command_with_updates(
783-
command, f'Scale down {deployment_name}', args
777+
command, f'Scale down {deployment_name}'
784778
)
785779
if return_code != 0:
786780
xpk_print(f'Scale down {deployment_name} error {return_code}')
787781
xpk_exit(return_code)
788782
xpk_print(f'\n{deployment_name} has been scaled down.')
789783

790784

791-
def scale_up_coredns(args, replicas: int = 15, namespace: str = 'kube-system'):
785+
def scale_up_coredns(replicas: int = 15, namespace: str = 'kube-system'):
792786
"""Scales up the CoreDNS deployment to a specified number of replicas."""
793787
command_coredns_scale = (
794788
f'kubectl scale deployment coredns --replicas={replicas} -n {namespace}'
795789
)
796790
xpk_print(f"Task: 'Scale CoreDNS' in progress (to {replicas} replicas)")
797-
return_code = run_command_with_updates(
798-
command_coredns_scale, 'Scale CoreDNS', args
799-
)
791+
return_code = run_command_with_updates(command_coredns_scale, 'Scale CoreDNS')
800792
if return_code != 0:
801793
xpk_print(f'Scale CoreDNS error {return_code}')
802794
xpk_exit(return_code)
803795

804796

805-
def check_deployment_exists(args, deployment_name: str, namespace: str) -> bool:
797+
def check_deployment_exists(deployment_name: str, namespace: str) -> bool:
806798
"""Check for the existence of a specific Deployment in a given namespace."""
807799
# TODO: rewrite this to be more obvious, check if it is correct
808800
command = (
809801
f'kubectl get deployment {deployment_name} -n'
810802
f' {namespace} --ignore-not-found'
811803
)
812804
result = run_command_with_updates(
813-
command, 'Waiting for kubeDNS to be checked.', args
805+
command, 'Waiting for kubeDNS to be checked.'
814806
)
815807
return result != 0
816808

817809

818810
def verify_coredns_readiness(
819-
args, timeout: int = 240, namespace: str = 'kube-system'
811+
timeout: int = 240, namespace: str = 'kube-system'
820812
):
821813
"""Verifies CoreDNS readiness using kubectl wait commands."""
822814
xpk_print('Now verifying CoreDNS readiness...')
823-
kube_dns_exists = check_deployment_exists(args, 'kube-dns', namespace)
815+
kube_dns_exists = check_deployment_exists('kube-dns', namespace)
824816
if kube_dns_exists:
825817
# Wait for kube-dns to be fully scaled down
826818
command_kube_dns_wait_scaled_down = (
@@ -830,7 +822,7 @@ def verify_coredns_readiness(
830822
)
831823
xpk_print('Verifying if kube-dns has scaled down...')
832824
return_code_kube_dns = run_command_with_updates(
833-
command_kube_dns_wait_scaled_down, 'Wait for kube-dns scale down', args
825+
command_kube_dns_wait_scaled_down, 'Wait for kube-dns scale down'
834826
)
835827
if return_code_kube_dns != 0:
836828
xpk_print('kube-dns did not scale down successfully within the timeout.')
@@ -846,7 +838,7 @@ def verify_coredns_readiness(
846838
)
847839
xpk_print('Verifying if CoreDNS is available...')
848840
return_code_coredns = run_command_with_updates(
849-
command_coredns_wait_available, 'Wait for coredns available', args
841+
command_coredns_wait_available, 'Wait for coredns available'
850842
)
851843
if return_code_coredns != 0:
852844
xpk_print(
@@ -871,12 +863,9 @@ def cleanup_coredns_repo(coredns_repo_full_path: str):
871863
xpk_print(f'Error deleting directory {coredns_repo_full_path}: {e}')
872864

873865

874-
def update_coredns(args) -> int:
866+
def update_coredns() -> int:
875867
"""Updates and deploys CoreDNS within a cluster.
876868
877-
Args:
878-
args: user provided arguments for running the command.
879-
880869
Returns:
881870
0 if successful and 1 otherwise.
882871
"""
@@ -885,23 +874,23 @@ def update_coredns(args) -> int:
885874
coredns_repo_full_path = os.path.join(coredns_repo_dir, coredns_repo_dir_name)
886875
coredns_k8s_path = os.path.join(coredns_repo_full_path, 'kubernetes')
887876
# 1. Install jq
888-
install_jq(args)
877+
install_jq()
889878

890879
# 2. Clone CoreDNS deployment repository
891-
clone_coredns_deployment_repo(args, coredns_repo_full_path)
880+
clone_coredns_deployment_repo(coredns_repo_full_path)
892881

893882
# 3. Deploy CoreDNS to the cluster
894-
deploy_coredns_manifests(args, coredns_k8s_path)
883+
deploy_coredns_manifests(coredns_k8s_path)
895884

896885
# 4. Scale down kube-dns-autoscaler
897-
scale_down_deployment(args, 'kube-dns-autoscaler')
886+
scale_down_deployment('kube-dns-autoscaler')
898887

899888
# 5. Scale down kube-dns
900-
scale_down_deployment(args, 'kube-dns')
889+
scale_down_deployment('kube-dns')
901890

902891
# 6. Scale up coredns and verify readiness
903-
scale_up_coredns(args, replicas=15)
904-
verify_coredns_readiness(args, timeout=120)
892+
scale_up_coredns(replicas=15)
893+
verify_coredns_readiness(timeout=120)
905894

906895
xpk_print('The CoreDNS setup process has been completed.')
907896

@@ -911,7 +900,7 @@ def update_coredns(args) -> int:
911900
return 0
912901

913902

914-
def coredns_deployment_exists(args, namespace: str = 'kube-system') -> bool:
903+
def coredns_deployment_exists(namespace: str = 'kube-system') -> bool:
915904
"""Checks if the CoreDNS deployment exists in the given namespace.
916905
917906
Args:
@@ -926,10 +915,10 @@ def coredns_deployment_exists(args, namespace: str = 'kube-system') -> bool:
926915
f' namespace: {namespace}'
927916
)
928917
return_code = run_command_with_updates(
929-
command, f'Check CoreDNS deployment in {namespace}', args
918+
command, f'Check CoreDNS deployment in {namespace}'
930919
)
931920
if return_code == 0:
932-
verify_coredns_readiness(args)
921+
verify_coredns_readiness()
933922
xpk_print(f"CoreDNS deployment 'coredns' found in namespace '{namespace}'.")
934923
return True
935924
else:
@@ -940,25 +929,22 @@ def coredns_deployment_exists(args, namespace: str = 'kube-system') -> bool:
940929
return False
941930

942931

943-
def update_coredns_if_necessary(args) -> int:
932+
def update_coredns_if_necessary() -> int:
944933
"""Updates and deploys CoreDNS within the cluster if it's not already present.
945934
946935
This function checks for the existence of the CoreDNS deployment.
947936
If it's not found, it proceeds to deploy and configure CoreDNS.
948937
949-
Args:
950-
args: User-provided arguments for running the command.
951-
952938
Returns:
953939
0 if successful (CoreDNS was already present or successfully deployed),
954940
and 1 otherwise.
955941
"""
956-
if coredns_deployment_exists(args, namespace='kube-system'):
942+
if coredns_deployment_exists(namespace='kube-system'):
957943
xpk_print('Skipping CoreDNS deployment since it already exists.')
958944
return 0
959945
else:
960946
xpk_print('CoreDNS deployment not found. Proceeding with CoreDNS setup.')
961-
return update_coredns(args)
947+
return update_coredns()
962948

963949

964950
def create_cluster_if_necessary(
@@ -1021,7 +1007,7 @@ def run_gke_cluster_delete_command(args) -> int:
10211007
f' --region={zone_to_region(args.zone)} --quiet'
10221008
)
10231009

1024-
return_code = run_command_with_updates(command, 'Cluster Delete', args)
1010+
return_code = run_command_with_updates(command, 'Cluster Delete')
10251011
if return_code != 0:
10261012
xpk_print(f'Cluster delete request returned ERROR {return_code}')
10271013
return 1
@@ -1046,7 +1032,7 @@ def run_gke_clusters_list_command(args) -> int:
10461032
'gcloud container clusters list'
10471033
f' --project={args.project} --region={zone_to_region(args.zone)}'
10481034
)
1049-
return_code = run_command_with_updates(command, 'Cluster List', args)
1035+
return_code = run_command_with_updates(command, 'Cluster List')
10501036
if return_code != 0:
10511037
xpk_print(f'Cluster list request returned ERROR {return_code}')
10521038
return 1
@@ -1155,7 +1141,7 @@ def run_gke_cluster_create_command(
11551141
addons_str = ','.join(addons)
11561142
command += f' --addons={addons_str}'
11571143

1158-
return_code = run_command_with_updates(command, 'GKE Cluster Create', args)
1144+
return_code = run_command_with_updates(command, 'GKE Cluster Create')
11591145
if return_code != 0:
11601146
xpk_print(f'GKE Cluster Create request returned ERROR {return_code}')
11611147
return 1
@@ -1206,7 +1192,7 @@ def install_kjob(args):
12061192
xpk_exit(err_code)
12071193

12081194
xpk_print('Applying kjob CDRs')
1209-
err_code = apply_kjob_crds(args)
1195+
err_code = apply_kjob_crds()
12101196
if err_code > 0:
12111197
xpk_exit(err_code)
12121198

@@ -1217,12 +1203,12 @@ def install_kjob(args):
12171203

12181204
def install_kueue(args, system: SystemCharacteristics, autoprovisioning_config):
12191205
xpk_print('Enabling Kueue on the cluster')
1220-
install_kueue_on_cluster_code = install_kueue_on_cluster(args)
1206+
install_kueue_on_cluster_code = install_kueue_on_cluster()
12211207
if install_kueue_on_cluster_code != 0:
12221208
xpk_exit(install_kueue_on_cluster_code)
12231209

12241210
xpk_print('Wait for Kueue to be fully available')
1225-
wait_for_kueue_available_code = wait_for_kueue_available(args)
1211+
wait_for_kueue_available_code = wait_for_kueue_available()
12261212
if wait_for_kueue_available_code != 0:
12271213
xpk_exit(wait_for_kueue_available_code)
12281214

@@ -1234,25 +1220,25 @@ def install_kueue(args, system: SystemCharacteristics, autoprovisioning_config):
12341220
xpk_exit(enable_kueue_credentials_code)
12351221

12361222
xpk_print('Update Kueue Controller Manager resources')
1237-
update_kueue_resources_code = update_kueue_resources_if_necessary(args)
1223+
update_kueue_resources_code = update_kueue_resources_if_necessary()
12381224
if update_kueue_resources_code != 0:
12391225
xpk_exit(update_kueue_resources_code)
12401226

12411227

1242-
def prepare_gpus(args, system: SystemCharacteristics):
1228+
def prepare_gpus(system: SystemCharacteristics):
12431229
xpk_print('Installing NCCL Plugin for cluster')
1244-
install_nccl_code = install_nccl_on_cluster(args, system)
1230+
install_nccl_code = install_nccl_on_cluster(system)
12451231
if install_nccl_code != 0:
12461232
xpk_exit(install_nccl_code)
12471233

12481234
if system.device_type == H100_DEVICE_TYPE:
12491235
xpk_print('Installing NRI device injector for cluster')
1250-
install_nri_code = install_nri_on_cluster(args)
1236+
install_nri_code = install_nri_on_cluster()
12511237
if install_nri_code != 0:
12521238
xpk_exit(install_nri_code)
12531239

12541240
if system.device_type in [H200_DEVICE_TYPE, B200_DEVICE_TYPE]:
12551241
xpk_print('Disabling MGLRU')
1256-
err_code = disable_mglru_on_cluster(args)
1242+
err_code = disable_mglru_on_cluster()
12571243
if err_code > 0:
12581244
xpk_exit(err_code)

src/xpk/commands/cluster_gcluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def cluster_create(args) -> None:
9090

9191
get_cluster_credentials(args)
9292

93-
err_code = apply_kjob_crds(args)
93+
err_code = apply_kjob_crds()
9494
if err_code > 0:
9595
xpk_exit(err_code)
9696

src/xpk/commands/common.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ def set_cluster_command(args) -> int:
4242
' --namespace=default'
4343
)
4444
task = f'get-credentials to cluster {args.cluster}'
45-
return_code = run_command_with_updates_retry(
46-
command, task, args, verbose=False
47-
)
45+
return_code = run_command_with_updates_retry(command, task, verbose=False)
4846
if return_code != 0:
4947
xpk_print(f'{task} returned ERROR {return_code}')
5048
return return_code

0 commit comments

Comments
 (0)