@@ -584,9 +584,9 @@ def save(
584
584
self ,
585
585
entity : Union [Asset , List [Asset ]],
586
586
replace_atlan_tags : bool = False ,
587
- append_atlan_tags : bool = False ,
588
587
replace_custom_metadata : bool = False ,
589
588
overwrite_custom_metadata : bool = False ,
589
+ append_atlan_tags : bool = False ,
590
590
) -> AssetMutationResponse :
591
591
"""
592
592
If an asset with the same qualified_name exists, updates the existing asset. Otherwise, creates the asset.
@@ -597,6 +597,7 @@ def save(
597
597
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
598
598
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
599
599
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
600
+ :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False)
600
601
:returns: the result of the save
601
602
:raises AtlanError: on any API communication issue
602
603
:raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
@@ -875,50 +876,33 @@ def _restore_asset(self, asset: Asset) -> AssetMutationResponse:
875
876
raw_json = self ._client ._call_api (BULK_UPDATE , query_params , request )
876
877
return AssetMutationResponse (** raw_json )
877
878
878
- @validate_arguments
879
- def add_atlan_tags (
879
+ def _modify_tags (
880
880
self ,
881
+ type_of_modification ,
881
882
asset_type : Type [A ],
882
883
qualified_name : str ,
883
884
atlan_tag_names : List [str ],
884
885
propagate : bool = False ,
885
886
remove_propagation_on_delete : bool = True ,
886
887
restrict_lineage_propagation : bool = False ,
887
888
restrict_propagation_through_hierarchy : bool = False ,
889
+ replace_atlan_tags : bool = False ,
890
+ append_atlan_tags : bool = False ,
888
891
) -> A :
889
- """
890
- Add one or more Atlan tags to the provided asset.
891
-
892
- :param asset_type: type of asset to which to add the Atlan tags
893
- :param qualified_name: qualified_name of the asset to which to add the Atlan tags
894
- :param atlan_tag_names: human-readable names of the Atlan tags to add to the asset
895
- :param propagate: whether to propagate the Atlan tag (True) or not (False)
896
- :param remove_propagation_on_delete: whether to remove the propagated Atlan tags
897
- when the Atlan tag is removed from this asset (True) or not (False)
898
- :param restrict_lineage_propagation: whether to avoid propagating
899
- through lineage (True) or do propagate through lineage (False)
900
- :param restrict_propagation_through_hierarchy: whether to prevent this Atlan tag from
901
- propagating through hierarchy (True) or allow it to propagate through hierarchy (False)
902
- :returns: the asset that was updated (note that it will NOT contain details of the added Atlan tags)
903
- :raises AtlanError: on any API communication issue
904
- """
905
- from pyatlan .client .atlan import AtlanClient
906
-
907
- client = AtlanClient .get_current_client ()
908
- name_asset = client .asset .get_by_qualified_name (
892
+ reterieved_asset = self .get_by_qualified_name (
909
893
qualified_name = qualified_name ,
910
894
asset_type = asset_type ,
911
- ignore_relationships = False ,
895
+ attributes = [ "anchor" ] ,
912
896
)
913
897
if asset_type in (AtlasGlossaryTerm , AtlasGlossaryCategory ):
914
898
updated_asset = asset_type .updater (
915
899
qualified_name = qualified_name ,
916
- name = name_asset .name ,
917
- glossary_guid = name_asset .anchor .guid , # type: ignore
900
+ name = reterieved_asset .name ,
901
+ glossary_guid = reterieved_asset .anchor .guid , # type: ignore
918
902
)
919
903
else :
920
904
updated_asset = asset_type .updater (
921
- qualified_name = qualified_name , name = name_asset .name
905
+ qualified_name = qualified_name , name = reterieved_asset .name
922
906
)
923
907
924
908
atlan_tag = [
@@ -932,14 +916,66 @@ def add_atlan_tags(
932
916
for name in atlan_tag_names
933
917
]
934
918
935
- updated_asset .add_or_update_classifications = atlan_tag
919
+ if type_of_modification == "add" or "update" :
920
+ updated_asset .add_or_update_classifications = atlan_tag
921
+ if type_of_modification == "remove" :
922
+ updated_asset .remove_classifications = atlan_tag
923
+ if type_of_modification == "replace" :
924
+ updated_asset .classifications = atlan_tag
936
925
937
- response = self .save (entity = updated_asset , append_atlan_tags = True )
926
+ response = self .save (
927
+ entity = updated_asset ,
928
+ replace_atlan_tags = replace_atlan_tags ,
929
+ append_atlan_tags = append_atlan_tags ,
930
+ )
938
931
939
932
if assets := response .assets_updated (asset_type = asset_type ):
940
933
return assets [0 ]
941
934
return updated_asset
942
935
936
+ @validate_arguments
937
+ def add_atlan_tags (
938
+ self ,
939
+ asset_type : Type [A ],
940
+ qualified_name : str ,
941
+ atlan_tag_names : List [str ],
942
+ propagate : bool = False ,
943
+ remove_propagation_on_delete : bool = True ,
944
+ restrict_lineage_propagation : bool = False ,
945
+ restrict_propagation_through_hierarchy : bool = False ,
946
+ ) -> A :
947
+ """
948
+ Add one or more Atlan tags to the provided asset.
949
+
950
+ :param asset_type: type of asset to which to add the Atlan tags
951
+ :param qualified_name: qualified_name of the asset to which to add the Atlan tags
952
+ :param atlan_tag_names: human-readable names of the Atlan tags to add to the asset
953
+ :param propagate: whether to propagate the Atlan tag (True) or not (False)
954
+ :param remove_propagation_on_delete: whether to remove the propagated Atlan tags
955
+ when the Atlan tag is removed from this asset (True) or not (False)
956
+ :param restrict_lineage_propagation: whether to avoid propagating
957
+ through lineage (True) or do propagate through lineage (False)
958
+ :param restrict_propagation_through_hierarchy: whether to prevent this Atlan tag from
959
+ propagating through hierarchy (True) or allow it to propagate through hierarchy (False)
960
+ :returns: the asset that was updated (note that it will NOT contain details of the added Atlan tags)
961
+ :raises AtlanError: on any API communication issue
962
+ """
963
+
964
+ response = self ._modify_tags (
965
+ type_of_modification = "add" ,
966
+ asset_type = asset_type ,
967
+ qualified_name = qualified_name ,
968
+ atlan_tag_names = atlan_tag_names ,
969
+ propagate = propagate ,
970
+ remove_propagation_on_delete = remove_propagation_on_delete ,
971
+ restrict_lineage_propagation = restrict_lineage_propagation ,
972
+ restrict_propagation_through_hierarchy = restrict_propagation_through_hierarchy ,
973
+ replace_atlan_tags = False ,
974
+ append_atlan_tags = True ,
975
+ )
976
+
977
+ return response
978
+
943
979
@validate_arguments
944
980
def update_atlan_tags (
945
981
self ,
@@ -967,40 +1003,21 @@ def update_atlan_tags(
967
1003
:returns: the asset that was updated (note that it will NOT contain details of the updated Atlan tags)
968
1004
:raises AtlanError: on any API communication issue
969
1005
"""
970
- from pyatlan .client .atlan import AtlanClient
971
1006
972
- client = AtlanClient .get_current_client ()
973
- name_asset = client .asset .get_by_qualified_name (
974
- qualified_name = qualified_name ,
1007
+ response = self ._modify_tags (
1008
+ type_of_modification = "update" ,
975
1009
asset_type = asset_type ,
976
- ignore_relationships = False ,
1010
+ qualified_name = qualified_name ,
1011
+ atlan_tag_names = atlan_tag_names ,
1012
+ propagate = propagate ,
1013
+ remove_propagation_on_delete = remove_propagation_on_delete ,
1014
+ restrict_lineage_propagation = restrict_lineage_propagation ,
1015
+ restrict_propagation_through_hierarchy = restrict_propagation_through_hierarchy ,
1016
+ replace_atlan_tags = False ,
1017
+ append_atlan_tags = True ,
977
1018
)
978
- if asset_type in (AtlasGlossaryTerm , AtlasGlossaryCategory ):
979
- updated_asset = asset_type .updater (
980
- qualified_name = qualified_name ,
981
- name = name_asset .name ,
982
- glossary_guid = name_asset .anchor .guid , # type: ignore
983
- )
984
- else :
985
- updated_asset = asset_type .updater (
986
- qualified_name = qualified_name , name = name_asset .name
987
- )
988
1019
989
- atlan_tag = [
990
- AtlanTag (
991
- type_name = AtlanTagName (display_text = name ),
992
- propagate = propagate ,
993
- remove_propagations_on_entity_delete = remove_propagation_on_delete ,
994
- restrict_propagation_through_lineage = restrict_lineage_propagation ,
995
- restrict_propagation_through_hierarchy = restrict_propagation_through_hierarchy ,
996
- )
997
- for name in atlan_tag_names
998
- ]
999
- updated_asset .add_or_update_classifications = atlan_tag
1000
- response = self .save (entity = updated_asset , append_atlan_tags = True )
1001
- if assets := response .assets_updated (asset_type = asset_type ):
1002
- return assets [0 ]
1003
- return updated_asset
1020
+ return response
1004
1021
1005
1022
@validate_arguments
1006
1023
def remove_atlan_tag (
@@ -1018,32 +1035,17 @@ def remove_atlan_tag(
1018
1035
:returns: the asset that was updated (note that it will NOT contain details of the deleted Atlan tag)
1019
1036
:raises AtlanError: on any API communication issue
1020
1037
"""
1021
- from pyatlan .client .atlan import AtlanClient
1022
1038
1023
- client = AtlanClient .get_current_client ()
1024
- name_asset = client .asset .get_by_qualified_name (
1025
- qualified_name = qualified_name ,
1039
+ response = self ._modify_tags (
1040
+ type_of_modification = "remove" ,
1026
1041
asset_type = asset_type ,
1027
- ignore_relationships = False ,
1042
+ qualified_name = qualified_name ,
1043
+ atlan_tag_names = [atlan_tag_name ],
1044
+ replace_atlan_tags = False ,
1045
+ append_atlan_tags = True ,
1028
1046
)
1029
1047
1030
- if asset_type in (AtlasGlossaryTerm , AtlasGlossaryCategory ):
1031
- updated_asset = asset_type .updater (
1032
- qualified_name = qualified_name ,
1033
- name = name_asset .name ,
1034
- glossary_guid = name_asset .anchor .guid , # type: ignore
1035
- )
1036
- else :
1037
- updated_asset = asset_type .updater (
1038
- qualified_name = qualified_name , name = name_asset .name
1039
- )
1040
-
1041
- atlan_tag = [AtlanTag (type_name = AtlanTagName (display_text = atlan_tag_name ))]
1042
- updated_asset .remove_classifications = atlan_tag
1043
- response = self .save (entity = updated_asset , append_atlan_tags = True )
1044
- if assets := response .assets_updated (asset_type = asset_type ):
1045
- return assets [0 ]
1046
- return updated_asset
1048
+ return response
1047
1049
1048
1050
@validate_arguments
1049
1051
def remove_atlan_tags (
@@ -1061,35 +1063,16 @@ def remove_atlan_tags(
1061
1063
:returns: the asset that was updated (note that it will NOT contain details of the deleted Atlan tags)
1062
1064
:raises AtlanError: on any API communication issue
1063
1065
"""
1064
- from pyatlan .client .atlan import AtlanClient
1065
-
1066
- client = AtlanClient .get_current_client ()
1067
- name_asset = client .asset .get_by_qualified_name (
1068
- qualified_name = qualified_name ,
1066
+ response = self ._modify_tags (
1067
+ type_of_modification = "remove" ,
1069
1068
asset_type = asset_type ,
1070
- ignore_relationships = False ,
1069
+ qualified_name = qualified_name ,
1070
+ atlan_tag_names = atlan_tag_names ,
1071
+ replace_atlan_tags = False ,
1072
+ append_atlan_tags = True ,
1071
1073
)
1072
1074
1073
- if asset_type in (AtlasGlossaryTerm , AtlasGlossaryCategory ):
1074
- updated_asset = asset_type .updater (
1075
- qualified_name = qualified_name ,
1076
- name = name_asset .name ,
1077
- glossary_guid = name_asset .anchor .guid , # type: ignore
1078
- )
1079
- else :
1080
- updated_asset = asset_type .updater (
1081
- qualified_name = qualified_name , name = name_asset .name
1082
- )
1083
-
1084
- atlan_tag = [
1085
- AtlanTag (type_name = AtlanTagName (display_text = name ))
1086
- for name in atlan_tag_names
1087
- ]
1088
- updated_asset .remove_classifications = atlan_tag
1089
- response = self .save (entity = updated_asset , append_atlan_tags = True )
1090
- if assets := response .assets_updated (asset_type = asset_type ):
1091
- return assets [0 ]
1092
- return updated_asset
1075
+ return response
1093
1076
1094
1077
def _update_asset_by_attribute (
1095
1078
self , asset : A , asset_type : Type [A ], qualified_name : str
0 commit comments