
Commit 70e9461

Refactor to use from_tables api with scan and search (#76)
* Refactor to use from_tables api with scan and search
* Refactored with_sql and apply methods
* Metadata query speedup

Co-authored-by: Erni Durdevic <[email protected]>
1 parent 2cf4b18 commit 70e9461

File tree

7 files changed: +500 −17 lines changed

README.md

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ The available `dx` functions are
 * `with_concurrency` defines how many queries are executed concurrently (10 by default)
 * `apply_sql` applies a SQL template to all tables. After this command you can apply an [action](#from_tables-actions). See in-depth documentation [here](docs/Arbitrary_multi-table_SQL.md).
 * `unpivot_string_columns` returns a melted (unpivoted) dataframe with all string columns from the selected tables. After this command you can apply an [action](#from_tables-actions)
+* `scan` (experimental) scans the lakehouse with regex expressions defined by the rules to power the semantic classification.
 * `intro` gives an introduction to the library
 * `scan` scans the lakehouse with regex expressions defined by the rules to power the semantic classification. [Documentation](docs/Semantic_classification.md)
 * `display_rules` shows the rules available for semantic classification
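
For orientation, the bullets above correspond to calls along these lines. This is a minimal sketch, assuming the `DX` entry point and the `from_tables` chaining this commit introduces; the table pattern is a placeholder and the released API may differ.

```python
from discoverx import DX

dx = DX()

# Show the rules available for semantic classification
dx.display_rules()

# Experimental: scan the selected tables with the regex rules
# ("my_catalog.my_schema.*" is a placeholder table pattern)
dx.from_tables("my_catalog.my_schema.*").scan()
```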

discoverx/discovery.py

Lines changed: 295 additions & 0 deletions
@@ -0,0 +1,295 @@
from typing import Optional, List, Union

from discoverx import logging
from discoverx.msql import Msql
from discoverx.scanner import TableInfo
from discoverx.scanner import Scanner, ScanResult
from discoverx.rules import Rules, Rule
from pyspark.sql import DataFrame, SparkSession

logger = logging.Logging()


class Discovery:
    """Scans and searches a set of lakehouse tables using the semantic classification rules."""

    COLUMNS_TABLE_NAME = "system.information_schema.columns"
    MAX_WORKERS = 10

    def __init__(
        self,
        spark: SparkSession,
        catalogs: str,
        schemas: str,
        tables: str,
        table_info_list: List[TableInfo],
        custom_rules: Optional[List[Rule]] = None,
        locale: Optional[str] = None,
    ):
        self.spark = spark
        self._catalogs = catalogs
        self._schemas = schemas
        self._tables = tables
        self._table_info_list = table_info_list

        self.scanner: Optional[Scanner] = None
        self._scan_result: Optional[ScanResult] = None
        self.rules: Optional[Rules] = Rules(custom_rules=custom_rules, locale=locale)

    def _msql(self, msql: str, what_if: bool = False, min_score: Optional[float] = None):
        logger.debug(f"Executing sql template: {msql}")

        msql_builder = Msql(msql)

        # Check if classification is available
        # TODO: check for a more specific exception
        classification_result_pdf = self._scan_result.get_classes(min_score)
        sql_rows = msql_builder.build(classification_result_pdf)

        if what_if:
            logger.friendly("SQL that would be executed:")

            for sql_row in sql_rows:
                logger.friendly(sql_row.sql)

            return None
        else:
            logger.debug(f"Executing SQL:\n{sql_rows}")
            return msql_builder.execute_sql_rows(sql_rows, self.spark)

    def scan(
        self,
        rules="*",
        sample_size=10000,
        what_if: bool = False,
    ):
        self.scanner = Scanner(
            self.spark,
            self.rules,
            catalogs=self._catalogs,
            schemas=self._schemas,
            tables=self._tables,
            table_list=self._table_info_list,
            rule_filter=rules,
            sample_size=sample_size,
            what_if=what_if,
            columns_table_name=self.COLUMNS_TABLE_NAME,
            max_workers=self.MAX_WORKERS,
        )

        self._scan_result = self.scanner.scan()
        logger.friendlyHTML(self.scanner.summary_html)

    def _check_scan_result(self):
        if self._scan_result is None:
            raise Exception("You first need to scan your lakehouse using Discovery.scan()")

    @property
    def scan_result(self):
        """Returns the scan results as a pandas DataFrame

        Raises:
            Exception: If the scan has not been run
        """
        self._check_scan_result()

        return self._scan_result.df

    def search(
        self,
        search_term: str,
        from_tables: str = "*.*.*",
        by_class: Optional[str] = None,
        min_score: Optional[float] = None,
    ):
        """Searches your lakehouse for columns matching the given search term

        Args:
            search_term (str): The search term to be used to search for columns.
            from_tables (str, optional): The tables to be searched in format
                "catalog.schema.table", use "*" as a wildcard. Defaults to "*.*.*".
            by_class (str, optional): The class to be used to search for columns.
                Defaults to None.
            min_score (float, optional): Defines the classification score or frequency
                threshold for columns to be considered during the scan. Defaults to None,
                which means that all columns where at least one record matched the
                respective rule during the scan will be included. Has to be either None
                or between 0 and 1.

        Raises:
            ValueError: If search_term is not provided
            ValueError: If the search_term type is not valid
            ValueError: If the by_class type is not valid

        Returns:
            DataFrame: A dataframe containing the results of the search
        """

        Msql.validate_from_components(from_tables)

        if search_term is None:
            raise ValueError("search_term has not been provided.")

        if not isinstance(search_term, str):
            raise ValueError(f"The search_term type {type(search_term)} is not valid. Please use a string type.")

        if by_class is None:
            # Try to infer the class from the search term
            logger.friendly(
                "You did not provide any class to be searched. "
                "We will try to auto-detect matching rules for the given search term."
            )
            search_matching_rules = self.rules.match_search_term(search_term)
            if len(search_matching_rules) == 0:
                raise ValueError(
                    "Could not infer any class for the given search term. Please specify the by_class parameter."
                )
            elif len(search_matching_rules) > 1:
                raise ValueError(
                    f"Multiple classes {search_matching_rules} match the given search term ({search_term}). Please specify the class to search in with the by_class parameter."
                )
            else:
                by_class = search_matching_rules[0]
            logger.friendly(f"Discoverx will search your lakehouse using the class {by_class}")
        elif isinstance(by_class, str):
            search_matching_rules = [by_class]
        else:
            raise ValueError(f"The provided by_class {by_class} must be of string type.")

        sql_filter = f"`[{search_matching_rules[0]}]` = '{search_term}'"
        select_statement = (
            "named_struct("
            + ", ".join(
                [
                    f"'{rule_name}', named_struct('column_name', '[{rule_name}]', 'value', `[{rule_name}]`)"
                    for rule_name in search_matching_rules
                ]
            )
            + ") AS search_result"
        )

        where_statement = f"WHERE {sql_filter}"

        return self._msql(
            f"SELECT {select_statement}, to_json(struct(*)) AS row_content FROM {from_tables} {where_statement}",
            min_score=min_score,
        )

    def select_by_classes(
        self,
        from_tables: str = "*.*.*",
        by_classes: Optional[Union[List[str], str]] = None,
        min_score: Optional[float] = None,
    ):
        """Selects all columns in the lakehouse from tables that match ALL the given classes

        Args:
            from_tables (str, optional): The tables to be selected in format
                "catalog.schema.table", use "*" as a wildcard. Defaults to "*.*.*".
            by_classes (Union[List[str], str], optional): The classes to be used to
                search for columns. Defaults to None.
            min_score (float, optional): Defines the classification score or frequency
                threshold for columns to be considered during the scan. Defaults to None,
                which means that all columns where at least one record matched the
                respective rule during the scan will be included. Has to be either None
                or between 0 and 1.

        Raises:
            ValueError: If the by_classes type is not valid

        Returns:
            DataFrame: A dataframe containing the UNION ALL results of the select"""

        Msql.validate_from_components(from_tables)

        if isinstance(by_classes, str):
            by_classes = [by_classes]
        elif isinstance(by_classes, list) and all(isinstance(elem, str) for elem in by_classes):
            pass  # already a list of strings
        else:
            raise ValueError(
                f"The provided by_classes {by_classes} have the wrong type. Please provide either a str or List[str]."
            )

        from_statement = (
            "named_struct("
            + ", ".join(
                [
                    f"'{class_name}', named_struct('column_name', '[{class_name}]', 'value', `[{class_name}]`)"
                    for class_name in by_classes
                ]
            )
            + ") AS classified_columns"
        )

        return self._msql(
            f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}", min_score=min_score
        )

    def delete_by_class(
        self,
        from_tables="*.*.*",
        by_class: str = None,
        values: Optional[Union[List[str], str]] = None,
        yes_i_am_sure: bool = False,
        min_score: Optional[float] = None,
    ):
        """Deletes all rows in the lakehouse that match any of the provided values in a column classified with the given class

        Args:
            from_tables (str, optional): The tables to delete from in format
                "catalog.schema.table", use "*" as a wildcard. Defaults to "*.*.*".
            by_class (str, optional): The class to be used to search for columns.
                Defaults to None.
            values (Union[List[str], str], optional): The values to be deleted.
                Defaults to None.
            yes_i_am_sure (bool, optional): Whether you are sure that you want to delete
                the data. If False, prints the SQL statements instead of executing them.
                Defaults to False.
            min_score (float, optional): Defines the classification score or frequency
                threshold for columns to be considered during the scan. Defaults to None,
                which means that all columns where at least one record matched the
                respective rule during the scan will be included. Has to be either None
                or between 0 and 1.

        Raises:
            ValueError: If the from_tables is not valid
            ValueError: If the by_class is not valid
            ValueError: If the values are not valid
        """

        Msql.validate_from_components(from_tables)

        if (by_class is None) or (not isinstance(by_class, str)):
            raise ValueError("Please provide a class to identify the columns to be matched on the provided values.")

        if values is None:
            raise ValueError(
                "Please specify the values to be deleted. You can either provide a list of values or a single value."
            )
        elif isinstance(values, str):
            value_string = f"'{values}'"
        elif isinstance(values, list) and all(isinstance(elem, str) for elem in values):
            value_string = "'" + "', '".join(values) + "'"
        else:
            raise ValueError(
                f"The provided values {values} have the wrong type. Please provide either a str or List[str]."
            )

        if not yes_i_am_sure:
            logger.friendly(
                f"Please confirm that you want to delete the following values from the table {from_tables} using the class {by_class}: {values}"
            )
            logger.friendly(
                "If you are sure, please run the same command again but set the parameter yes_i_am_sure to True."
            )

        delete_result = self._msql(
            f"DELETE FROM {from_tables} WHERE `[{by_class}]` IN ({value_string})",
            what_if=(not yes_i_am_sure),
            min_score=min_score,
        )

        if delete_result is not None:
            delete_result = delete_result.toPandas()
            logger.friendlyHTML(f"<p>The affected tables are</p>{delete_result.to_html()}")

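Taken together, `Discovery` implements a scan-then-act workflow: `scan` classifies sampled columns, and the query methods translate classes back into concrete SQL via `_msql`. The sketch below strings the public methods together using the signatures above; the `spark` session and `table_info_list` arguments are assumed to be prepared by the caller (in practice the library's entry point constructs `Discovery` for you), and the `"email"` class name and search value are illustrative.

```python
from discoverx.discovery import Discovery

# Assumptions: `spark` is an existing SparkSession and `table_info_list`
# is a pre-built List[TableInfo]; both normally come from the library.
discovery = Discovery(
    spark=spark,
    catalogs="my_catalog",  # placeholder catalog/schema/table patterns
    schemas="*",
    tables="*",
    table_info_list=table_info_list,
)

# 1. Scan sampled rows against the classification rules
discovery.scan(rules="*", sample_size=10000)

# 2. Inspect per-column classification results (pandas DataFrame)
print(discovery.scan_result)

# 3. Search for a concrete value; the class is inferred from the term
result_df = discovery.search(search_term="[email protected]", from_tables="my_catalog.*.*")

# 4. Select rows from all tables whose columns matched a class
classified_df = discovery.select_by_classes(from_tables="my_catalog.*.*", by_classes=["email"])

# 5. Delete matching rows; stays a dry run until yes_i_am_sure=True
discovery.delete_by_class(
    from_tables="my_catalog.*.*",
    by_class="email",
    values="[email protected]",
    yes_i_am_sure=False,  # prints the DELETE statements instead of executing them
)
```

Note that `delete_by_class` is a dry run by default: with `yes_i_am_sure=False` it routes through the `what_if` branch of `_msql` and only prints the generated `DELETE` statements.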