11import dataclasses
22import logging
33from collections .abc import Iterable
4- from dataclasses import asdict , dataclass
4+ from dataclasses import asdict , dataclass , field
55from datetime import datetime , timezone
66
77from databricks .sdk import WorkspaceClient
1212
1313from databricks .labs .ucx .framework .utils import escape_sql_identifier
1414from databricks .labs .ucx .hive_metastore .table_migration_status import TableMigrationIndex
15- from databricks .labs .ucx .source_code .base import CurrentSessionState , LineageAtom
15+ from databricks .labs .ucx .source_code .base import CurrentSessionState , LineageAtom , UsedTable
1616from databricks .labs .ucx .source_code .directfs_access import DirectFsAccessCrawler , DirectFsAccess
1717from databricks .labs .ucx .source_code .linters .context import LinterContext
18- from databricks .labs .ucx .source_code .linters .directfs import DirectFsAccessSqlLinter
1918from databricks .labs .ucx .source_code .redash import Redash
19+ from databricks .labs .ucx .source_code .used_table import UsedTablesCrawler
2020
2121logger = logging .getLogger (__name__ )
2222
@@ -33,60 +33,92 @@ class QueryProblem:
3333 message : str
3434
3535
36+ @dataclass
37+ class _ReportingContext :
38+ linted_queries : set [str ] = field (default_factory = set )
39+ all_problems : list [QueryProblem ] = field (default_factory = list )
40+ all_dfsas : list [DirectFsAccess ] = field (default_factory = list )
41+ all_tables : list [UsedTable ] = field (default_factory = list )
42+
43+
3644class QueryLinter :
3745
3846 def __init__ (
3947 self ,
4048 ws : WorkspaceClient ,
4149 migration_index : TableMigrationIndex ,
4250 directfs_crawler : DirectFsAccessCrawler ,
51+ used_tables_crawler : UsedTablesCrawler ,
4352 include_dashboard_ids : list [str ] | None ,
4453 ):
4554 self ._ws = ws
4655 self ._migration_index = migration_index
4756 self ._directfs_crawler = directfs_crawler
57+ self ._used_tables_crawler = used_tables_crawler
4858 self ._include_dashboard_ids = include_dashboard_ids
4959
def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
    """Lint the dashboards and queries in scope, then persist the findings.

    Args:
        sql_backend: backend used to save the linting problems.
        inventory_database: database that holds the ``query_problems`` table.
    """
    started_at = datetime.now(timezone.utc)
    report = _ReportingContext()
    # Dashboards are processed first; the shared report carries the ids of
    # already-linted queries, which the query pass uses to skip repeats.
    self._lint_dashboards(report)
    self._lint_queries(report)
    # The assessment window closes once linting is done, before anything is saved.
    finished_at = datetime.now(timezone.utc)
    self._dump_problems(report, sql_backend, inventory_database)
    self._dump_dfsas(report, started_at, finished_at)
    self._dump_used_tables(report, started_at, finished_at)
69+
def _dump_problems(self, context: _ReportingContext, sql_backend: SqlBackend, inventory_database: str):
    """Overwrite the ``query_problems`` table with the collected problems.

    Args:
        context: report holding the problems to save.
        sql_backend: backend that performs the save.
        inventory_database: database name; escaped before it is used.
    """
    problems = context.all_problems
    logger.info(f"Saving {len(problems)} linting problems...")
    table_name = f'{escape_sql_identifier(inventory_database)}.query_problems'
    sql_backend.save_table(table_name, problems, QueryProblem, mode='overwrite')
79- # dump dfsas
80- assessment_end = datetime . now ( timezone . utc )
81- processed = []
82- for dfsa in all_dfsas :
78+
79+ def _dump_dfsas ( self , context : _ReportingContext , assessment_start : datetime , assessment_end : datetime ):
80+ processed_dfsas = []
81+ for dfsa in context . all_dfsas :
8382 dfsa = dataclasses .replace (
8483 dfsa ,
8584 assessment_start_timestamp = assessment_start ,
8685 assessment_end_timestamp = assessment_end ,
8786 )
88- processed .append (dfsa )
89- self ._directfs_crawler .dump_all (processed )
87+ processed_dfsas .append (dfsa )
88+ self ._directfs_crawler .dump_all (processed_dfsas )
89+
90+ def _dump_used_tables (self , context : _ReportingContext , assessment_start : datetime , assessment_end : datetime ):
91+ processed_tables = []
92+ for table in context .all_tables :
93+ table = dataclasses .replace (
94+ table ,
95+ assessment_start_timestamp = assessment_start ,
96+ assessment_end_timestamp = assessment_end ,
97+ )
98+ processed_tables .append (table )
99+ self ._used_tables_crawler .dump_all (processed_tables )
100+
def _lint_dashboards(self, context: _ReportingContext):
    """Lint every dashboard in scope and add the findings to ``context``.

    Each dashboard is fetched from the workspace by id. The set of
    already-linted query ids from ``context`` is handed to the dashboard
    linter along with the dashboard.
    """
    ids_in_scope = self._dashboard_ids_in_scope()
    logger.info(f"Running {len(ids_in_scope)} linting tasks...")
    for current_id in ids_in_scope:
        fetched = self._ws.dashboards.get(dashboard_id=current_id)
        found_problems, found_dfsas, found_tables = self._lint_and_collect_from_dashboard(
            fetched, context.linted_queries
        )
        context.all_problems.extend(found_problems)
        context.all_dfsas.extend(found_dfsas)
        context.all_tables.extend(found_tables)
