Commit 6eb5fbe

De-duplicated table info fetching code (#78)
* De-duplicated table info fetching code
* Improved info fetch speed
* Updated SQL to null-safe equality
1 parent ba9bc8e commit 6eb5fbe
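
The "null-safe equality" bullet refers to Spark SQL's <=> operator, which the new query uses when joining on table_catalog. Since TableInfo.catalog is typed Optional[str], the catalog column can apparently be NULL, and a plain = join would silently drop those rows. A minimal sketch of the difference, assuming only an active SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Standard equality propagates NULL: a row whose table_catalog is NULL can
# never satisfy `a.table_catalog = b.table_catalog`.
spark.sql("SELECT NULL = NULL AS eq").collect()      # [Row(eq=None)]

# Null-safe equality treats two NULLs as equal, so such rows still match.
spark.sql("SELECT NULL <=> NULL AS nseq").collect()  # [Row(nseq=True)]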

7 files changed: +144 −219 lines changed


discoverx/discovery.py

Lines changed: 2 additions & 2 deletions

@@ -2,10 +2,10 @@
 
 from discoverx import logging
 from discoverx.msql import Msql
-from discoverx.scanner import TableInfo
+from discoverx.table_info import TableInfo
 from discoverx.scanner import Scanner, ScanResult
 from discoverx.rules import Rules, Rule
-from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import SparkSession
 
 logger = logging.Logging()

discoverx/explorer.py

Lines changed: 2 additions & 92 deletions

@@ -2,108 +2,18 @@
 import copy
 import re
 from typing import Optional, List
-
 from discoverx import logging
 from discoverx.common import helper
 from discoverx.discovery import Discovery
 from discoverx.rules import Rule
-from discoverx.scanner import ColumnInfo, TableInfo
 from functools import reduce
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import lit
-from pyspark.sql.types import Row
-
-logger = logging.Logging()
-
-
-class InfoFetcher:
-    def __init__(self, spark, columns_table_name="system.information_schema.columns") -> None:
-        self.columns_table_name = columns_table_name
-        self.spark = spark
-
-    def _to_info_list(self, info_rows: list[Row]) -> list[TableInfo]:
-        filtered_tables = [
-            TableInfo(
-                row["table_catalog"],
-                row["table_schema"],
-                row["table_name"],
-                [
-                    ColumnInfo(col["column_name"], col["data_type"], col["partition_index"], [])
-                    for col in row["table_columns"]
-                ],
-            )
-            for row in info_rows
-        ]
-        return filtered_tables
-
-    def get_tables_info(self, catalogs: str, schemas: str, tables: str, columns: list[str] = []) -> list[TableInfo]:
-        # Filter tables by matching filter
-        table_list_sql = self._get_table_list_sql(catalogs, schemas, tables, columns)
-
-        filtered_tables = self.spark.sql(table_list_sql).collect()
-
-        if len(filtered_tables) == 0:
-            raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}")
-
-        return self._to_info_list(filtered_tables)
-
-    def _get_table_list_sql(self, catalogs: str, schemas: str, tables: str, columns: list[str] = []) -> str:
-        """
-        Returns a SQL expression which returns a list of columns matching
-        the specified filters
-
-        Returns:
-            string: The SQL expression
-        """
-
-        if "*" in catalogs:
-            catalog_sql = f"""AND regexp_like(table_catalog, "^{catalogs.replace("*", ".*")}$")"""
-        else:
-            catalog_sql = f"""AND table_catalog = "{catalogs}" """
-
-        if "*" in schemas:
-            schema_sql = f"""AND regexp_like(table_schema, "^{schemas.replace("*", ".*")}$")"""
-        else:
-            schema_sql = f"""AND table_schema = "{schemas}" """
 
-        if "*" in tables:
-            table_sql = f"""AND regexp_like(table_name, "^{tables.replace("*", ".*")}$")"""
-        else:
-            table_sql = f"""AND table_name = "{tables}" """
-
-        if columns:
-            match_any_col = "|".join([f'({c.replace("*", ".*")})' for c in columns])
-            columns_sql = f"""AND regexp_like(column_name, "^{match_any_col}$")"""
-
-        sql = f"""
-        WITH tb_list AS (
-            SELECT DISTINCT
-                table_catalog,
-                table_schema,
-                table_name
-            FROM {self.columns_table_name}
-            WHERE
-                table_schema != "information_schema"
-                {catalog_sql if catalogs != "*" else ""}
-                {schema_sql if schemas != "*" else ""}
-                {table_sql if tables != "*" else ""}
-                {columns_sql if columns else ""}
-        )
+from discoverx.table_info import InfoFetcher, TableInfo
 
-        SELECT
-            info_schema.table_catalog,
-            info_schema.table_schema,
-            info_schema.table_name,
-            collect_list(struct(column_name, data_type, partition_index)) as table_columns
-        FROM {self.columns_table_name} info_schema
-        INNER JOIN tb_list ON (
-            info_schema.table_catalog <=> tb_list.table_catalog AND
-            info_schema.table_schema = tb_list.table_schema AND
-            info_schema.table_name = tb_list.table_name)
-        GROUP BY info_schema.table_catalog, info_schema.table_schema, info_schema.table_name
-        """
 
-        return helper.strip_margin(sql)
+logger = logging.Logging()
 
 
 class DataExplorer:

discoverx/msql.py

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@
 from dataclasses import dataclass
 from functools import reduce
 from discoverx import logging
-from discoverx.scanner import ColumnInfo, TableInfo
+from discoverx.table_info import ColumnInfo, TableInfo
 from discoverx.common.helper import strip_margin
 from fnmatch import fnmatch
 from pyspark.sql.functions import lit

discoverx/scanner.py

Lines changed: 3 additions & 74 deletions

@@ -9,36 +9,12 @@
 
 from discoverx.common.helper import strip_margin, format_regex
 from discoverx import logging
+from discoverx.table_info import InfoFetcher, TableInfo
 from discoverx.rules import Rules, RuleTypes
 
 logger = logging.Logging()
 
 
-@dataclass
-class ColumnInfo:
-    name: str
-    data_type: str
-    partition_index: int
-    classes: list[str]
-
-
-@dataclass
-class TableInfo:
-    catalog: Optional[str]
-    schema: str
-    table: str
-    columns: list[ColumnInfo]
-
-    def get_columns_by_class(self, class_name: str):
-        return [ClassifiedColumn(col.name, class_name) for col in self.columns if class_name in col.classes]
-
-
-@dataclass
-class ClassifiedColumn:
-    name: str
-    class_name: str
-
-
 @dataclass
 class ScanContent:
     table_list: List[TableInfo]
@@ -181,59 +157,12 @@ def __init__(
         self.rule_list = self.rules.get_rules(rule_filter=self.rules_filter)
         self.scan_result: Optional[ScanResult] = None
 
-    def _get_list_of_tables(self) -> List[TableInfo]:
-        table_list_sql = self._get_table_list_sql()
-
-        rows = self.spark.sql(table_list_sql).collect()
-        filtered_tables = [
-            TableInfo(
-                row["table_catalog"],
-                row["table_schema"],
-                row["table_name"],
-                [
-                    ColumnInfo(col["column_name"], col["data_type"], col["partition_index"], [])
-                    for col in row["table_columns"]
-                ],
-            )
-            for row in rows
-        ]
-        return filtered_tables
-
-    def _get_table_list_sql(self):
-        """
-        Returns a SQL expression which returns a list of columns matching
-        the specified filters
-
-        Returns:
-            string: The SQL expression
-        """
-
-        catalog_sql = f"""AND regexp_like(table_catalog, "^{self.catalogs.replace("*", ".*")}$")"""
-        schema_sql = f"""AND regexp_like(table_schema, "^{self.schemas.replace("*", ".*")}$")"""
-        table_sql = f"""AND regexp_like(table_name, "^{self.tables.replace("*", ".*")}$")"""
-
-        sql = f"""
-        SELECT
-            table_catalog,
-            table_schema,
-            table_name,
-            collect_list(struct(column_name, data_type, partition_index)) as table_columns
-        FROM {self.columns_table_name}
-        WHERE
-            table_schema != "information_schema"
-            {catalog_sql if self.catalogs != "*" else ""}
-            {schema_sql if self.schemas != "*" else ""}
-            {table_sql if self.tables != "*" else ""}
-        GROUP BY table_catalog, table_schema, table_name
-        """
-
-        return strip_margin(sql)
-
     def _resolve_scan_content(self) -> ScanContent:
         if self.table_list:
             table_list = self.table_list
         else:
-            table_list = self._get_list_of_tables()
+            info_fetcher = InfoFetcher(self.spark, columns_table_name=self.columns_table_name)
+            table_list = info_fetcher.get_tables_info(self.catalogs, self.schemas, self.tables)
         catalogs = set(map(lambda x: x.catalog, table_list))
         schemas = set(map(lambda x: f"{x.catalog}.{x.schema}", table_list))
 

discoverx/table_info.py

Lines changed: 133 additions & 0 deletions

@@ -0,0 +1,133 @@
+from typing import Optional
+from discoverx.common import helper
+from pyspark.sql.types import Row
+from dataclasses import dataclass
+
+
+@dataclass
+class ColumnInfo:
+    name: str
+    data_type: str
+    partition_index: int
+    classes: list[str]
+
+
+@dataclass
+class TableInfo:
+    catalog: Optional[str]
+    schema: str
+    table: str
+    columns: list[ColumnInfo]
+
+    def get_columns_by_class(self, class_name: str):
+        return [ClassifiedColumn(col.name, class_name) for col in self.columns if class_name in col.classes]
+
+
+@dataclass
+class ClassifiedColumn:
+    name: str
+    class_name: str
+
+
+class InfoFetcher:
+    def __init__(self, spark, columns_table_name="system.information_schema.columns") -> None:
+        self.columns_table_name = columns_table_name
+        self.spark = spark
+
+    def _to_info_list(self, info_rows: list[Row]) -> list[TableInfo]:
+        filtered_tables = [
+            TableInfo(
+                row["table_catalog"],
+                row["table_schema"],
+                row["table_name"],
+                [
+                    ColumnInfo(col["column_name"], col["data_type"], col["partition_index"], [])
+                    for col in row["table_columns"]
+                ],
+            )
+            for row in info_rows
+        ]
+        return filtered_tables
+
+    def get_tables_info(self, catalogs: str, schemas: str, tables: str, columns: list[str] = []) -> list[TableInfo]:
+        # Filter tables by matching filter
+        table_list_sql = self._get_table_list_sql(catalogs, schemas, tables, columns)
+
+        filtered_tables = self.spark.sql(table_list_sql).collect()
+
+        if len(filtered_tables) == 0:
+            raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}")
+
+        return self._to_info_list(filtered_tables)
+
+    def _get_table_list_sql(self, catalogs: str, schemas: str, tables: str, columns: list[str] = []) -> str:
+        """
+        Returns a SQL expression which returns a list of columns matching
+        the specified filters
+
+        Returns:
+            string: The SQL expression
+        """
+
+        if "*" in catalogs:
+            catalog_sql = f"""AND regexp_like(table_catalog, "^{catalogs.replace("*", ".*")}$")"""
+        else:
+            catalog_sql = f"""AND table_catalog = "{catalogs}" """
+
+        if "*" in schemas:
+            schema_sql = f"""AND regexp_like(table_schema, "^{schemas.replace("*", ".*")}$")"""
+        else:
+            schema_sql = f"""AND table_schema = "{schemas}" """
+
+        if "*" in tables:
+            table_sql = f"""AND regexp_like(table_name, "^{tables.replace("*", ".*")}$")"""
+        else:
+            table_sql = f"""AND table_name = "{tables}" """
+
+        if columns:
+            match_any_col = "|".join([f'({c.replace("*", ".*")})' for c in columns])
+            columns_sql = f"""AND regexp_like(column_name, "^{match_any_col}$")"""
+
+        sql = f"""
+        WITH tb_list AS (
+            SELECT DISTINCT
+                table_catalog,
+                table_schema,
+                table_name
+            FROM {self.columns_table_name}
+            WHERE
+                table_schema != "information_schema"
+                AND NOT table_catalog <=> "system"
+                {catalog_sql if catalogs != "*" else ""}
+                {schema_sql if schemas != "*" else ""}
+                {table_sql if tables != "*" else ""}
+                {columns_sql if columns else ""}
+        ),
+
+        col_list AS (
+            SELECT
+                info_schema.table_catalog,
+                info_schema.table_schema,
+                info_schema.table_name,
+                collect_list(struct(column_name, data_type, partition_index)) as table_columns
+            FROM {self.columns_table_name} info_schema
+            WHERE
+                table_schema != "information_schema"
+                AND NOT table_catalog <=> "system"
+                {catalog_sql if catalogs != "*" else ""}
+                {schema_sql if schemas != "*" else ""}
+                {table_sql if tables != "*" else ""}
+            GROUP BY info_schema.table_catalog, info_schema.table_schema, info_schema.table_name
+        )
+
+        SELECT
+            col_list.*
+        FROM col_list
+        INNER JOIN tb_list ON (
+            col_list.table_catalog <=> tb_list.table_catalog AND
+            col_list.table_schema = tb_list.table_schema AND
+            col_list.table_name = tb_list.table_name)
+
+        """
+
+        return helper.strip_margin(sql)
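
The single join-based query above replaces the two near-identical fetchers removed from explorer.py and scanner.py. Filtering both CTEs with the same predicates before the inner join keeps the aggregated col_list small, which is presumably the "Improved info fetch speed" noted in the commit message. A hypothetical usage sketch of the consolidated fetcher (the spark session and filter values are illustrative only):

from discoverx.table_info import InfoFetcher

# Assumes an active SparkSession named `spark`, e.g. in a Databricks notebook.
fetcher = InfoFetcher(spark)  # reads system.information_schema.columns by default

# "*" wildcards become regexp_like filters; exact names become plain equality
# predicates (see _get_table_list_sql above).
tables = fetcher.get_tables_info(catalogs="main", schemas="*", tables="dx_*")

for t in tables:
    print(t.catalog, t.schema, t.table, [c.name for c in t.columns])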

tests/unit/msql_test.py

Lines changed: 1 addition & 1 deletion

@@ -4,7 +4,7 @@
 import pandas as pd
 import pytest
 from discoverx.common.helper import strip_margin
-from discoverx.scanner import ColumnInfo, TableInfo
+from discoverx.table_info import ColumnInfo, TableInfo
 from discoverx.msql import Msql, SQLRow
 from discoverx.scanner import ScanResult
 
