Skip to content

Commit 6f452bf

Browse files
authored
Implemented parallel processing for list-cluster operation to improve time (#231)
* Implementing Task Gov. feature for SDK flow
* Implemented parallel processing for list-cluster operation to improve time
1 parent da607d2 commit 6f452bf

File tree

3 files changed

+39
-35
lines changed

3 files changed

+39
-35
lines changed

src/sagemaker/hyperpod/cli/clients/kubernetes_client.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,10 @@ class KubernetesClient:
5151
_instance = None
5252
_kube_client = None
5353

54-
def __new__(cls, is_get_capacity: bool = False) -> "KubernetesClient":
54+
def __new__(cls, config_file: Optional[str] = None) -> "KubernetesClient":
5555
if cls._instance is None:
5656
cls._instance = super(KubernetesClient, cls).__new__(cls)
57-
config.load_kube_config(
58-
config_file=KUBE_CONFIG_PATH
59-
if not is_get_capacity
60-
else TEMP_KUBE_CONFIG_FILE
61-
) # or config.load_incluster_config() for in-cluster config
57+
config.load_kube_config(config_file=config_file or KUBE_CONFIG_PATH)
6258
cls._instance._kube_client = client.ApiClient()
6359
return cls._instance
6460

src/sagemaker/hyperpod/cli/commands/cluster.py

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import sys
1717
import botocore.config
1818
from collections import defaultdict
19+
from concurrent.futures import ThreadPoolExecutor, as_completed
1920
from typing import Any, Dict, List, Optional, Tuple
2021

2122
import boto3
@@ -191,30 +192,33 @@ def list_cluster(
191192

192193
cluster_capacities: List[List[str]] = []
193194

194-
counter = 0
195-
for cluster_name in cluster_names:
196-
current_cluster_capacities_size = len(cluster_capacities)
197-
rate_limited_operation(
198-
cluster_name=cluster_name,
199-
validator=validator,
200-
sm_client=sm_client,
201-
region=region,
202-
temp_config_file=TEMP_KUBE_CONFIG_FILE,
203-
cluster_capacities=cluster_capacities,
204-
namespace=namespace,
205-
)
206-
# cluster_capacities will only be updated when the cluster
207-
# is a valid Hyperpod EKS cluster. This check avoid
208-
# we skipped many Hyperpod Slurm clusters and didn't return
209-
# any Hyperpod EKS clusters.
210-
if len(cluster_capacities) > current_cluster_capacities_size:
211-
counter += 1
212-
# Currently only support list <= 50 clusters
213-
if counter >= 50:
214-
logger.debug(
215-
"The 'get-clusters' command has reached the maximum number of HyperPod clusters that can be listed, which is 50."
216-
)
217-
break
195+
# Process clusters in parallel with limited concurrency
196+
if cluster_names:
197+
with ThreadPoolExecutor(max_workers=len(cluster_names)) as executor:
198+
futures = {}
199+
counter = 0
200+
201+
for cluster_name in cluster_names[:50]: # Limit to 50 clusters
202+
future = executor.submit(
203+
rate_limited_operation,
204+
cluster_name=cluster_name,
205+
validator=validator,
206+
sm_client=sm_client,
207+
region=region,
208+
temp_config_file=f"{TEMP_KUBE_CONFIG_FILE}_{cluster_name}",
209+
namespace=namespace,
210+
)
211+
futures[future] = cluster_name
212+
213+
for future in as_completed(futures):
214+
cluster_name = futures[future]
215+
try:
216+
result = future.result()
217+
if result: # Only add if cluster processing was successful
218+
cluster_capacities.extend(result)
219+
counter += 1
220+
except Exception as e:
221+
logger.error(f"Error processing cluster {cluster_name}: {e}")
218222

219223
headers = [
220224
"Cluster",
@@ -245,9 +249,8 @@ def rate_limited_operation(
245249
sm_client: BaseClient,
246250
region: Optional[str],
247251
temp_config_file: str,
248-
cluster_capacities: List[List[str]],
249252
namespace: Optional[List[str]],
250-
) -> None:
253+
) -> Optional[List[List[str]]]:
251254
try:
252255
eks_cluster_arn = validator.validate_cluster_and_get_eks_arn(
253256
cluster_name, sm_client
@@ -259,11 +262,12 @@ def rate_limited_operation(
259262
return
260263
eks_cluster_name = get_name_from_arn(eks_cluster_arn)
261264
_update_kube_config(eks_cluster_name, region, temp_config_file)
262-
k8s_client = KubernetesClient(is_get_capacity=True)
265+
k8s_client = KubernetesClient(config_file=temp_config_file)
263266
nodes = k8s_client.list_node_with_temp_config(
264267
temp_config_file, SAGEMAKER_HYPERPOD_NAME_LABEL
265268
)
266269
nodes_info = _aggregate_nodes_info(nodes)
270+
cluster_capacities = []
267271

268272
ns_nominal_quota = {}
269273
ns_quota_usage = {}
@@ -279,6 +283,7 @@ def rate_limited_operation(
279283
+ quota_allocation_id
280284
+ SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX
281285
)
286+
282287
cluster_queue = k8s_client.get_cluster_queue(cluster_queue_name)
283288
nominal_quota = _get_cluster_queue_nominal_quota(cluster_queue)
284289
quota_usage = _get_cluster_queue_quota_usage(cluster_queue)
@@ -312,8 +317,10 @@ def rate_limited_operation(
312317
)
313318
)
314319
cluster_capacities.append(capacities)
320+
return cluster_capacities
315321
except Exception as e:
316322
logger.error(f"Error processing cluster {cluster_name}: {e}, continue...")
323+
return None
317324

318325

319326
def _get_cluster_queue_nominal_quota(cluster_queue):

test/unit_tests/test_cluster.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,9 @@ def test_get_clusters_no_cluster_summary(
471471
self.assertEqual(result.exit_code, 0)
472472
self.assertNotIn("cluster-1", result.output)
473473
self.assertNotIn("cluster-2", result.output)
474-
# Expect JSON output
475-
json.loads(result.output)
474+
# Expect JSON output - should be empty list when no ClusterSummaries
475+
output = json.loads(result.output)
476+
self.assertEqual(output, [])
476477

477478
@mock.patch("kubernetes.config.load_kube_config")
478479
@mock.patch("boto3.Session")

0 commit comments

Comments
 (0)