1818import attr
1919from botocore .exceptions import ClientError
2020
21+ from sagemaker .feature_store .feature_processor ._event_bridge_scheduler_helper import (
22+ EventBridgeSchedulerHelper ,
23+ )
2124from sagemaker .feature_store .feature_processor .lineage ._lineage_association_handler import (
2225 LineageAssociationHandler ,
2326)
@@ -102,7 +105,7 @@ class FeatureProcessorLineageHandler:
102105 output : str = attr .ib (default = None )
103106 transformation_code : TransformationCode = attr .ib (default = None )
104107
105- def create_lineage (self ) -> None :
108+ def create_lineage (self , tags : Optional [ List [ Dict [ str , str ]]] = None ) -> None :
106109 """Create and Update Feature Processor Lineage"""
107110 input_feature_group_contexts : List [
108111 FeatureGroupContexts
@@ -120,6 +123,8 @@ def create_lineage(self) -> None:
120123 )
121124 if transformation_code_artifact is not None :
122125 logger .info ("Created Transformation Code Artifact: %s" , transformation_code_artifact )
126+ if tags :
127+ transformation_code_artifact .set_tags (tags ) # pylint: disable=E1101
123128 # Create the Pipeline Lineage for the first time
124129 if not self ._check_if_pipeline_lineage_exists ():
125130 self ._create_new_pipeline_lineage (
@@ -160,6 +165,7 @@ def create_schedule_lineage(
160165 schedule_expression ,
161166 state ,
162167 start_date : datetime ,
168+ tags : Optional [List [Dict [str , str ]]] = None ,
163169 ) -> None :
164170 """Class to Create and Update FeatureProcessor Lineage Entities.
165171
@@ -171,10 +177,12 @@ def create_schedule_lineage(
171177 state (str):Specifies whether the schedule is enabled or disabled. Valid values are
172178 ENABLED and DISABLED. See https://docs.aws.amazon.com/scheduler/latest/APIReference/
173179 API_CreateSchedule.html#scheduler-CreateSchedule-request-State for more details.
174- If not specified, it will default to DISABLED .
180+ If not specified, it will default to ENABLED .
175181 start_date (Optional[datetime]): The date, in UTC, after which the schedule can begin
176182 invoking its target. Depending on the schedule’s recurrence expression, invocations
177183 might occur on, or after, the StartDate you specify.
184+ tags (Optional[List[Dict[str, str]]]): Custom tags to be attached to schedule
185+ lineage resource.
178186 """
179187 pipeline_context : Context = self ._get_pipeline_context ()
180188 pipeline_version_context : Context = self ._get_pipeline_version_context (
@@ -192,13 +200,55 @@ def create_schedule_lineage(
192200 pipeline_schedule = pipeline_schedule ,
193201 sagemaker_session = self .sagemaker_session ,
194202 )
203+ if tags :
204+ schedule_artifact .set_tags (tags )
195205
196206 LineageAssociationHandler .add_upstream_schedule_associations (
197207 schedule_artifact = schedule_artifact ,
198208 pipeline_version_context_arn = pipeline_version_context .context_arn ,
199209 sagemaker_session = self .sagemaker_session ,
200210 )
201211
def upsert_tags_for_lineage_resources(self, tags: List[Dict[str, str]]) -> None:
    """Add or update tags on the lineage resources of this pipeline.

    The tags attached to the SageMaker pipeline are treated as the source of truth and
    are applied to the pipeline context, the current pipeline version context, every
    input raw data artifact and, when one can be found, the pipeline schedule artifact.

    Args:
        tags (List[Dict[str, str]]): Custom tags to be attached to lineage resources.
            When empty or None the method returns without doing anything.
    """
    # Nothing to apply: return before any lineage lookup is made.
    if not tags:
        return

    # Resolve every lineage entity first, then tag them in a fixed order.
    pipeline_ctx: Context = self._get_pipeline_context()
    version_ctx: Context = self._get_pipeline_version_context(
        last_update_time=pipeline_ctx.properties[LAST_UPDATE_TIME]
    )
    raw_data_artifacts: List[Artifact] = self._retrieve_input_raw_data_artifacts()

    for lineage_context in (pipeline_ctx, version_ctx):
        lineage_context.set_tags(tags)
    for raw_data_artifact in raw_data_artifacts:
        raw_data_artifact.set_tags(tags)

    # The schedule is looked up under the pipeline's own name.
    session = self.sagemaker_session
    scheduler_helper = EventBridgeSchedulerHelper(
        session,
        session.boto_session.client("scheduler"),
    )
    schedule = scheduler_helper.describe_schedule(self.pipeline_name)
    if not schedule:
        return

    # The schedule ARN is the key used to find the matching lineage artifact.
    artifact_summary = S3LineageEntityHandler._load_artifact_from_s3_uri(
        s3_uri=schedule["Arn"],
        sagemaker_session=session,
    )
    if artifact_summary is None:
        return

    schedule_artifact: Artifact = S3LineageEntityHandler.load_artifact_from_arn(
        artifact_arn=artifact_summary.artifact_arn,
        sagemaker_session=session,
    )
    schedule_artifact.set_tags(tags)
251+
202252 def _create_new_pipeline_lineage (
203253 self ,
204254 input_feature_group_contexts : List [FeatureGroupContexts ],
0 commit comments