110+
111+ def _lint_queries (self , context : _ReportingContext ):
112+ for query in self ._queries_in_scope ():
113+ if query .id in context .linted_queries :
114+ continue
115+ context .linted_queries .add (query .id )
116+ problems = self .lint_query (query )
117+ context .all_problems .extend (problems )
118+ dfsas = self .collect_dfsas_from_query ("no-dashboard-id" , query )
119+ context .all_dfsas .extend (dfsas )
120+ tables = self .collect_used_tables_from_query ("no-dashboard-id" , query )
121+ context .all_tables .extend (tables )
90122
91123 def _dashboard_ids_in_scope (self ) -> list [str ]:
92124 if self ._include_dashboard_ids is not None : # an empty list is accepted
@@ -102,10 +134,11 @@ def _queries_in_scope(self):
102134
103135 def _lint_and_collect_from_dashboard (
104136 self , dashboard : Dashboard , linted_queries : set [str ]
105- ) -> tuple [Iterable [QueryProblem ], Iterable [DirectFsAccess ]]:
137+ ) -> tuple [Iterable [QueryProblem ], Iterable [DirectFsAccess ], Iterable [ UsedTable ] ]:
106138 dashboard_queries = Redash .get_queries_from_dashboard (dashboard )
107139 query_problems : list [QueryProblem ] = []
108140 query_dfsas : list [DirectFsAccess ] = []
141+ query_tables : list [UsedTable ] = []
109142 dashboard_id = dashboard .id or "<no-id>"
110143 dashboard_parent = dashboard .parent or "<orphan>"
111144 dashboard_name = dashboard .name or "<anonymous>"
@@ -134,7 +167,16 @@ def _lint_and_collect_from_dashboard(
134167 )
135168 source_lineage = [atom ] + dfsa .source_lineage
136169 query_dfsas .append (dataclasses .replace (dfsa , source_lineage = source_lineage ))
137- return query_problems , query_dfsas
170+ tables = self .collect_used_tables_from_query (dashboard_id , query )
171+ for table in tables :
172+ atom = LineageAtom (
173+ object_type = "DASHBOARD" ,
174+ object_id = dashboard_id ,
175+ other = {"parent" : dashboard_parent , "name" : dashboard_name },
176+ )
177+ source_lineage = [atom ] + table .source_lineage
178+ query_tables .append (dataclasses .replace (table , source_lineage = source_lineage ))
179+ return query_problems , query_dfsas , query_tables
138180
139181 def lint_query (self , query : LegacyQuery ) -> Iterable [QueryProblem ]:
140182 if not query .query :
@@ -156,20 +198,34 @@ def lint_query(self, query: LegacyQuery) -> Iterable[QueryProblem]:
156198 message = advice .message ,
157199 )
158200
159- @classmethod
160- def collect_dfsas_from_query (cls , dashboard_id : str , query : LegacyQuery ) -> Iterable [DirectFsAccess ]:
def collect_dfsas_from_query(self, dashboard_id: str, query: LegacyQuery) -> Iterable[DirectFsAccess]:
    """Yield the direct filesystem accesses collected from a query's SQL.

    Nothing is yielded when the query has no SQL text (``query.query`` is
    ``None``). Each collected record is re-created as a ``DirectFsAccess``
    from its fields and then given this query's source details.

    Args:
        dashboard_id: id used as the first part of the ``<dashboard>/<query>`` source id.
        query: the query whose SQL text is inspected.
    """
    sql_text = query.query
    if sql_text is None:
        return
    collector = LinterContext(self._migration_index, CurrentSessionState()).dfsa_collector(Language.SQL)
    source_id = f"{dashboard_id}/{query.id}"
    updated_at = self._read_timestamp(query.updated_at)
    # a single QUERY atom is the lineage of every record from this query
    lineage = [
        LineageAtom(object_type="QUERY", object_id=source_id, other={"name": query.name or "<anonymous>"})
    ]
    for collected in collector.collect_dfsas(sql_text):
        rebuilt = DirectFsAccess(**asdict(collected))
        yield rebuilt.replace_source(source_id=source_id, source_timestamp=updated_at, source_lineage=lineage)
172214
def collect_used_tables_from_query(self, dashboard_id: str, query: LegacyQuery) -> Iterable[UsedTable]:
    """Yield the used tables collected from a query's SQL.

    Nothing is yielded when the query has no SQL text (``query.query`` is
    ``None``). Each collected record is re-created as a ``UsedTable`` from
    its fields and then given this query's source details.

    Args:
        dashboard_id: id used as the first part of the ``<dashboard>/<query>`` source id.
        query: the query whose SQL text is inspected.
    """
    sql_text = query.query
    if sql_text is None:
        return
    collector = LinterContext(self._migration_index, CurrentSessionState()).tables_collector(Language.SQL)
    source_id = f"{dashboard_id}/{query.id}"
    updated_at = self._read_timestamp(query.updated_at)
    # a single QUERY atom is the lineage of every record from this query
    lineage = [
        LineageAtom(object_type="QUERY", object_id=source_id, other={"name": query.name or "<anonymous>"})
    ]
    for collected in collector.collect_tables(sql_text):
        rebuilt = UsedTable(**asdict(collected))
        yield rebuilt.replace_source(source_id=source_id, source_timestamp=updated_at, source_lineage=lineage)
228+
173229 @classmethod
174230 def _read_timestamp (cls , timestamp : str | None ) -> datetime :
175231 if timestamp is not None :
0 commit comments