
Commit 895722a

Tag info metadata (#74)
* Improved explorer filter performance
* Added tag info in query
* Added table tags info
* Added map function
* Fixed information schema refactor
* Fixed map and added example
* Fixed some of the unit tests
* Fixed unit tests
* Improved test coverage
* Improved test coverage
* Added all types of tags on query
* Fixed unit test
* Added with_tags to explorer
* Fixed tags info
* Deduplicated TableInfo
* Fixed discovery class
* black reformat
* Added table info map example
* Added docs for map function and refactored
* Fixed docstring
* black reformat
1 parent: e78d16c

19 files changed: +765 −99 lines

discoverx/discovery.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -13,7 +13,7 @@
 class Discovery:
     """ """
 
-    COLUMNS_TABLE_NAME = "system.information_schema.columns"
+    INFORMATION_SCHEMA = "system.information_schema"
     MAX_WORKERS = 10
 
     def __init__(
@@ -73,7 +73,7 @@ def scan(
             rule_filter=rules,
             sample_size=sample_size,
             what_if=what_if,
-            columns_table_name=self.COLUMNS_TABLE_NAME,
+            information_schema=self.INFORMATION_SCHEMA,
             max_workers=self.MAX_WORKERS,
         )
 
@@ -229,7 +229,8 @@ def select_by_classes(
         )
 
         return self._msql(
-            f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}", min_score=min_score
+            f"SELECT {from_statement}, to_json(struct(*)) AS row_content FROM {from_tables}",
+            min_score=min_score,
         )
 
     def delete_by_class(
```
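The rename above swaps a hard-coded columns table for the information-schema root, so call sites now hold the root and append whichever metadata table they need (dx.py and scanner.py below do this with `.columns`). A minimal sketch of that convention; the tag-table name is an assumption about the Unity Catalog information schema and does not appear in these hunks:

```python
# Illustrative sketch only, not part of the diff.
INFORMATION_SCHEMA = "system.information_schema"

# Before: a single hard-coded columns table.
columns_table_old = "system.information_schema.columns"

# After: derive each metadata table from the shared root, leaving room for
# the tag tables this commit starts querying (assumed name below).
columns_table = f"{INFORMATION_SCHEMA}.columns"
table_tags_table = f"{INFORMATION_SCHEMA}.table_tags"  # assumption

assert columns_table == columns_table_old
```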

discoverx/dx.py

Lines changed: 5 additions & 5 deletions
```diff
@@ -25,7 +25,7 @@ class DX:
         Defaults to None.
     """
 
-    COLUMNS_TABLE_NAME = "system.information_schema.columns"
+    INFORMATION_SCHEMA = "system.information_schema"
     MAX_WORKERS = 10
 
     def __init__(
@@ -49,10 +49,10 @@ def __init__(
 
     def _can_read_columns_table(self) -> bool:
         try:
-            self.spark.sql(f"SELECT * FROM {self.COLUMNS_TABLE_NAME} WHERE table_catalog = 'system' LIMIT 1")
+            self.spark.sql(f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' LIMIT 1")
             return True
         except Exception as e:
-            self.logger.error(f"Error while reading table {self.COLUMNS_TABLE_NAME}: {e}")
+            self.logger.error(f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}")
             return False
 
     def intro(self):
@@ -137,7 +137,7 @@ def scan(
             rule_filter=rules,
             sample_size=sample_size,
             what_if=what_if,
-            columns_table_name=self.COLUMNS_TABLE_NAME,
+            information_schema=self.INFORMATION_SCHEMA,
             max_workers=self.MAX_WORKERS,
         )
 
@@ -400,7 +400,7 @@ def from_tables(self, from_tables: str = "*.*.*"):
 
         """
 
-        return DataExplorer(from_tables, self.spark, InfoFetcher(self.spark, self.COLUMNS_TABLE_NAME))
+        return DataExplorer(from_tables, self.spark, InfoFetcher(self.spark, self.INFORMATION_SCHEMA))
 
     def _msql(self, msql: str, what_if: bool = False, min_score: Optional[float] = None):
         self.logger.debug(f"Executing sql template: {msql}")
```

discoverx/explorer.py

Lines changed: 59 additions & 5 deletions
```diff
@@ -6,10 +6,10 @@
 from discoverx.common import helper
 from discoverx.discovery import Discovery
 from discoverx.rules import Rule
+from discoverx.table_info import TagsInfo, ColumnTagInfo, TagInfo, ColumnInfo, TableInfo
 from functools import reduce
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import lit
-
 from discoverx.table_info import InfoFetcher, TableInfo
 
 
@@ -21,12 +21,17 @@ class DataExplorer:
 
     def __init__(self, from_tables, spark: SparkSession, info_fetcher: InfoFetcher) -> None:
         self._from_tables = from_tables
-        self._catalogs, self._schemas, self._tables = DataExplorer.validate_from_components(from_tables)
+        (
+            self._catalogs,
+            self._schemas,
+            self._tables,
+        ) = DataExplorer.validate_from_components(from_tables)
         self._spark = spark
         self._info_fetcher = info_fetcher
         self._having_columns = []
         self._sql_query_template = None
         self._max_concurrency = 10
+        self._with_tags = False
 
     @staticmethod
     def validate_from_components(from_tables: str):
@@ -48,6 +53,7 @@ def __deepcopy__(self, memo):
         new_obj._having_columns = copy.deepcopy(self._having_columns)
         new_obj._sql_query_template = copy.deepcopy(self._sql_query_template)
         new_obj._max_concurrency = copy.deepcopy(self._max_concurrency)
+        new_obj._with_tags = copy.deepcopy(self._with_tags)
 
         new_obj._spark = self._spark
         new_obj._info_fetcher = self._info_fetcher
@@ -70,6 +76,12 @@ def with_concurrency(self, max_concurrency) -> "DataExplorer":
         new_obj._max_concurrency = max_concurrency
         return new_obj
 
+    def with_tags(self, use_tags=True) -> "DataExplorer":
+        """Defines if tags should be collected when getting table metadata"""
+        new_obj = copy.deepcopy(self)
+        new_obj._with_tags = use_tags
+        return new_obj
+
     def with_sql(self, sql_query_template: str) -> "DataExplorerActions":
         """Sets the SQL query template to use for the data exploration
 
@@ -135,10 +147,44 @@ def scan(
         discover.scan(rules=rules, sample_size=sample_size, what_if=what_if)
         return discover
 
+    def map(self, f) -> list[any]:
+        """Runs a function for each table in the data explorer
+
+        Args:
+            f (function): The function to run. The function should accept a TableInfo object as input and return any object as output.
+
+        Returns:
+            list[any]: A list of the results of running the function for each table
+        """
+        res = []
+        table_list = self._info_fetcher.get_tables_info(
+            self._catalogs,
+            self._schemas,
+            self._tables,
+            self._having_columns,
+            self._with_tags,
+        )
+        with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor:
+            # Submit tasks to the thread pool
+            futures = [executor.submit(f, table_info) for table_info in table_list]
+
+            # Process completed tasks
+            for future in concurrent.futures.as_completed(futures):
+                result = future.result()
+                if result is not None:
+                    res.append(result)
+
+        logger.debug("Finished lakehouse map task")
+
+        return res
+
 
 class DataExplorerActions:
     def __init__(
-        self, data_explorer: DataExplorer, spark: SparkSession = None, info_fetcher: InfoFetcher = None
+        self,
+        data_explorer: DataExplorer,
+        spark: SparkSession = None,
+        info_fetcher: InfoFetcher = None,
     ) -> None:
         self._data_explorer = data_explorer
         if spark is None:
@@ -193,10 +239,18 @@ def _get_sql_commands(self, data_explorer: DataExplorer) -> list[tuple[str, Tabl
         logger.debug("Launching lakehouse scanning task\n")
 
         table_list = self._info_fetcher.get_tables_info(
-            data_explorer._catalogs, data_explorer._schemas, data_explorer._tables, data_explorer._having_columns
+            data_explorer._catalogs,
+            data_explorer._schemas,
+            data_explorer._tables,
+            data_explorer._having_columns,
+            data_explorer._with_tags,
         )
         sql_commands = [
-            (DataExplorerActions._build_sql(data_explorer._sql_query_template, table), table) for table in table_list
+            (
+                DataExplorerActions._build_sql(data_explorer._sql_query_template, table),
+                table,
+            )
+            for table in table_list
        ]
         return sql_commands
 
```
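With `with_tags()` and `map()` in place, a per-table function can be fanned out across the lakehouse, with tags fetched into each `TableInfo` on demand. A hedged usage sketch: the `DX` entry point, `from_tables`, and `with_concurrency` come from this repo, while the example filter and the tag field names (`tags.table_tags`, `tag_name`) are assumptions about `table_info.py` rather than code shown in this commit:

```python
# Hypothetical usage sketch of the new with_tags()/map() chain.
from discoverx import DX

dx = DX()

def summarize(table_info):
    # table_info is a TableInfo; return any value, or None to drop the table
    # from the results. The .tags structure assumed here is TagsInfo.table_tags
    # with TagInfo.tag_name entries (assumption).
    tag_names = [t.tag_name for t in table_info.tags.table_tags] if table_info.tags else []
    return (f"{table_info.catalog}.{table_info.schema}.{table_info.table}", tag_names)

# Tags are collected only when with_tags(True) is set; map() runs summarize()
# over a thread pool bounded by with_concurrency().
results = (
    dx.from_tables("prod_catalog.*.*")  # hypothetical filter
    .with_tags(True)
    .with_concurrency(5)
    .map(summarize)
)
```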

discoverx/msql.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -97,6 +97,7 @@ def build(self, classified_result_pdf) -> list[SQLRow]:
                 row[1],
                 row[2],
                 [ColumnInfo(col[0], "", None, col[1]) for col in row[3]],  # col name  # TODO  # TODO  # Classes
+                None,
             )
             for _, row in df.iterrows()
             if fnmatch(row[0], self.catalogs) and fnmatch(row[1], self.schemas) and fnmatch(row[2], self.tables)
```
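The extra `None` fills a new trailing tags argument on `TableInfo` (the commit message notes TableInfo was deduplicated into `table_info.py`). A rough sketch of the presumed shape, inferred from the imports added in explorer.py and the positional `None` passed here and in scanner.py; field names are assumptions, not the actual definitions:

```python
# Presumed shape of the tag-aware metadata containers (assumption, not the
# real definitions in discoverx/table_info.py).
from dataclasses import dataclass
from typing import Optional

@dataclass
class TagInfo:
    tag_name: str
    tag_value: str

@dataclass
class ColumnTagInfo:
    column_name: str
    tag_name: str
    tag_value: str

@dataclass
class TagsInfo:
    column_tags: Optional[list[ColumnTagInfo]]
    table_tags: Optional[list[TagInfo]]

@dataclass
class TableInfo:
    catalog: Optional[str]
    schema: str
    table: str
    columns: list          # list[ColumnInfo]
    tags: Optional[TagsInfo]  # new in this commit; msql.py and scanner.py pass None
```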

discoverx/scanner.py

Lines changed: 53 additions & 4 deletions
```diff
@@ -9,7 +9,7 @@
 
 from discoverx.common.helper import strip_margin, format_regex
 from discoverx import logging
-from discoverx.table_info import InfoFetcher, TableInfo
+from discoverx.table_info import InfoFetcher, TableInfo, ColumnInfo
 from discoverx.rules import Rules, RuleTypes
 
 logger = logging.Logging()
@@ -138,7 +138,7 @@ def __init__(
         rule_filter: str = "*",
         sample_size: int = 1000,
         what_if: bool = False,
-        columns_table_name: str = "",
+        information_schema: str = "",
         max_workers: int = 10,
     ):
         self.spark = spark
@@ -150,18 +150,67 @@ def __init__(
         self.rules_filter = rule_filter
         self.sample_size = sample_size
         self.what_if = what_if
-        self.columns_table_name = columns_table_name
+        self.information_schema = information_schema
         self.max_workers = max_workers
 
         self.content: ScanContent = self._resolve_scan_content()
         self.rule_list = self.rules.get_rules(rule_filter=self.rules_filter)
         self.scan_result: Optional[ScanResult] = None
 
+    def _get_list_of_tables(self) -> List[TableInfo]:
+        table_list_sql = self._get_table_list_sql()
+
+        rows = self.spark.sql(table_list_sql).collect()
+        filtered_tables = [
+            TableInfo(
+                row["table_catalog"],
+                row["table_schema"],
+                row["table_name"],
+                [
+                    ColumnInfo(col["column_name"], col["data_type"], col["partition_index"], [])
+                    for col in row["table_columns"]
+                ],
+                None,
+            )
+            for row in rows
+        ]
+        return filtered_tables
+
+    def _get_table_list_sql(self):
+        """
+        Returns a SQL expression which returns a list of columns matching
+        the specified filters
+
+        Returns:
+            string: The SQL expression
+        """
+
+        catalog_sql = f"""AND regexp_like(table_catalog, "^{self.catalogs.replace("*", ".*")}$")"""
+        schema_sql = f"""AND regexp_like(table_schema, "^{self.schemas.replace("*", ".*")}$")"""
+        table_sql = f"""AND regexp_like(table_name, "^{self.tables.replace("*", ".*")}$")"""
+
+        sql = f"""
+        SELECT
+            table_catalog,
+            table_schema,
+            table_name,
+            collect_list(struct(column_name, data_type, partition_index)) as table_columns
+        FROM {self.information_schema}.columns
+        WHERE
+            table_schema != "information_schema"
+            {catalog_sql if self.catalogs != "*" else ""}
+            {schema_sql if self.schemas != "*" else ""}
+            {table_sql if self.tables != "*" else ""}
+        GROUP BY table_catalog, table_schema, table_name
+        """
+
+        return strip_margin(sql)
+
     def _resolve_scan_content(self) -> ScanContent:
         if self.table_list:
             table_list = self.table_list
         else:
-            info_fetcher = InfoFetcher(self.spark, columns_table_name=self.columns_table_name)
+            info_fetcher = InfoFetcher(self.spark, information_schema=self.information_schema)
             table_list = info_fetcher.get_tables_info(self.catalogs, self.schemas, self.tables)
         catalogs = set(map(lambda x: x.catalog, table_list))
         schemas = set(map(lambda x: f"{x.catalog}.{x.schema}", table_list))
```
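To make the generated metadata query concrete: with hypothetical filters `catalogs="prod*"`, `schemas="*"`, `tables="*"`, `_get_table_list_sql` turns the `*` wildcard into a regex and emits only the catalog predicate, producing roughly the statement below. Each resulting row carries one table with its columns aggregated into `table_columns`, which `_get_list_of_tables` unpacks into `TableInfo`/`ColumnInfo` objects.

```python
# Approximate output of _get_table_list_sql for the assumed filter values
# (whitespace trimmed for readability).
expected_sql = """
SELECT
    table_catalog,
    table_schema,
    table_name,
    collect_list(struct(column_name, data_type, partition_index)) as table_columns
FROM system.information_schema.columns
WHERE
    table_schema != "information_schema"
    AND regexp_like(table_catalog, "^prod.*$")
GROUP BY table_catalog, table_schema, table_name
"""
```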
