diff --git a/README.md b/README.md index a8d4acb8e..fe4cf20f8 100644 --- a/README.md +++ b/README.md @@ -421,7 +421,14 @@ will fail the cluster creation process because Vertex AI Tensorboard is not supp --cluster xpk-pw-test ``` Executing the command above would provide the address of the proxy that the user job should connect to. - Specify `JAX_PLATFORMS=proxy` and `JAX_BACKEND_TARGET=` and `import previewutilies` to establish this connection between the user's JAX code and the Pathways proxy. Execute Pathways workloads interactively on Vertex AI notebooks! + ```shell + kubectl get pods + kubectl port-forward pod/<proxy-pod-name> 29000:29000 + ``` + ```shell + JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python -c 'import pathwaysutils; import jax; print(jax.devices())' + ``` + Specify `JAX_PLATFORMS=proxy` and `JAX_BACKEND_TARGET=<proxy_address>` and `import pathwaysutils` to establish this connection between the user's JAX code and the Pathways proxy. Execute Pathways workloads interactively on Vertex AI notebooks! ### Set `max-restarts` for production jobs diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 42f0c0fcc..4b571042f 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -35,7 +35,6 @@ run_gke_node_pool_create_command, set_jobset_on_cluster, set_up_cluster_network_for_gpu, - update_cluster_with_clouddns_if_necessary, zone_to_region, ) from ..core.cluster_private import authorize_private_cluster_access_if_necessary @@ -101,13 +100,7 @@ def cluster_create(args) -> None: if authorize_private_cluster_access_command_code != 0: xpk_exit(authorize_private_cluster_access_command_code) - # Update Pathways clusters with CloudDNS if not enabled already. - if args.enable_pathways: - update_cluster_command_code = update_cluster_with_clouddns_if_necessary( - args - ) - if update_cluster_command_code != 0: - xpk_exit(update_cluster_command_code) + # ToDo(roshanin@) - Re-enable CloudDNS on Pathways clusters conditionally. 
set_cluster_command_code = set_cluster_command(args) if set_cluster_command_code != 0: @@ -506,12 +499,7 @@ def run_gke_cluster_create_command( if args.enable_pathways: enable_ip_alias = True - command += ( - f' --create-subnetwork name={args.cluster}-subnetwork' - ' --cluster-dns=clouddns' - ' --cluster-dns-scope=vpc' - f' --cluster-dns-domain={args.cluster}-domain' - ) + command += f' --create-subnetwork name={args.cluster}-subnetwork' if enable_ip_alias: command += ' --enable-ip-alias' diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 87a1fa849..70ef1a749 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -39,7 +39,6 @@ get_gpu_volume, get_user_workload_container, get_volumes, - is_cluster_using_clouddns, parse_env_config, wait_for_job_completion, xpk_current_version, @@ -325,12 +324,13 @@ def workload_create(args) -> None: """ add_zone_and_project(args) - if args.headless and not is_cluster_using_clouddns(args): + if args.headless: xpk_print( - 'Please run xpk cluster create-pathways first, to upgrade and enable' - ' CloudDNS on your cluster.' + 'Please use kubectl port forwarding to connect to the Pathways proxy.' + ' kubectl get pods kubectl port-forward 29000:29000' + ' JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python' + " -c 'import pathwaysutils; import jax; print(jax.devices())'" ) - xpk_exit(1) set_cluster_command_code = set_cluster_command(args) if set_cluster_command_code != 0: diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py index 532b5e58b..3a7539827 100644 --- a/src/xpk/core/core.py +++ b/src/xpk/core/core.py @@ -333,110 +333,6 @@ def get_total_chips_requested_from_args( return num_chips -def update_gke_cluster_with_clouddns(args) -> int: - """Run the GKE cluster update command for existing clusters and enable CloudDNS. - - Args: - args: user provided arguments for running the command. - - Returns: - 0 if successful and 1 otherwise. 
- """ - command = ( - 'gcloud container clusters update' - f' {args.cluster} --project={args.project}' - f' --region={zone_to_region(args.zone)}' - ' --cluster-dns=clouddns' - ' --cluster-dns-scope=vpc' - f' --cluster-dns-domain={args.cluster}-domain' - ' --quiet' - ) - xpk_print('Updating GKE cluster to use Cloud DNS, may take a while!') - return_code = run_command_with_updates( - command, 'GKE Cluster Update to enable Cloud DNS', args - ) - if return_code != 0: - xpk_print(f'GKE Cluster Update request returned ERROR {return_code}') - return 1 - return 0 - - -def upgrade_gke_control_plane_version(args, default_rapid_gke_version) -> int: - """Upgrade GKE cluster's control plane version before updating nodepools to use CloudDNS. - - Args: - args: user provided arguments for running the command. - default_rapid_gke_version: Rapid default version for the upgrade. - - Returns: - 0 if successful and 1 otherwise. - """ - command = ( - 'gcloud container clusters upgrade' - f' {args.cluster} --project={args.project}' - f' --region={zone_to_region(args.zone)}' - f' --cluster-version={default_rapid_gke_version}' - ' --master' - ' --quiet' - ) - xpk_print("Updating GKE cluster's control plane version, may take a while!") - return_code = run_command_with_updates( - command, - 'GKE Cluster control plane version update to enable Cloud DNS', - args, - ) - if return_code != 0: - xpk_print( - "GKE cluster's control plane version update request returned" - f' ERROR {return_code}' - ) - return 1 - return 0 - - -def upgrade_gke_nodepools_version(args, default_rapid_gke_version) -> int: - """Upgrade nodepools in the cluster to default rapid gke version. Recreates the nodes. - - Args: - args: user provided arguments for running the command. - default_rapid_gke_version: Rapid default version for the upgrade. - - Returns: - 0 if successful and 1 otherwise. 
- """ - existing_node_pool_names, return_code = get_all_nodepools_programmatic(args) - if return_code != 0: - xpk_print('Listing all node pools failed!') - return return_code - - # Batch execution to upgrade node pools simultaneously - commands = [] - task_names = [] - for node_pool_name in existing_node_pool_names: - commands.append( - 'gcloud container clusters upgrade' - f' {args.cluster} --project={args.project}' - f' --region={zone_to_region(args.zone)}' - f' --cluster-version={default_rapid_gke_version}' - f' --node-pool={node_pool_name}' - ' --quiet' - ) - task_names.append(f'Upgrading node pool {node_pool_name}.') - - for i, command in enumerate(commands): - xpk_print(f'To complete {task_names[i]} we are executing {command}') - max_return_code = run_commands( - commands, 'Update GKE node pools to default RAPID GKE version', task_names - ) - if max_return_code != 0: - xpk_print( - 'GKE node pools update to default RAPID GKE version returned ERROR:' - f' {max_return_code}' - ) - return max_return_code - return 0 - - def set_up_cluster_network_for_gpu(args, system: SystemCharacteristics) -> int: """Set up GKE Cluster networks, subnets and firewall rules for A3/A3+. Note: there are 4 NICs for GPU-GPU bw and 1 NIC for host in an A3 node, @@ -1019,73 +915,6 @@ def get_all_clusters_programmatic(args) -> tuple[list[str], int]: return raw_cluster_output.splitlines(), 0 -def is_cluster_using_clouddns(args) -> bool: - """Checks if cluster is using CloudDNS. - Args: - args: user provided arguments for running the command. - - Returns: - True if cluster is using CloudDNS and False otherwise. 
- """ - command = ( - f'gcloud container clusters describe {args.cluster}' - f' --project={args.project} --region={zone_to_region(args.zone)}' - ' | grep "clusterDns: CLOUD_DNS"' - ) - return_code, _ = run_command_for_value( - command, - 'Check if Cloud DNS is enabled in cluster describe.', - args, - ) - if return_code == 0: - xpk_print('Cloud DNS is enabled on the cluster, no update needed.') - return True - return False - - -def update_cluster_with_clouddns_if_necessary(args) -> int: - """Updates a GKE cluster to use CloudDNS, if not enabled already. - - Args: - args: user provided arguments for running the command. - - Returns: - 0 if successful and error code otherwise. - """ - all_clusters, return_code = get_all_clusters_programmatic(args) - if return_code > 0: - xpk_print('Listing all clusters failed!') - return 1 - if args.cluster in all_clusters: - # If cluster is already using clouddns, no update necessary! - if is_cluster_using_clouddns(args): - return 0 - cluster_update_return_code = update_gke_cluster_with_clouddns(args) - if cluster_update_return_code > 0: - xpk_print('Updating GKE cluster to use CloudDNS failed!') - return cluster_update_return_code - - # Find default rapid control plane version and update the control plane to the same. - server_config_return_code, gke_server_config = get_gke_server_config(args) - if server_config_return_code != 0: - xpk_exit(server_config_return_code) - upgrade_master_return_code = upgrade_gke_control_plane_version( - args, gke_server_config.default_rapid_gke_version - ) - if upgrade_master_return_code > 0: - xpk_print("Updating GKE cluster's control plane upgrade failed!") - return upgrade_master_return_code - - # Upgrade nodepools version after the master upgrade. 
- node_pool_update_code = upgrade_gke_nodepools_version( - args, gke_server_config.default_rapid_gke_version - ) - if node_pool_update_code > 0: - xpk_print('Upgrading nodepools version failed!') - return node_pool_update_code - return 0 - - def get_nodepool_zone(args, nodepool_name) -> tuple[int, str]: """Return zone in which nodepool exists in the cluster. diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index e36ca9e4f..71ed27ea1 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -19,7 +19,6 @@ AcceleratorType, get_all_nodepools_programmatic, get_user_workload_container, - is_cluster_using_clouddns, zone_to_region, ) from .system_characteristics import SystemCharacteristics @@ -257,34 +256,26 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: def get_rm_address(args) -> str: - """Generates the Pathways resource manager address based on whether CloudDNS is enabled or not. + """Generates the Pathways resource manager address. Args: args: user provided arguments for running the command. Returns: str: Fully qualified RM address. """ - suffix = '' - if is_cluster_using_clouddns(args): - suffix = f'.default.svc.{args.cluster}-domain.' - rm_address = f'{args.workload}-rm-0-0.{args.workload}{suffix}:29001' + rm_address = f'{args.workload}-rm-0-0.{args.workload}:29001' return rm_address def get_proxy_address(args) -> str: - """Generates the Pathways proxy address based on whether CloudDNS is enabled or not. + """Generates the Pathways proxy address. Args: args: user provided arguments for running the command. Returns: str: Fully qualified proxy address. """ - suffix = '' - if is_cluster_using_clouddns(args): - suffix = f'.default.svc.{args.cluster}-domain.' - proxy_address = ( - f'grpc://{args.workload}-proxy-0-0.{args.workload}{suffix}:29000' - ) + proxy_address = f'grpc://{args.workload}-proxy-0-0.{args.workload}:29000' return proxy_address