Skip to content
158 changes: 113 additions & 45 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
from .common import set_cluster_command




import shlex

def cluster_adapt(args) -> None:
"""Function that performs cluster adaptation.

Expand Down Expand Up @@ -790,6 +794,47 @@ def run_gke_clusters_list_command(args) -> int:
return 0


def parse_command_args_to_dict(arg_string: str) -> dict:
"""Parses a command-line argument string into a dictionary of parameters.

This function safely splits a command-line string, handling quoted arguments
and different parameter formats (e.g., --flag, --key=value, --key value).
It's designed to help convert user-provided custom arguments into a structured
format for easier merging and de-duplication.

Args:
arg_string: A string containing command-line arguments, such as
"--master-ipv4-cidr=10.0.0.0/28 --enable-ip-alias".

Returns:
A dictionary where keys are parameter names (e.g., "--enable-ip-alias",
"--cluster-ipv4-cidr") and values are their corresponding parsed values
(e.g., True for a boolean flag, "10.0.0.0/28" for a string value).
"""
parsed_args = {}
if not arg_string:
return parsed_args

tokens = shlex.split(arg_string)
i = 0
while i < len(tokens):
token = tokens[i]
if token.startswith('--'):
if '=' in token:
key, value = token.split('=', 1)
parsed_args[key] = value
else:
if i + 1 < len(tokens) and not tokens[i+1].startswith('--'):
parsed_args[token] = tokens[i+1]
i += 1
else:
parsed_args[token] = True
elif token.startswith('-'):
pass
i += 1
return parsed_args


def run_gke_cluster_create_command(
args, gke_control_plane_version: str, system: SystemCharacteristics
) -> int:
Expand All @@ -806,65 +851,54 @@ def run_gke_cluster_create_command(
machine_type = args.default_pool_cpu_machine_type
if args.cluster_cpu_machine_type != '':
xpk_print(
'Warning: Note that cluster-cpu-machine-type is soon to be',
' deprecated. Please use --default-pool-cpu-machine-type instead,'
' to denote the machine type of the default cpu node pool. Set'
' the machine type of other cpu nodepools using `--device-type`.',
'Warning: Note that cluster-cpu-machine-type is soon to be',
' deprecated. Please use --default-pool-cpu-machine-type instead,'
' to denote the machine type of the default cpu node pool. Set'
' the machine type of other cpu nodepools using `--device-type`.',
)
machine_type = args.cluster_cpu_machine_type

# Create the regional cluster with `num-nodes` CPU nodes in the same zone as
# TPUs. This has been tested with clusters of 300 VMs. Larger clusters will
# benefit from a larger initial `--num-nodes`. After the cluster is created,
# the auto-scaler can reduce/increase the nodes based on the load.
final_gcloud_args = {}

final_gcloud_args['--project'] = args.project
final_gcloud_args['--region'] = zone_to_region(args.zone)
final_gcloud_args['--node-locations'] = args.zone
final_gcloud_args['--cluster-version'] = gke_control_plane_version
final_gcloud_args['--machine-type'] = machine_type
final_gcloud_args['--enable-autoscaling'] = True
final_gcloud_args['--total-min-nodes'] = 1
final_gcloud_args['--total-max-nodes'] = 1000
final_gcloud_args['--num-nodes'] = args.default_pool_cpu_num_nodes
final_gcloud_args['--enable-dns-access'] = True
final_gcloud_args['--master-ipv4-cidr'] = '172.16.0.32/28'
final_gcloud_args['--cluster-ipv4-cidr'] = '10.224.0.0/12'

# If the user passes in the gke version then we use that directly instead of the rapid release.
# This allows users to directly pass a specified gke version without release channel constraints.
rapid_release_cmd = ''
if args.gke_version is not None:
rapid_release_cmd = ' --release-channel rapid'

command = (
'gcloud beta container clusters create'
f' {args.cluster} --project={args.project}'
f' --region={zone_to_region(args.zone)}'
f' --node-locations={args.zone}'
f' --cluster-version={gke_control_plane_version}'
f' --machine-type={machine_type}'
' --enable-autoscaling'
' --total-min-nodes 1 --total-max-nodes 1000'
f' --num-nodes {args.default_pool_cpu_num_nodes}'
f' {args.custom_cluster_arguments}'
f' {rapid_release_cmd}'
' --enable-dns-access'
)
final_gcloud_args['--release-channel'] = 'rapid'

enable_ip_alias = False
conditional_params = {}

if args.private or args.authorized_networks is not None:
enable_ip_alias = True
command += ' --enable-master-authorized-networks --enable-private-nodes'
conditional_params['--enable-master-authorized-networks'] = True
conditional_params['--enable-private-nodes'] = True
conditional_params['--enable-ip-alias'] = True

if system.accelerator_type == AcceleratorType['GPU']:
enable_ip_alias = True
command += (
' --enable-dataplane-v2'
' --enable-multi-networking --no-enable-autoupgrade'
)
conditional_params['--enable-dataplane-v2'] = True
conditional_params['--enable-multi-networking'] = True
conditional_params['--no-enable-autoupgrade'] = True
conditional_params['--enable-ip-alias'] = True
else:
command += ' --location-policy=BALANCED --scopes=storage-full,gke-default'

conditional_params['--location-policy'] = 'BALANCED'
conditional_params['--scopes'] = 'storage-full,gke-default'
if args.enable_pathways:
enable_ip_alias = True

if enable_ip_alias:
command += ' --enable-ip-alias'
conditional_params['--enable-ip-alias'] = True

if args.enable_ray_cluster:
command += ' --addons RayOperator'
conditional_params['--addons'] = 'RayOperator'

if args.enable_workload_identity or args.enable_gcsfuse_csi_driver:
command += f' --workload-pool={args.project}.svc.id.goog'
conditional_params['--workload-pool'] = f'{args.project}.svc.id.goog'

addons = []
if args.enable_gcsfuse_csi_driver:
Expand All @@ -883,8 +917,42 @@ def run_gke_cluster_create_command(
addons.append('HighScaleCheckpointing')

if len(addons) > 0:
addons_str = ','.join(addons)
command += f' --addons={addons_str}'
conditional_params['--addons'] = ','.join(addons)

for key, value in conditional_params.items():
if key not in final_gcloud_args:
final_gcloud_args[key] = value
elif key == '--addons' and key in final_gcloud_args:
final_gcloud_args[key] = ','.join(list(set(final_gcloud_args[key].split(',') + value.split(','))))


user_parsed_args = parse_command_args_to_dict(args.custom_cluster_arguments)
for key, value in user_parsed_args.items():
if key.startswith('--no-') and key[5:] in final_gcloud_args:
del final_gcloud_args[key[5:]]
final_gcloud_args[key] = True
elif key.startswith('--enable-') and f'--no-{key[2:]}' in final_gcloud_args:
del final_gcloud_args[f'--no-{key[2:]}']
final_gcloud_args[key] = value
else:
final_gcloud_args[key] = value


command_parts = ['gcloud beta container clusters create', args.cluster]


for key, value in final_gcloud_args.items():
if value is True:
command_parts.append(key)
elif value is False:
pass
elif value is not None and value != '':
if ' ' in str(value):
command_parts.append(f"{key}=\"{value}\"")
else:
command_parts.append(f"{key}={value}")

command = ' '.join(command_parts)

return_code = run_command_with_updates(command, 'GKE Cluster Create', args)
if return_code != 0:
Expand Down