3939)
4040from sagemaker .session import get_execution_role , Session
4141from tests .integ .timeout import timeout
42+ from urllib .parse import urlparse
4243
4344BUCKET_POLICY = {
4445 "Version" : "2012-10-17" ,
@@ -635,8 +636,8 @@ def test_create_dataset_with_feature_group_base(
635636 )
636637
637638 with timeout (minutes = 10 ) and cleanup_offline_store (
638- base_table_name , feature_store_session
639- ) and cleanup_offline_store (feature_group_table_name , feature_store_session ):
639+ base , feature_store_session
640+ ) and cleanup_offline_store (feature_group , feature_store_session ):
640641 feature_store = FeatureStore (sagemaker_session = feature_store_session )
641642 df , query_string = (
642643 feature_store .create_dataset (base = base , output_path = offline_store_s3_uri )
@@ -663,7 +664,7 @@ def test_create_dataset_with_feature_group_base(
663664
664665 assert sorted_df .equals (expect_df )
665666 assert (
666- query_string
667+ query_string . strip ()
667668 == "WITH fg_base AS (WITH table_base AS (\n "
668669 + "SELECT *\n "
669670 + "FROM (\n "
@@ -817,8 +818,8 @@ def test_create_dataset_with_feature_group_base_with_additional_params(
817818 )
818819
819820 with timeout (minutes = 10 ) and cleanup_offline_store (
820- base_table_name , feature_store_session
821- ) and cleanup_offline_store (feature_group_table_name , feature_store_session ):
821+ base , feature_store_session
822+ ) and cleanup_offline_store (feature_group , feature_store_session ):
822823 feature_store = FeatureStore (sagemaker_session = feature_store_session )
823824 df , query_string = (
824825 feature_store .create_dataset (base = base , output_path = offline_store_s3_uri )
@@ -850,7 +851,7 @@ def test_create_dataset_with_feature_group_base_with_additional_params(
850851
851852 assert sorted_df .equals (expect_df )
852853 assert (
853- query_string
854+ query_string . strip ()
854855 == "WITH fg_base AS (WITH table_base AS (\n "
855856 + "SELECT *\n "
856857 + "FROM (\n "
@@ -1068,25 +1069,29 @@ def cleanup_feature_group(feature_group: FeatureGroup):
10681069
10691070
10701071@contextmanager
1071- def cleanup_offline_store (table_name : str , feature_store_session : Session ):
1072+ def cleanup_offline_store (feature_group : FeatureGroup , feature_store_session : Session ):
10721073 try :
10731074 yield
10741075 finally :
1076+ feature_group_metadata = feature_group .describe ()
1077+ feature_group_name = feature_group_metadata ["FeatureGroupName" ]
10751078 try :
1079+ s3_uri = feature_group_metadata ["OfflineStoreConfig" ]["S3StorageConfig" ][
1080+ "ResolvedOutputS3Uri"
1081+ ]
1082+ parsed_uri = urlparse (s3_uri )
1083+ bucket_name , prefix = parsed_uri .netloc , parsed_uri .path
1084+ prefix = prefix .strip ("/" )
1085+ prefix = prefix [:- 5 ] if prefix .endswith ("/data" ) else prefix
10761086 region_name = feature_store_session .boto_session .region_name
10771087 s3_client = feature_store_session .boto_session .client (
10781088 service_name = "s3" , region_name = region_name
10791089 )
1080- account_id = feature_store_session .account_id ()
1081- bucket_name = f"sagemaker-test-featurestore-{ region_name } -{ account_id } "
1082- response = s3_client .list_objects_v2 (
1083- Bucket = bucket_name ,
1084- Prefix = f"{ account_id } /sagemaker/{ region_name } /offline-store/{ table_name } /" ,
1085- )
1090+ response = s3_client .list_objects_v2 (Bucket = bucket_name , Prefix = prefix )
10861091 files_in_folder = response ["Contents" ]
10871092 files_to_delete = []
10881093 for f in files_in_folder :
10891094 files_to_delete .append ({"Key" : f ["Key" ]})
10901095 s3_client .delete_objects (Bucket = bucket_name , Delete = {"Objects" : files_to_delete })
1091- except Exception :
1092- raise RuntimeError (f"Failed to delete data under { table_name } " )
1096+ except Exception as e :
1097+ raise RuntimeError (f"Failed to delete data for feature_group { feature_group_name } " , e )
0 commit comments