Skip to content

Commit 63b3f01

Browse files
authored
APP-8059 : Added support for DQ rules schedule (#692)
* added dq rules schedule support * added unit test * added doc string and method name change * made the suggested changes * fixed the doc string
1 parent c2fc76e commit 63b3f01

File tree

3 files changed

+82
-2
lines changed

3 files changed

+82
-2
lines changed

pyatlan/client/asset.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
EntityStatus,
8585
SaveSemantic,
8686
SortOrder,
87+
alpha_DQScheduleType,
8788
)
8889
from pyatlan.model.fields.atlan_fields import AtlanField
8990
from pyatlan.model.lineage import LineageDirection, LineageListRequest
@@ -2029,6 +2030,35 @@ def process_assets(
20292030
func(asset)
20302031
return len(guids_processed)
20312032

2033+
@validate_arguments
2034+
def add_dq_rule_schedule(
2035+
self,
2036+
asset_type: Type[A],
2037+
asset_name: str,
2038+
asset_qualified_name: str,
2039+
schedule_crontab: str,
2040+
schedule_time_zone: str,
2041+
) -> AssetMutationResponse:
2042+
"""
2043+
Add a data quality rule schedule to an asset.
2044+
2045+
:param asset_type: the type of asset to update (e.g., Table)
2046+
:param asset_name: the name of the asset to update
2047+
:param asset_qualified_name: the qualified name of the asset to update
2048+
:param schedule_crontab: cron expression string defining the schedule for the DQ rules, e.g: `5 4 * * *`.
2049+
:param schedule_time_zone: timezone for the schedule, e.g: `Europe/Paris`.
2050+
:returns: the result of the save
2051+
:raises AtlanError: on any API communication issue
2052+
"""
2053+
updated_asset = asset_type.updater(
2054+
qualified_name=asset_qualified_name, name=asset_name
2055+
)
2056+
updated_asset.alpha_asset_d_q_schedule_time_zone = schedule_time_zone
2057+
updated_asset.alpha_asset_d_q_schedule_crontab = schedule_crontab
2058+
updated_asset.alpha_asset_d_q_schedule_type = alpha_DQScheduleType.CRON
2059+
response = self.save(updated_asset)
2060+
return response
2061+
20322062

20332063
class SearchResults(ABC, Iterable):
20342064
"""

pyatlan/model/utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def encoders():
2525
}
2626

2727

28-
def convert_with_fixed_prefix(input_str, fixed_prefix="alpha_dq"):
28+
def convert_with_fixed_prefix(input_str, fixed_prefix):
2929
prefix = fixed_prefix
3030
remaining = input_str[len(prefix) + 1 :]
3131
parts = remaining.split("_")
@@ -39,7 +39,9 @@ def to_camel_case(value: str) -> str:
3939
if value == "__root__":
4040
return value
4141
if value.startswith("alpha_dq"):
42-
return convert_with_fixed_prefix(value)
42+
return convert_with_fixed_prefix(input_str=value, fixed_prefix="alpha_dq")
43+
if value.startswith("alpha_asset"):
44+
return convert_with_fixed_prefix(input_str=value, fixed_prefix="alpha_asset")
4345
if value in CAMEL_CASE_OVERRIDES:
4446
return CAMEL_CASE_OVERRIDES[value]
4547
value = "".join(word.capitalize() for word in value.split("_"))

tests/unit/test_client.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
LineageDirection,
4949
SaveSemantic,
5050
SortOrder,
51+
alpha_DQScheduleType,
5152
)
5253
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
5354
from pyatlan.model.group import GroupRequest
@@ -2716,3 +2717,50 @@ def test_get_by_qualified_name_asset_not_found(mock_api_caller):
27162717
)
27172718

27182719
mock_execute.assert_called_once()
2720+
2721+
2722+
def test_add_dq_rule_schedule(mock_api_caller):
2723+
asset_client = AssetClient(mock_api_caller)
2724+
asset_type = Table
2725+
asset_name = "Test Table"
2726+
asset_qualified_name = "test/qualified/name"
2727+
schedule_cron_string = "0 0 * * *"
2728+
schedule_time_zone = "UTC"
2729+
2730+
updated_table = Table()
2731+
updated_table.guid = "test-guid-123"
2732+
updated_table.alpha_asset_d_q_schedule_time_zone = schedule_time_zone
2733+
updated_table.alpha_asset_d_q_schedule_crontab = schedule_cron_string
2734+
updated_table.alpha_asset_d_q_schedule_type = alpha_DQScheduleType.CRON
2735+
2736+
mock_response = Mock(spec=AssetMutationResponse)
2737+
2738+
with patch.object(
2739+
asset_type, "updater", return_value=updated_table
2740+
) as mock_updater:
2741+
with patch.object(
2742+
asset_client, "save", return_value=mock_response
2743+
) as mock_save:
2744+
result = asset_client.add_dq_rule_schedule(
2745+
asset_type=asset_type,
2746+
asset_name=asset_name,
2747+
asset_qualified_name=asset_qualified_name,
2748+
schedule_crontab=schedule_cron_string,
2749+
schedule_time_zone=schedule_time_zone,
2750+
)
2751+
2752+
mock_updater.assert_called_once_with(
2753+
qualified_name=asset_qualified_name,
2754+
name=asset_name,
2755+
)
2756+
assert (
2757+
updated_table.alpha_asset_d_q_schedule_time_zone == schedule_time_zone
2758+
)
2759+
assert (
2760+
updated_table.alpha_asset_d_q_schedule_crontab == schedule_cron_string
2761+
)
2762+
assert (
2763+
updated_table.alpha_asset_d_q_schedule_type == alpha_DQScheduleType.CRON
2764+
)
2765+
mock_save.assert_called_once_with(updated_table)
2766+
assert result == mock_response

0 commit comments

Comments
 (0)