@@ -163,12 +163,15 @@ class IngestionManagerPandas:
163163 max_workers (int): number of threads to create.
164164 max_processes (int): number of processes to create. Each process spawns
165165 ``max_workers`` threads.
166+ profile_name (str): name of the credential profile to use for ``PutRecord``
167+ (default: None).
166168 """
167169
168170 feature_group_name : str = attr .ib ()
169171 sagemaker_fs_runtime_client_config : Config = attr .ib ()
170172 max_workers : int = attr .ib (default = 1 )
171173 max_processes : int = attr .ib (default = 1 )
174+ profile_name : str = attr .ib (default = None )
172175 _async_result : AsyncResult = attr .ib (default = None )
173176 _processing_pool : ProcessingPool = attr .ib (default = None )
174177 _failed_indices : List [int ] = attr .ib (factory = list )
@@ -180,6 +183,7 @@ def _ingest_single_batch(
180183 client_config : Config ,
181184 start_index : int ,
182185 end_index : int ,
186+ profile_name : str = None ,
183187 ) -> List [int ]:
184188 """Ingest a single batch of DataFrame rows into FeatureStore.
185189
@@ -190,6 +194,8 @@ def _ingest_single_batch(
190194 client to perform boto calls.
191195 start_index (int): starting position to ingest in this batch.
192196 end_index (int): ending position to ingest in this batch.
197+ profile_name (str): name of the credential profile to use for ``PutRecord``
198+ (default: None).
193199
194200 Returns:
195201 List of row indices that failed to be ingested.
@@ -198,7 +204,7 @@ def _ingest_single_batch(
198204 if "max_attempts" not in retry_config and "total_max_attempts" not in retry_config :
199205 client_config = copy .deepcopy (client_config )
200206 client_config .retries = {"max_attempts" : 10 , "mode" : "standard" }
201- sagemaker_featurestore_runtime_client = boto3 .Session ().client (
207+ sagemaker_featurestore_runtime_client = boto3 .Session (profile_name = profile_name ).client (
202208 service_name = "sagemaker-featurestore-runtime" , config = client_config
203209 )
204210
@@ -287,6 +293,7 @@ def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None):
287293 data_frame [start_index :end_index ],
288294 start_index ,
289295 timeout ,
296+ self .profile_name ,
290297 )
291298 ]
292299
@@ -311,6 +318,7 @@ def _run_multi_threaded(
311318 data_frame : DataFrame ,
312319 row_offset = 0 ,
313320 timeout = None ,
321+ profile_name = None ,
314322 ) -> List [int ]:
315323 """Start the ingestion process.
316324
@@ -321,6 +329,8 @@ def _run_multi_threaded(
321329 wait (bool): whether to wait for the ingestion to finish or not.
322330 timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
323331 if timeout is reached.
332+ profile_name (str): name of the credential profile to use for ``PutRecord``
333+ (default: None).
324334
325335 Returns:
326336 List of row indices that failed to be ingested.
@@ -342,6 +352,7 @@ def _run_multi_threaded(
342352 start_index = start_index ,
343353 end_index = end_index ,
344354 client_config = sagemaker_fs_runtime_client_config ,
355+ profile_name = profile_name ,
345356 )
346357 ] = (start_index + row_offset , end_index + row_offset )
347358
@@ -581,6 +592,7 @@ def ingest(
581592 max_processes : int = 1 ,
582593 wait : bool = True ,
583594 timeout : Union [int , float ] = None ,
595+ profile_name : str = None ,
584596 ) -> IngestionManagerPandas :
585597 """Ingest the content of a pandas DataFrame to feature store.
586598
@@ -599,6 +611,11 @@ def ingest(
599611 They can also be found from the IngestionManagerPandas' ``failed_rows`` function after
600612 the exception is thrown.
601613
614+ The ``profile_name`` argument is optional. If None is passed, the default credentials
615+ are used. The profile is applied to the sagemaker_featurestore_runtime client only. See
616+ https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for more
617+ about the default credentials.
618+
602619 Args:
603620 data_frame (DataFrame): data_frame to be ingested to feature store.
604621 max_workers (int): number of threads to be created.
@@ -607,6 +624,8 @@ def ingest(
607624 wait (bool): whether to wait for the ingestion to finish or not.
608625 timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
609626 if timeout is reached.
627+ profile_name (str): name of the credential profile to use for ``PutRecord``
628+ (default: None).
610629
611630 Returns:
612631 An instance of IngestionManagerPandas.
@@ -622,6 +641,7 @@ def ingest(
622641 sagemaker_fs_runtime_client_config = self .sagemaker_session .sagemaker_featurestore_runtime_client .meta .config ,
623642 max_workers = max_workers ,
624643 max_processes = max_processes ,
644+ profile_name = profile_name ,
625645 )
626646
627647 manager .run (data_frame = data_frame , wait = wait , timeout = timeout )
0 commit comments