1717import sys
1818from time import perf_counter
1919from typing import List
20+ import functools
21+ import requests
2022
23+ import boto3
24+ from sagemaker .session import Session
2125from sagemaker .utils import resolve_value_from_config
2226from sagemaker .config .config_schema import TELEMETRY_OPT_OUT_PATH
2327from sagemaker .telemetry .constants import (
4751FEATURE_TO_CODE = {
4852 str (Feature .SDK_DEFAULTS ): 1 ,
4953 str (Feature .LOCAL_MODE ): 2 ,
54+ str (Feature .REMOTE_FUNCTION ): 3 ,
5055}
5156
5257STATUS_TO_CODE = {
@@ -59,86 +64,103 @@ def _telemetry_emitter(feature: str, func_name: str):
5964 """Decorator to emit telemetry logs for SageMaker Python SDK functions"""
6065
6166 def decorator (func ):
62- def wrapper (self , * args , ** kwargs ):
63- logger .info (TELEMETRY_OPT_OUT_MESSAGING )
64- response = None
65- caught_ex = None
66- studio_app_type = process_studio_metadata_file ()
67-
68- # Check if telemetry is opted out
69- telemetry_opt_out_flag = resolve_value_from_config (
70- direct_input = None ,
71- config_path = TELEMETRY_OPT_OUT_PATH ,
72- default_value = False ,
73- sagemaker_session = self .sagemaker_session ,
74- )
75- logger .debug ("TelemetryOptOut flag is set to: %s" , telemetry_opt_out_flag )
76-
77- # Construct the feature list to track feature combinations
78- feature_list : List [int ] = [FEATURE_TO_CODE [str (feature )]]
79- if self .sagemaker_session :
80- if self .sagemaker_session .sagemaker_config and feature != Feature .SDK_DEFAULTS :
67+ @functools .wraps (func )
68+ def wrapper (* args , ** kwargs ):
69+ sagemaker_session = None
70+ if len (args ) > 0 and hasattr (args [0 ], "sagemaker_session" ):
71+ # Get the sagemaker_session from the instance method args
72+ sagemaker_session = args [0 ].sagemaker_session
73+ elif feature == Feature .REMOTE_FUNCTION :
74+ # Get the sagemaker_session from the function keyword arguments for remote function
75+ sagemaker_session = kwargs .get (
76+ "sagemaker_session" , _get_default_sagemaker_session ()
77+ )
78+
79+ if sagemaker_session :
80+ logger .debug ("sagemaker_session found, preparing to emit telemetry..." )
81+ logger .info (TELEMETRY_OPT_OUT_MESSAGING )
82+ response = None
83+ caught_ex = None
84+ studio_app_type = process_studio_metadata_file ()
85+
86+ # Check if telemetry is opted out
87+ telemetry_opt_out_flag = resolve_value_from_config (
88+ direct_input = None ,
89+ config_path = TELEMETRY_OPT_OUT_PATH ,
90+ default_value = False ,
91+ sagemaker_session = sagemaker_session ,
92+ )
93+ logger .debug ("TelemetryOptOut flag is set to: %s" , telemetry_opt_out_flag )
94+
95+ # Construct the feature list to track feature combinations
96+ feature_list : List [int ] = [FEATURE_TO_CODE [str (feature )]]
97+
98+ if sagemaker_session .sagemaker_config and feature != Feature .SDK_DEFAULTS :
8199 feature_list .append (FEATURE_TO_CODE [str (Feature .SDK_DEFAULTS )])
82100
83- if self . sagemaker_session .local_mode and feature != Feature .LOCAL_MODE :
101+ if sagemaker_session .local_mode and feature != Feature .LOCAL_MODE :
84102 feature_list .append (FEATURE_TO_CODE [str (Feature .LOCAL_MODE )])
85103
86- # Construct the extra info to track platform and environment usage metadata
87- extra = (
88- f"{ func_name } "
89- f"&x-sdkVersion={ SDK_VERSION } "
90- f"&x-env={ PYTHON_VERSION } "
91- f"&x-sys={ OS_NAME_VERSION } "
92- f"&x-platform={ studio_app_type } "
93- )
94-
95- # Add endpoint ARN to the extra info if available
96- if self .sagemaker_session and self .sagemaker_session .endpoint_arn :
97- extra += f"&x-endpointArn={ self .sagemaker_session .endpoint_arn } "
98-
99- start_timer = perf_counter ()
100- try :
101- # Call the original function
102- response = func (self , * args , ** kwargs )
103- stop_timer = perf_counter ()
104- elapsed = stop_timer - start_timer
105- extra += f"&x-latency={ round (elapsed , 2 )} "
106- if not telemetry_opt_out_flag :
107- _send_telemetry_request (
108- STATUS_TO_CODE [str (Status .SUCCESS )],
109- feature_list ,
110- self .sagemaker_session ,
111- None ,
112- None ,
113- extra ,
114- )
115- except Exception as e : # pylint: disable=W0703
116- stop_timer = perf_counter ()
117- elapsed = stop_timer - start_timer
118- extra += f"&x-latency={ round (elapsed , 2 )} "
119- if not telemetry_opt_out_flag :
120- _send_telemetry_request (
121- STATUS_TO_CODE [str (Status .FAILURE )],
122- feature_list ,
123- self .sagemaker_session ,
124- str (e ),
125- e .__class__ .__name__ ,
126- extra ,
127- )
128- caught_ex = e
129- finally :
130- if caught_ex :
131- raise caught_ex
132- return response # pylint: disable=W0150
104+ # Construct the extra info to track platform and environment usage metadata
105+ extra = (
106+ f"{ func_name } "
107+ f"&x-sdkVersion={ SDK_VERSION } "
108+ f"&x-env={ PYTHON_VERSION } "
109+ f"&x-sys={ OS_NAME_VERSION } "
110+ f"&x-platform={ studio_app_type } "
111+ )
112+
113+ # Add endpoint ARN to the extra info if available
114+ if sagemaker_session .endpoint_arn :
115+ extra += f"&x-endpointArn={ sagemaker_session .endpoint_arn } "
116+
117+ start_timer = perf_counter ()
118+ try :
119+ # Call the original function
120+ response = func (* args , ** kwargs )
121+ stop_timer = perf_counter ()
122+ elapsed = stop_timer - start_timer
123+ extra += f"&x-latency={ round (elapsed , 2 )} "
124+ if not telemetry_opt_out_flag :
125+ _send_telemetry_request (
126+ STATUS_TO_CODE [str (Status .SUCCESS )],
127+ feature_list ,
128+ sagemaker_session ,
129+ None ,
130+ None ,
131+ extra ,
132+ )
133+ except Exception as e : # pylint: disable=W0703
134+ stop_timer = perf_counter ()
135+ elapsed = stop_timer - start_timer
136+ extra += f"&x-latency={ round (elapsed , 2 )} "
137+ if not telemetry_opt_out_flag :
138+ _send_telemetry_request (
139+ STATUS_TO_CODE [str (Status .FAILURE )],
140+ feature_list ,
141+ sagemaker_session ,
142+ str (e ),
143+ e .__class__ .__name__ ,
144+ extra ,
145+ )
146+ caught_ex = e
147+ finally :
148+ if caught_ex :
149+ raise caught_ex
150+ return response # pylint: disable=W0150
151+ else :
152+ logger .debug (
153+ "Unable to send telemetry for function %s. "
154+ "sagemaker_session is not provided or not valid." ,
155+ func_name ,
156+ )
157+ return func (* args , ** kwargs )
133158
134159 return wrapper
135160
136161 return decorator
137162
138163
139- from sagemaker .session import Session # noqa: E402 pylint: disable=C0413
140-
141-
142164def _send_telemetry_request (
143165 status : int ,
144166 feature_list : List [int ],
@@ -165,9 +187,9 @@ def _send_telemetry_request(
165187 # Send the telemetry request
166188 logger .debug ("Sending telemetry request to [%s]" , url )
167189 _requests_helper (url , 2 )
168- logger .debug ("SageMaker Python SDK telemetry successfully emitted! " )
190+ logger .debug ("SageMaker Python SDK telemetry successfully emitted. " )
169191 except Exception : # pylint: disable=W0703
170- logger .debug ("SageMaker Python SDK telemetry not emitted!! " )
192+ logger .debug ("SageMaker Python SDK telemetry not emitted!" )
171193
172194
173195def _construct_url (
@@ -196,9 +218,6 @@ def _construct_url(
196218 return base_url
197219
198220
199- import requests # noqa: E402 pylint: disable=C0413,C0411
200-
201-
202221def _requests_helper (url , timeout ):
203222 """Make a GET request to the given URL"""
204223
@@ -227,3 +246,11 @@ def _get_region_or_default(session):
227246 return session .boto_session .region_name
228247 except Exception : # pylint: disable=W0703
229248 return DEFAULT_AWS_REGION
249+
250+
251+ def _get_default_sagemaker_session ():
252+ """Return the default sagemaker session"""
253+ boto_session = boto3 .Session (region_name = DEFAULT_AWS_REGION )
254+ sagemaker_session = Session (boto_session = boto_session )
255+
256+ return sagemaker_session
0 commit comments