
Commit a0d5daf

Enable Hyperpod telemetry

Roja Reddy Sareddy committed
1 parent 4bc6c69

6 files changed: +157 -69 lines


src/sagemaker/hyperpod/cli/commands/cluster.py

Lines changed: 52 additions & 17 deletions
@@ -61,8 +61,17 @@
 from sagemaker.hyperpod.cli.utils import (
     get_eks_cluster_name,
 )
-from sagemaker.hyperpod.common.utils import get_cluster_context as get_cluster_context_util
-from sagemaker.hyperpod.observability.utils import get_monitoring_config, is_observability_addon_enabled
+from sagemaker.hyperpod.common.utils import (
+    get_cluster_context as get_cluster_context_util,
+)
+from sagemaker.hyperpod.observability.utils import (
+    get_monitoring_config,
+    is_observability_addon_enabled,
+)
+from sagemaker.hyperpod.common.telemetry.telemetry_logging import (
+    _hyperpod_telemetry_emitter,
+)
+from sagemaker.hyperpod.common.telemetry.constants import Feature
 
 RATE_LIMIT = 4
 RATE_LIMIT_PERIOD = 1  # 1 second
@@ -103,12 +112,13 @@
     multiple=True,
     help="Optional. The namespace that you want to check the capacity for. Only SageMaker managed namespaces are supported.",
 )
+@_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_cluster")
 def list_cluster(
     region: Optional[str],
     output: Optional[str],
     clusters: Optional[str],
     debug: bool,
-    namespace: Optional[List]
+    namespace: Optional[List],
 ):
     """List SageMaker Hyperpod Clusters with cluster metadata.
 
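The new decorator sits below the Click option decorators, so it wraps the raw callback and Click invokes the instrumented function with its parsed options. A minimal sketch of the wrapping pattern such an emitter follows (illustrative stand-in only; the real implementation is _hyperpod_telemetry_emitter in telemetry_logging.py):

import functools
import time

def telemetry_emitter_sketch(feature, func_name):
    # Illustrative stand-in for _hyperpod_telemetry_emitter: time the call
    # and report success or failure for (feature, func_name).
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                # a real emitter would send (feature, func_name, elapsed)
                # here, and must itself never raise
        return wrapper
    return decorator
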
@@ -261,8 +271,14 @@ def rate_limited_operation(
         for ns in namespace:
             sm_managed_namespace = k8s_client.get_sagemaker_managed_namespace(ns)
             if sm_managed_namespace:
-                quota_allocation_id = sm_managed_namespace.metadata.labels[SAGEMAKER_QUOTA_ALLOCATION_LABEL]
-                cluster_queue_name = HYPERPOD_NAMESPACE_PREFIX + quota_allocation_id + SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX
+                quota_allocation_id = sm_managed_namespace.metadata.labels[
+                    SAGEMAKER_QUOTA_ALLOCATION_LABEL
+                ]
+                cluster_queue_name = (
+                    HYPERPOD_NAMESPACE_PREFIX
+                    + quota_allocation_id
+                    + SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX
+                )
                 cluster_queue = k8s_client.get_cluster_queue(cluster_queue_name)
                 nominal_quota = _get_cluster_queue_nominal_quota(cluster_queue)
                 quota_usage = _get_cluster_queue_quota_usage(cluster_queue)
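The rewrapped statements compute the same queue name as before. A small illustration, with the label value and both constants' string values assumed for the example (they are not the package's real definitions):

# Illustrative only: constant values below are assumptions.
HYPERPOD_NAMESPACE_PREFIX = "hyperpod-ns-"
SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX = "-clusterqueue"

quota_allocation_id = "abc123"  # assumed quota-allocation label value
cluster_queue_name = (
    HYPERPOD_NAMESPACE_PREFIX
    + quota_allocation_id
    + SAGEMAKER_MANAGED_CLUSTER_QUEUE_SUFFIX
)
# -> "hyperpod-ns-abc123-clusterqueue"
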
@@ -282,8 +298,19 @@ def rate_limited_operation(
                     nodes_summary["deep_health_check_passed"],
                 ]
                 for ns in namespace:
-                    capacities.append(ns_nominal_quota.get(ns).get(instance_type, {}).get(NVIDIA_GPU_RESOURCE_LIMIT_KEY, "N/A"))
-                    capacities.append(_get_available_quota(ns_nominal_quota.get(ns), ns_quota_usage.get(ns), instance_type, NVIDIA_GPU_RESOURCE_LIMIT_KEY))
+                    capacities.append(
+                        ns_nominal_quota.get(ns)
+                        .get(instance_type, {})
+                        .get(NVIDIA_GPU_RESOURCE_LIMIT_KEY, "N/A")
+                    )
+                    capacities.append(
+                        _get_available_quota(
+                            ns_nominal_quota.get(ns),
+                            ns_quota_usage.get(ns),
+                            instance_type,
+                            NVIDIA_GPU_RESOURCE_LIMIT_KEY,
+                        )
+                    )
                 cluster_capacities.append(capacities)
         except Exception as e:
             logger.error(f"Error processing cluster {cluster_name}: {e}, continue...")
@@ -305,7 +332,7 @@ def _get_cluster_queue_nominal_quota(cluster_queue):
             if resource_name == NVIDIA_GPU_RESOURCE_LIMIT_KEY:
                 quota = int(quota)
             nominal_quota[flavor_name][resource_name] = quota
-
+
     return nominal_quota
 
 
@@ -336,7 +363,7 @@ def _get_available_quota(nominal, usage, flavor, resource_name):
     # Some resources need to be further processed by parsing unit like memory, e.g 10Gi
     if nominal_quota is not None and usage_quota is not None:
         return int(nominal_quota) - int(usage_quota)
-
+
     return "N/A"
 
 
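The quota math itself is untouched: when both nominal and usage values resolve, the helper returns their integer difference, otherwise "N/A". A minimal worked example with assumed numbers:

# Assumed values for illustration: a flavor with 8 nominal GPUs of which
# 3 are in use leaves 5 available.
nominal_quota = 8
usage_quota = 3
if nominal_quota is not None and usage_quota is not None:
    available = int(nominal_quota) - int(usage_quota)  # -> 5
else:
    available = "N/A"
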
@@ -358,7 +385,9 @@ def _restructure_output(summary_list, namespaces):
     for node_summary in summary_list:
         node_summary["Namespaces"] = {}
         for ns in namespaces:
-            available_accelerators = node_summary[ns + AVAILABLE_ACCELERATOR_DEVICES_KEY]
+            available_accelerators = node_summary[
+                ns + AVAILABLE_ACCELERATOR_DEVICES_KEY
+            ]
             total_accelerators = node_summary[ns + TOTAL_ACCELERATOR_DEVICES_KEY]
             quota_accelerator_info = {
                 AVAILABLE_ACCELERATOR_DEVICES_KEY: available_accelerators,
@@ -425,9 +454,9 @@ def _aggregate_nodes_info(
 
         # Accelerator Devices available = Allocatable devices - Allocated devices
         if node_name in nodes_resource_allocated_dict:
-            nodes_summary[instance_type]["accelerator_devices_available"] -= (
-                nodes_resource_allocated_dict[node_name]
-            )
+            nodes_summary[instance_type][
+                "accelerator_devices_available"
+            ] -= nodes_resource_allocated_dict[node_name]
 
     logger.debug(f"nodes_summary: {nodes_summary}")
     return nodes_summary
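The aggregation rule stated in the comment, available = allocatable - allocated, is unchanged by the re-indentation. An illustrative run with assumed node and instance-type names:

# All names and values here are assumptions for the example.
nodes_summary = {"ml.p4d.24xlarge": {"accelerator_devices_available": 8}}
nodes_resource_allocated_dict = {"node-1": 3}  # devices already allocated

node_name, instance_type = "node-1", "ml.p4d.24xlarge"
if node_name in nodes_resource_allocated_dict:
    nodes_summary[instance_type][
        "accelerator_devices_available"
    ] -= nodes_resource_allocated_dict[node_name]
# nodes_summary["ml.p4d.24xlarge"]["accelerator_devices_available"] -> 5
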
@@ -550,7 +579,6 @@ def get_cluster_context(
         sys.exit(1)
 
 
-
 @click.command()
 @click.option("--grafana", is_flag=True, help="Returns Grafana Dashboard URL")
 @click.option("--prometheus", is_flag=True, help="Returns Prometheus Workspace URL")
@@ -572,14 +600,21 @@ def get_monitoring(grafana: bool, prometheus: bool, list: bool) -> None:
             print(f"Grafana dashboard URL: {monitor_config.grafanaURL}")
         if list:
             metrics_data = monitor_config.availableMetrics
-            print(tabulate([[k, v.get('level', v.get('enabled'))] for k, v in metrics_data.items()],
-                           headers=['Metric', 'Level/Enabled'], tablefmt='presto'))
+            print(
+                tabulate(
+                    [
+                        [k, v.get("level", v.get("enabled"))]
+                        for k, v in metrics_data.items()
+                    ],
+                    headers=["Metric", "Level/Enabled"],
+                    tablefmt="presto",
+                )
+            )
     except Exception as e:
         logger.error(f"Failed to get metrics: {e}")
         sys.exit(1)
 
 
-
 def _update_kube_config(
     eks_name: str,
     region: Optional[str],
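The get_monitoring change is pure re-wrapping (plus black-style double quotes); the rendered table is unchanged. A self-contained sketch of the presto-format output, with an assumed availableMetrics payload (real keys come from get_monitoring_config()):

from tabulate import tabulate

metrics_data = {
    "GPU": {"level": "basic"},       # assumed metric entries
    "NodeExporter": {"enabled": True},
}
print(
    tabulate(
        [[k, v.get("level", v.get("enabled"))] for k, v in metrics_data.items()],
        headers=["Metric", "Level/Enabled"],
        tablefmt="presto",
    )
)
# prints roughly:
#  Metric       | Level/Enabled
# --------------+----------------
#  GPU          | basic
#  NodeExporter | True
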

src/sagemaker/hyperpod/common/telemetry/constants.py

Lines changed: 0 additions & 5 deletions
@@ -5,11 +5,6 @@
 class Feature(Enum):
     """Enumeration of feature names used in telemetry."""
 
-    SDK_DEFAULTS = 1
-    LOCAL_MODE = 2
-    REMOTE_FUNCTION = 3
-    MODEL_TRAINER = 4
-    ESTIMATOR = 5
     HYPERPOD = 6  # Added to support telemetry in sagemaker-hyperpod-cli
 
     def __str__(self):  # pylint: disable=E0307
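With the unused members gone, the enum keeps HYPERPOD at its original value so the numeric code emitted for existing installs does not shift. Sketch of the resulting class (the __str__ body is an assumption; the diff shows only its signature):

from enum import Enum

class Feature(Enum):
    """Enumeration of feature names used in telemetry."""

    HYPERPOD = 6  # kept at 6 so emitted feature codes stay stable

    def __str__(self):  # pylint: disable=E0307
        return self.name  # assumed body

print(str(Feature.HYPERPOD))  # -> "HYPERPOD"
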

src/sagemaker/hyperpod/common/telemetry/telemetry_logging.py

Lines changed: 1 addition & 6 deletions
@@ -23,11 +23,6 @@
 )
 
 FEATURE_TO_CODE = {
-    str(Feature.SDK_DEFAULTS): 1,
-    str(Feature.LOCAL_MODE): 2,
-    str(Feature.REMOTE_FUNCTION): 3,
-    str(Feature.MODEL_TRAINER): 4,
-    str(Feature.ESTIMATOR): 5,
     str(Feature.HYPERPOD): 6,  # Added to support telemetry in sagemaker-hyperpod-cli
 }
 
@@ -146,7 +141,7 @@ def _send_telemetry_request(
         _requests_helper(url, 2)
         logger.info("SageMaker Python SDK telemetry successfully emitted.")
     except Exception:  # pylint: disable=W0703
-        logger.debug("SageMaker Python SDK telemetry not emitted!")
+        logger.warning("SageMaker Python SDK telemetry not emitted!")
 
 
 def _hyperpod_telemetry_emitter(feature: str, func_name: str):
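The second hunk raises the failure log from debug to warning. Behavior is unchanged (the exception is still swallowed so telemetry can never break an API call), but under Python's default logging setup a warning is actually visible. A self-contained demonstration:

import logging

logging.basicConfig()  # root logger defaults to the WARNING level
log = logging.getLogger("demo")

log.debug("SageMaker Python SDK telemetry not emitted!")    # dropped silently
log.warning("SageMaker Python SDK telemetry not emitted!")  # reaches stderr
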

src/sagemaker/hyperpod/inference/hp_endpoint_base.py

Lines changed: 1 addition & 1 deletion
@@ -164,7 +164,7 @@ def get_operator_logs(cls, since_hours: float):
         return logs
 
     @classmethod
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_logs")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_logs_endpoint")
     def get_logs(
         cls,
         pod: str,
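Only the event name changes here, keeping endpoint log retrieval distinguishable from the PyTorch-job log events renamed below. The decorator order is worth noting: the emitter sits beneath @classmethod, wrapping the plain function before classmethod turns it into a descriptor. Schematic of the pattern (class body abridged):

from sagemaker.hyperpod.common.telemetry.constants import Feature
from sagemaker.hyperpod.common.telemetry.telemetry_logging import (
    _hyperpod_telemetry_emitter,
)

class HPEndpointBase:
    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_logs_endpoint")
    def get_logs(cls, pod, container=None):
        # reversing the decorator order would hand the emitter a
        # classmethod object, which is not directly callable
        ...
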

src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py

Lines changed: 6 additions & 6 deletions
@@ -49,7 +49,7 @@ def verify_kube_config(cls):
     def get_logger(cls):
         return logging.getLogger(__name__)
 
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create_pytorchjob")
     def create(self, debug=False):
         self.verify_kube_config()
 
@@ -88,7 +88,7 @@ def create(self, debug=False):
             handle_exception(e, self.metadata.name, self.metadata.namespace)
 
     @classmethod
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pytorchjobs")
     def list(cls, namespace=None) -> List["HyperPodPytorchJob"]:
         cls.verify_kube_config()
 
@@ -112,7 +112,7 @@ def list(cls, namespace=None) -> List["HyperPodPytorchJob"]:
             logger.error(f"Failed to list HyperpodPytorchJobs!")
             handle_exception(e, "", namespace)
 
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "delete")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "delete_pytorchjob")
     def delete(self):
         self.verify_kube_config()
 
@@ -135,7 +135,7 @@ def delete(self):
             handle_exception(e, self.metadata.name, self.metadata.namespace)
 
     @classmethod
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_pytorchjob")
     def get(cls, name, namespace=None) -> "HyperPodPytorchJob":
         cls.verify_kube_config()
 
@@ -183,7 +183,7 @@ def refresh(self) -> "HyperPodPytorchJob":
             logger.error(f"Failed to refresh HyperPodPytorchJob {self.metadata.name}!")
             handle_exception(e, self.metadata.name, self.metadata.namespace)
 
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pods")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pods_pytorchjob")
     def list_pods(self) -> List[str]:
         self.verify_kube_config()
 
@@ -205,7 +205,7 @@ def list_pods(self) -> List[str]:
             logger.error(f"Failed to list pod in namespace {self.metadata.namespace}!")
             handle_exception(e, self.metadata.name, self.metadata.namespace)
 
-    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_logs_from_pod")
+    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_pytorchjob_logs_from_pod")
     def get_logs_from_pod(self, pod_name: str, container: Optional[str] = None) -> str:
         self.verify_kube_config()
 
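Every HyperPodPytorchJob lifecycle API now reports a job-specific event name, so telemetry can distinguish, say, listing PyTorch jobs from listing clusters. An illustrative session (resource names assumed; the emitted event name is noted per call):

from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob

jobs = HyperPodPytorchJob.list(namespace="default")  # emits "list_pytorchjobs"
job = HyperPodPytorchJob.get("demo-job")             # emits "get_pytorchjob"
pods = job.list_pods()                               # emits "list_pods_pytorchjob"
text = job.get_logs_from_pod(pods[0])                # emits "get_pytorchjob_logs_from_pod"
job.delete()                                         # emits "delete_pytorchjob"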