Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,14 @@ will fail the cluster creation process because Vertex AI Tensorboard is not supp
--cluster xpk-pw-test
```
Executing the command above would provide the address of the proxy that the user job should connect to.
Specify `JAX_PLATFORMS=proxy` and `JAX_BACKEND_TARGET=<proxy address from above>` and `import previewutilies` to establish this connection between the user's JAX code and the Pathways proxy. Execute Pathways workloads interactively on Vertex AI notebooks!
```shell
kubectl get pods
kubectl port-forward pod/<proxy-pod-name> 29000:29000
```
```shell
JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python -c 'import pathwaysutils; import jax; print(jax.devices())'
```
Set `JAX_PLATFORMS=proxy` and `JAX_BACKEND_TARGET=<proxy address from above>`, and `import pathwaysutils` in your code, to establish the connection between the user's JAX code and the Pathways proxy. Execute Pathways workloads interactively on Vertex AI notebooks!

### Set `max-restarts` for production jobs

Expand Down
16 changes: 2 additions & 14 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
run_gke_node_pool_create_command,
set_jobset_on_cluster,
set_up_cluster_network_for_gpu,
update_cluster_with_clouddns_if_necessary,
zone_to_region,
)
from ..core.cluster_private import authorize_private_cluster_access_if_necessary
Expand Down Expand Up @@ -100,13 +99,7 @@ def cluster_create(args) -> None:
if authorize_private_cluster_access_command_code != 0:
xpk_exit(authorize_private_cluster_access_command_code)

# Update Pathways clusters with CloudDNS if not enabled already.
if args.enable_pathways:
update_cluster_command_code = update_cluster_with_clouddns_if_necessary(
args
)
if update_cluster_command_code != 0:
xpk_exit(update_cluster_command_code)
# ToDo(roshanin@) - Re-enable CloudDNS on Pathways clusters conditionally.

set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
Expand Down Expand Up @@ -500,12 +493,7 @@ def run_gke_cluster_create_command(

if args.enable_pathways:
enable_ip_alias = True
command += (
f' --create-subnetwork name={args.cluster}-subnetwork'
' --cluster-dns=clouddns'
' --cluster-dns-scope=vpc'
f' --cluster-dns-domain={args.cluster}-domain'
)
command += f' --create-subnetwork name={args.cluster}-subnetwork'

if enable_ip_alias:
command += ' --enable-ip-alias'
Expand Down
10 changes: 5 additions & 5 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
get_gpu_volume,
get_user_workload_container,
get_volumes,
is_cluster_using_clouddns,
parse_env_config,
wait_for_job_completion,
xpk_current_version,
Expand Down Expand Up @@ -325,12 +324,13 @@ def workload_create(args) -> None:
"""
add_zone_and_project(args)

if args.headless and not is_cluster_using_clouddns(args):
if args.headless:
xpk_print(
'Please run xpk cluster create-pathways first, to upgrade and enable'
' CloudDNS on your cluster.'
'Please use kubectl port forwarding to connect to the Pathways proxy.'
' kubectl get pods kubectl port-forward <proxy-pod-name> 29000:29000'
' JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python'
" -c 'import pathwaysutils; import jax; print(jax.devices())'"
)
xpk_exit(1)

set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
Expand Down
171 changes: 0 additions & 171 deletions src/xpk/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,110 +333,6 @@ def get_total_chips_requested_from_args(
return num_chips


def update_gke_cluster_with_clouddns(args) -> int:
  """Switch an existing GKE cluster over to Cloud DNS.

  Builds and runs a `gcloud container clusters update` command that sets the
  cluster DNS provider to `clouddns` with VPC scope and a
  `<cluster>-domain` DNS domain.

  Args:
    args: user provided arguments for running the command.

  Returns:
    0 if the update command succeeded and 1 otherwise.
  """
  command_parts = [
      'gcloud container clusters update',
      f'{args.cluster}',
      f'--project={args.project}',
      f'--region={zone_to_region(args.zone)}',
      '--cluster-dns=clouddns',
      '--cluster-dns-scope=vpc',
      f'--cluster-dns-domain={args.cluster}-domain',
      '--quiet',
  ]
  command = ' '.join(command_parts)

  xpk_print('Updating GKE cluster to use Cloud DNS, may take a while!')
  status = run_command_with_updates(
      command, 'GKE Cluster Update to enable Cloud DNS', args
  )
  if status == 0:
    return 0

  # Any failure is collapsed to 1; the raw code is only reported to the user.
  xpk_print(f'GKE Cluster Update request returned ERROR {status}')
  return 1


def upgrade_gke_control_plane_version(args, default_rapid_gke_version) -> int:
  """Upgrade the control plane of a GKE cluster to the given version.

  Runs `gcloud container clusters upgrade ... --master` for the cluster named
  in `args`.

  Args:
    args: user provided arguments for running the command.
    default_rapid_gke_version: Rapid default version for the upgrade.

  Returns:
    0 if the upgrade command succeeded and 1 otherwise.
  """
  command_parts = [
      'gcloud container clusters upgrade',
      f'{args.cluster}',
      f'--project={args.project}',
      f'--region={zone_to_region(args.zone)}',
      f'--cluster-version={default_rapid_gke_version}',
      '--master',
      '--quiet',
  ]
  command = ' '.join(command_parts)

  xpk_print("Updating GKE cluster's control plane version, may take a while!")
  status = run_command_with_updates(
      command,
      'GKE Cluster control plane version update to enable Cloud DNS',
      args,
  )
  if status == 0:
    return 0

  # Any failure is collapsed to 1; the raw code is only reported to the user.
  xpk_print(
      "GKE cluster's control plane version update request returned"
      f' ERROR {status}'
  )
  return 1


def upgrade_gke_nodepools_version(args, default_rapid_gke_version) -> int:
  """Upgrade every node pool of the cluster to the given GKE version.

  Lists the cluster's node pools, builds one
  `gcloud container clusters upgrade --node-pool=...` command per pool, and
  hands the whole batch to `run_commands`.

  Args:
    args: user provided arguments for running the command.
    default_rapid_gke_version: Rapid default version for the upgrade.

  Returns:
    0 if successful; otherwise the node pool listing error code or the
    code returned by the batched upgrade.
  """
  node_pools, list_code = get_all_nodepools_programmatic(args)
  if list_code != 0:
    xpk_print('Listing all node pools failed!')
    return list_code

  # One upgrade command per node pool; the batch is submitted in one call.
  commands = [
      ' '.join([
          'gcloud container clusters upgrade',
          f'{args.cluster}',
          f'--project={args.project}',
          f'--region={zone_to_region(args.zone)}',
          f'--cluster-version={default_rapid_gke_version}',
          f'--node-pool={pool}',
          '--quiet',
      ])
      for pool in node_pools
  ]
  task_names = [f'Upgrading node pool {pool}.' for pool in node_pools]

  # Tell the user which command backs each task before launching the batch.
  for task_name, command in zip(task_names, commands):
    xpk_print(f'To complete {task_name} we are executing {command}')

  batch_code = run_commands(
      commands, 'Update GKE node pools to default RAPID GKE version', task_names
  )
  if batch_code == 0:
    return 0

  xpk_print(
      'GKE node pools update to default RAPID GKE version returned ERROR:'
      f' {batch_code}'
  )
  return batch_code


def set_up_cluster_network_for_gpu(args, system: SystemCharacteristics) -> int:
"""Set up GKE Cluster networks, subnets and firewall rules for A3/A3+.
Note: there are 4 NICs for GPU-GPU bw and 1 NIC for host in an A3 node,
Expand Down Expand Up @@ -1019,73 +915,6 @@ def get_all_clusters_programmatic(args) -> tuple[list[str], int]:
return raw_cluster_output.splitlines(), 0


def is_cluster_using_clouddns(args) -> bool:
  """Report whether the cluster's description shows Cloud DNS in use.

  Pipes `gcloud container clusters describe` through `grep` for the
  `clusterDns: CLOUD_DNS` line; a zero return code from that pipeline is
  treated as "Cloud DNS is enabled".

  Args:
    args: user provided arguments for running the command.

  Returns:
    True if cluster is using CloudDNS and False otherwise.
  """
  describe_and_grep = (
      f'gcloud container clusters describe {args.cluster}'
      f' --project={args.project} --region={zone_to_region(args.zone)}'
      ' | grep "clusterDns: CLOUD_DNS"'
  )
  exit_code, _ = run_command_for_value(
      describe_and_grep,
      'Check if Cloud DNS is enabled in cluster describe.',
      args,
  )
  # Any non-zero code (no grep match, or a failed describe) counts as "not enabled".
  if exit_code != 0:
    return False

  xpk_print('Cloud DNS is enabled on the cluster, no update needed.')
  return True


def update_cluster_with_clouddns_if_necessary(args) -> int:
  """Updates a GKE cluster to use CloudDNS, if not enabled already.

  When the cluster exists and is not yet on CloudDNS, this enables CloudDNS,
  then upgrades the control plane and afterwards every node pool to the
  default rapid GKE version taken from the GKE server config.

  Args:
    args: user provided arguments for running the command.

  Returns:
    0 if successful (including when no update was needed) and error code
    otherwise.
  """
  all_clusters, return_code = get_all_clusters_programmatic(args)
  # All failure checks below compare against 0 (not `> 0`) so they agree with
  # the helper functions this calls; a negative code must not pass as success.
  # NOTE(review): whether the command runners can return negative codes is not
  # visible here - confirm against their implementation.
  if return_code != 0:
    xpk_print('Listing all clusters failed!')
    return 1

  # Nothing to update when the cluster is not in the listing.
  if args.cluster not in all_clusters:
    return 0

  # If cluster is already using clouddns, no update necessary!
  if is_cluster_using_clouddns(args):
    return 0

  cluster_update_return_code = update_gke_cluster_with_clouddns(args)
  if cluster_update_return_code != 0:
    xpk_print('Updating GKE cluster to use CloudDNS failed!')
    return cluster_update_return_code

  # Find default rapid control plane version and update the control plane to
  # the same.
  server_config_return_code, gke_server_config = get_gke_server_config(args)
  if server_config_return_code != 0:
    # Return the code instead of exiting here, as the documented contract
    # says; the caller decides how to terminate.
    return server_config_return_code

  rapid_version = gke_server_config.default_rapid_gke_version
  upgrade_master_return_code = upgrade_gke_control_plane_version(
      args, rapid_version
  )
  if upgrade_master_return_code != 0:
    xpk_print("Updating GKE cluster's control plane upgrade failed!")
    return upgrade_master_return_code

  # Upgrade nodepools version after the master upgrade.
  node_pool_update_code = upgrade_gke_nodepools_version(args, rapid_version)
  if node_pool_update_code != 0:
    xpk_print('Upgrading nodepools version failed!')
    return node_pool_update_code
  return 0


def get_nodepool_zone(args, nodepool_name) -> tuple[int, str]:
"""Return zone in which nodepool exists in the cluster.

Expand Down
17 changes: 4 additions & 13 deletions src/xpk/core/pathways.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
AcceleratorType,
get_all_nodepools_programmatic,
get_user_workload_container,
is_cluster_using_clouddns,
zone_to_region,
)
from .system_characteristics import SystemCharacteristics
Expand Down Expand Up @@ -257,34 +256,26 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str:


def get_rm_address(args) -> str:
  """Generates the Pathways resource manager address.

  The address is derived purely from the workload name; no cluster lookup is
  performed.

  Args:
    args: user provided arguments for running the command. Only
      `args.workload` is read.

  Returns:
    str: RM address of the form `<workload>-rm-0-0.<workload>:29001`.
  """
  # The superseded CloudDNS suffix computation was dropped: its result was
  # overwritten by the plain address before being returned.
  return f'{args.workload}-rm-0-0.{args.workload}:29001'


def get_proxy_address(args) -> str:
  """Generates the Pathways proxy address.

  The address is derived purely from the workload name; no cluster lookup is
  performed.

  Args:
    args: user provided arguments for running the command. Only
      `args.workload` is read.

  Returns:
    str: Proxy address of the form
      `grpc://<workload>-proxy-0-0.<workload>:29000`.
  """
  # The superseded CloudDNS suffix computation was dropped: its result was
  # overwritten by the plain address before being returned.
  return f'grpc://{args.workload}-proxy-0-0.{args.workload}:29000'


Expand Down
Loading