Skip to content

Commit 10fdfb3

Browse files
kukushkin, jaidisido, and Leon Luttenberger
authored
(feat) glue data quality (#1861)
* Add data quality module. * Remove copy-paste * Refactor - create package, move ruleset evaluation utils to _utils * [skip ci] Add data_quality.evaluate_ruleset(), result processing TBC * [skip ci] Docstrings * [skip ci] Return normalized evaluation results * [skip ci] Add examples in docstrings && fix typing * [skip ci] Minor - newlines * [skip ci] Add Glue DQ IAM role to test infra * [skip ci] Handle multiple rulesets && add client token * [skip ci] Add update_ruleset * [skip ci] Add pushdownPredicate test case * [skip ci] Minor - add to final statuses * (feat): Add recommendation and get APIs (#1866) * Add recommendation and get APIs * Minor - revert conftest change * Handle edge case in regex * Add missing permission to data quality IAM role * Add Glue Data Quality tutorial. * [skip ci] Set number of workers to 2 to minimize charges * Add more unit tests to increase coverage Co-authored-by: jaidisido <[email protected]> Co-authored-by: Leon Luttenberger <[email protected]>
1 parent b131b6d commit 10fdfb3

File tree

11 files changed

+1135
-2
lines changed

11 files changed

+1135
-2
lines changed

awswrangler/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
chime,
1414
cloudwatch,
1515
data_api,
16+
data_quality,
1617
dynamodb,
1718
emr,
1819
exceptions,
@@ -40,6 +41,7 @@
4041
"cloudwatch",
4142
"emr",
4243
"data_api",
44+
"data_quality",
4345
"dynamodb",
4446
"exceptions",
4547
"opensearch",
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
"""AWS Glue Data Quality package."""

# Re-export the public API from the private implementation modules so users
# can call e.g. `wr.data_quality.create_ruleset(...)` directly.
from awswrangler.data_quality._create import (
    create_recommendation_ruleset,
    create_ruleset,
    evaluate_ruleset,
    update_ruleset,
)
from awswrangler.data_quality._get import get_ruleset

# Explicit public interface of the package.
__all__ = [
    "create_recommendation_ruleset",
    "create_ruleset",
    "evaluate_ruleset",
    "get_ruleset",
    "update_ruleset",
]
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
"""AWS Glue Data Quality Create module."""
2+
3+
import logging
4+
import pprint
5+
import uuid
6+
from typing import Any, Dict, List, Optional, Union, cast
7+
8+
import boto3
9+
import pandas as pd
10+
11+
from awswrangler import _utils, exceptions
12+
from awswrangler._config import apply_configs
13+
from awswrangler.data_quality._utils import (
14+
_create_datasource,
15+
_get_data_quality_results,
16+
_rules_to_df,
17+
_start_ruleset_evaluation_run,
18+
_wait_ruleset_run,
19+
)
20+
21+
_logger: logging.Logger = logging.getLogger(__name__)
22+
23+
24+
def _create_dqdl(
25+
df_rules: pd.DataFrame,
26+
) -> str:
27+
"""Create DQDL from pandas data frame."""
28+
rules = []
29+
for rule_type, parameter, expression in df_rules.itertuples(index=False):
30+
parameter_str = f' "{parameter}" ' if parameter else " "
31+
expression_str = expression if expression else ""
32+
rules.append(f"{rule_type}{parameter_str}{expression_str}")
33+
return "Rules = [ " + ", ".join(rules) + " ]"
34+
35+
36+
@apply_configs
def create_ruleset(
    name: str,
    database: str,
    table: str,
    df_rules: Optional[pd.DataFrame] = None,
    dqdl_rules: Optional[str] = None,
    description: str = "",
    client_token: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> None:
    """Create Data Quality ruleset.

    Exactly one of `df_rules` or `dqdl_rules` must be provided.

    Parameters
    ----------
    name : str
        Ruleset name.
    database : str
        Glue database name.
    table : str
        Glue table name.
    df_rules : pd.DataFrame, optional
        Data frame with `rule_type`, `parameter`, and `expression` columns.
    dqdl_rules : str, optional
        Data Quality Definition Language definition.
    description : str
        Ruleset description.
    client_token : str, optional
        Random id used for idempotency. Is automatically generated if not provided.
    boto3_session : boto3.Session, optional
        Boto3 Session. If none, the default boto3 session is used.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If both or neither of `df_rules` and `dqdl_rules` are passed.
    exceptions.AlreadyExists
        If a ruleset with the same name already exists.

    Examples
    --------
    >>> import awswrangler as wr
    >>> import pandas as pd
    >>>
    >>> df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 1]})
    >>> wr.s3.to_parquet(df, path, dataset=True, database="database", table="table")
    >>> wr.data_quality.create_ruleset(
    >>>     name="ruleset",
    >>>     database="database",
    >>>     table="table",
    >>>     dqdl_rules="Rules = [ RowCount between 1 and 3 ]",
    >>> )

    >>> import awswrangler as wr
    >>> import pandas as pd
    >>>
    >>> df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 1]})
    >>> df_rules = pd.DataFrame({
    >>>     "rule_type": ["RowCount", "IsComplete", "Uniqueness"],
    >>>     "parameter": [None, "c0", "c0"],
    >>>     "expression": ["between 1 and 6", None, "> 0.95"],
    >>> })
    >>> wr.s3.to_parquet(df, path, dataset=True, database="database", table="table")
    >>> wr.data_quality.create_ruleset(
    >>>     name="ruleset",
    >>>     database="database",
    >>>     table="table",
    >>>     df_rules=df_rules,
    >>> )
    """
    # Exactly one rules source must be supplied.
    if (df_rules is not None and dqdl_rules) or (df_rules is None and not dqdl_rules):
        raise exceptions.InvalidArgumentCombination("You must pass either ruleset `df_rules` or `dqdl_rules`.")

    client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)
    dqdl_rules = _create_dqdl(df_rules) if df_rules is not None else dqdl_rules

    try:
        client_glue.create_data_quality_ruleset(
            Name=name,
            Description=description,
            Ruleset=dqdl_rules,
            TargetTable={
                "TableName": table,
                "DatabaseName": database,
            },
            # ClientToken makes retries of the same logical request idempotent.
            ClientToken=client_token if client_token else uuid.uuid4().hex,
        )
    except client_glue.exceptions.AlreadyExistsException as already_exists:
        raise exceptions.AlreadyExists(f"Ruleset {name} already exists.") from already_exists
118+
119+
120+
@apply_configs
def update_ruleset(
    name: str,
    updated_name: Optional[str] = None,
    df_rules: Optional[pd.DataFrame] = None,
    dqdl_rules: Optional[str] = None,
    description: str = "",
    boto3_session: Optional[boto3.Session] = None,
) -> None:
    """Update Data Quality ruleset.

    Exactly one of `df_rules` or `dqdl_rules` must be provided.

    Parameters
    ----------
    name : str
        Ruleset name.
    updated_name : str, optional
        New ruleset name if renaming an existing ruleset.
    df_rules : pd.DataFrame, optional
        Data frame with `rule_type`, `parameter`, and `expression` columns.
    dqdl_rules : str, optional
        Data Quality Definition Language definition.
    description : str
        Ruleset description.
    boto3_session : boto3.Session, optional
        Boto3 Session. If none, the default boto3 session is used.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If both or neither of `df_rules` and `dqdl_rules` are passed.
    exceptions.ResourceDoesNotExist
        If the ruleset does not exist.

    Examples
    --------
    >>> wr.data_quality.update_ruleset(
    >>>     name="ruleset",
    >>>     updated_name="my_ruleset",
    >>>     dqdl_rules="Rules = [ RowCount between 1 and 3 ]",
    >>> )
    """
    # Exactly one rules source must be supplied.
    if (df_rules is not None and dqdl_rules) or (df_rules is None and not dqdl_rules):
        raise exceptions.InvalidArgumentCombination("You must pass either ruleset `df_rules` or `dqdl_rules`.")

    client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)
    dqdl_rules = _create_dqdl(df_rules) if df_rules is not None else dqdl_rules

    args: Dict[str, Any] = {
        "Name": name,
        "Description": description,
        "Ruleset": dqdl_rules,
    }
    # Only send UpdatedName when actually renaming: boto3 rejects None values
    # for string parameters with a ParamValidationError.
    if updated_name:
        args["UpdatedName"] = updated_name

    try:
        client_glue.update_data_quality_ruleset(**args)
    except client_glue.exceptions.EntityNotFoundException as not_found:
        raise exceptions.ResourceDoesNotExist(f"Ruleset {name} does not exist.") from not_found
169+
170+
171+
@apply_configs
def create_recommendation_ruleset(
    database: str,
    table: str,
    iam_role_arn: str,
    name: Optional[str] = None,
    catalog_id: Optional[str] = None,
    connection_name: Optional[str] = None,
    additional_options: Optional[Dict[str, Any]] = None,
    number_of_workers: int = 5,
    timeout: int = 2880,
    client_token: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """Create recommendation Data Quality ruleset.

    Starts a rule recommendation run over the given Glue table, waits for it
    to finish, and returns the recommended rules as a data frame.

    Parameters
    ----------
    database : str
        Glue database name.
    table : str
        Glue table name.
    iam_role_arn : str
        IAM Role ARN.
    name : str, optional
        Ruleset name.
    catalog_id : str, optional
        Glue Catalog id.
    connection_name : str, optional
        Glue connection name.
    additional_options : dict, optional
        Additional options for the table. Supported keys:
        `pushDownPredicate`: to filter on partitions without having to list and read all the files in your dataset.
        `catalogPartitionPredicate`: to use server-side partition pruning using partition indexes in the
        Glue Data Catalog.
    number_of_workers: int, optional
        The number of G.1X workers to be used in the run. The default is 5.
    timeout: int, optional
        The timeout for a run in minutes. The default is 2880 (48 hours).
    client_token : str, optional
        Random id used for idempotency. Is automatically generated if not provided.
    boto3_session : boto3.Session, optional
        Boto3 Session. If none, the default boto3 session is used.

    Returns
    -------
    pd.DataFrame
        Data frame with recommended ruleset details.

    Examples
    --------
    >>> import awswrangler as wr

    >>> df_recommended_ruleset = wr.data_quality.create_recommendation_ruleset(
    >>>     database="database",
    >>>     table="table",
    >>>     iam_role_arn="arn:...",
    >>> )
    """
    client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)

    # Describe where the recommendation run reads its data from.
    datasource = _create_datasource(
        database=database,
        table=table,
        catalog_id=catalog_id,
        connection_name=connection_name,
        additional_options=additional_options,
    )
    args: Dict[str, Any] = {
        "DataSource": datasource,
        "Role": iam_role_arn,
        "NumberOfWorkers": number_of_workers,
        "Timeout": timeout,
        # ClientToken makes retries of the same logical request idempotent.
        "ClientToken": client_token or uuid.uuid4().hex,
    }
    # The ruleset name is optional server-side; omit the key entirely when unset.
    if name:
        args["CreatedRulesetName"] = name
    _logger.debug("args: \n%s", pprint.pformat(args))

    response = client_glue.start_data_quality_rule_recommendation_run(**args)
    run_id: str = cast(str, response["RunId"])
    _logger.debug("run_id: %s", run_id)

    # Block until the run reaches a final status, then parse the DQDL output.
    run_result = _wait_ruleset_run(
        run_id=run_id,
        run_type="recommendation",
        boto3_session=boto3_session,
    )
    recommended_dqdl: str = cast(str, run_result["RecommendedRuleset"])
    return _rules_to_df(rules=recommended_dqdl)
260+
261+
262+
@apply_configs
def evaluate_ruleset(
    name: Union[str, List[str]],
    iam_role_arn: str,
    number_of_workers: int = 5,
    timeout: int = 2880,
    database: Optional[str] = None,
    table: Optional[str] = None,
    catalog_id: Optional[str] = None,
    connection_name: Optional[str] = None,
    additional_options: Optional[Dict[str, Any]] = None,
    additional_run_options: Optional[Dict[str, str]] = None,
    client_token: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """Evaluate Data Quality ruleset.

    Starts an evaluation run for one or more rulesets, waits for it to
    finish, and returns the per-rule results as a data frame.

    Parameters
    ----------
    name : str or list[str]
        Ruleset name or list of names.
    iam_role_arn : str
        IAM Role ARN.
    number_of_workers: int, optional
        The number of G.1X workers to be used in the run. The default is 5.
    timeout: int, optional
        The timeout for a run in minutes. The default is 2880 (48 hours).
    database : str, optional
        Glue database name. Database associated with the ruleset will be used if not provided.
    table : str, optional
        Glue table name. Table associated with the ruleset will be used if not provided.
    catalog_id : str, optional
        Glue Catalog id.
    connection_name : str, optional
        Glue connection name.
    additional_options : dict, optional
        Additional options for the table. Supported keys:
        `pushDownPredicate`: to filter on partitions without having to list and read all the files in your dataset.
        `catalogPartitionPredicate`: to use server-side partition pruning using partition indexes in the
        Glue Data Catalog.
    additional_run_options : Dict[str, str], optional
        Additional run options. Supported keys:
        `CloudWatchMetricsEnabled`: whether to enable CloudWatch metrics.
        `ResultsS3Prefix`: prefix for Amazon S3 to store results.
    client_token : str, optional
        Random id used for idempotency. Will be automatically generated if not provided.
    boto3_session : boto3.Session, optional
        Boto3 Session. If none, the default boto3 session is used.

    Returns
    -------
    pd.DataFrame
        Data frame with ruleset evaluation results.

    Examples
    --------
    >>> import awswrangler as wr
    >>> import pandas as pd
    >>>
    >>> df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 1]})
    >>> wr.s3.to_parquet(df, path, dataset=True, database="database", table="table")
    >>> wr.data_quality.create_ruleset(
    >>>     name="ruleset",
    >>>     database="database",
    >>>     table="table",
    >>>     dqdl_rules="Rules = [ RowCount between 1 and 3 ]",
    >>> )
    >>> df_ruleset_results = wr.data_quality.evaluate_ruleset(
    >>>     name=["ruleset1", "ruleset2"],
    >>>     iam_role_arn=glue_data_quality_role,
    >>> )
    """
    # Normalize a single ruleset name to a one-element list.
    run_id: str = _start_ruleset_evaluation_run(
        ruleset_names=[name] if isinstance(name, str) else name,
        iam_role_arn=iam_role_arn,
        number_of_workers=number_of_workers,
        timeout=timeout,
        database=database,
        table=table,
        catalog_id=catalog_id,
        connection_name=connection_name,
        additional_options=additional_options,
        additional_run_options=additional_run_options,
        # ClientToken makes retries of the same logical request idempotent.
        client_token=client_token if client_token else uuid.uuid4().hex,
        boto3_session=boto3_session,
    )
    _logger.debug("run_id: %s", run_id)
    # Block until the run reaches a final status, then fetch the results.
    result_ids: List[str] = cast(
        List[str],
        _wait_ruleset_run(
            run_id=run_id,
            run_type="evaluation",
            boto3_session=boto3_session,
        )["ResultIds"],
    )
    return _get_data_quality_results(result_ids=result_ids, boto3_session=boto3_session)

0 commit comments

Comments
 (0)