1+ from __future__ import annotations
2+ import datetime as dt
13import logging
24from abc import ABC , abstractmethod
35from collections .abc import Callable , Iterable , Sequence
68from databricks .labs .lsql .backends import SqlBackend
79from databricks .sdk .errors import NotFound
810
11+ from databricks .labs .ucx .framework .history import HistoryLog
912from databricks .labs .ucx .framework .utils import escape_sql_identifier
1013
# Module-level logger named after this module, per the package-wide convention.
logger = logging.getLogger(__name__)
@@ -21,13 +24,22 @@ class DataclassInstance(Protocol):
2124
2225
2326class CrawlerBase (ABC , Generic [Result ]):
24- def __init__ (self , backend : SqlBackend , catalog : str , schema : str , table : str , klass : type [Result ]):
27+ def __init__ (
28+ self ,
29+ backend : SqlBackend ,
30+ catalog : str ,
31+ schema : str ,
32+ table : str ,
33+ klass : type [Result ],
34+ history_log : HistoryLog | None = None ,
35+ ):
2536 """
2637 Initializes a CrawlerBase instance.
2738
2839 Args:
2940 backend (SqlBackend): The backend that executes SQL queries:
3041 Statement Execution API or Databricks Runtime.
42+ history_log: The (optional) history log where (new) snapshots should be saved.
3143 catalog (str): The catalog name for the inventory persistence.
3244 schema: The schema name for the inventory persistence.
3345 table: The table name for the inventory persistence.
@@ -36,6 +48,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
3648 self ._schema = self ._valid (schema )
3749 self ._table = self ._valid (table )
3850 self ._backend = backend
51+ self ._history_log = history_log
3952 self ._fetch = backend .fetch
4053 self ._exec = backend .execute
4154 self ._klass = klass
@@ -155,10 +168,16 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
155168 except NotFound as e :
156169 logger .debug ("Inventory table not found" , exc_info = e )
157170 logger .debug (f"[{ self .full_name } ] crawling new set of snapshot data for { self ._table } " )
171+ crawl_start_time = dt .datetime .now (tz = dt .timezone .utc )
158172 loaded_records = list (loader ())
159- self ._update_snapshot (loaded_records , mode = "overwrite" )
173+ self ._update_snapshot (loaded_records , crawl_start_time = crawl_start_time , mode = "overwrite" )
160174 return loaded_records
161175
    def _update_snapshot(
        self, items: Sequence[Result], *, crawl_start_time: dt.datetime, mode: Literal["append", "overwrite"]
    ) -> None:
        """Persist a freshly-crawled set of records as the inventory snapshot.

        Args:
            items: The records to persist to the inventory table.
            crawl_start_time: When the crawl that produced these records began;
                forwarded to the history log as the run start time.
                NOTE(review): callers are expected to pass a timezone-aware
                (UTC) datetime — confirm against `_snapshot`.
            mode: Whether to "append" to or "overwrite" the inventory table.
        """
        logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
        self._backend.save_table(self.full_name, items, self._klass, mode=mode)
        # Mirror the snapshot into the history log, if one was configured at
        # construction time (self._history_log may be None).
        if self._history_log:
            appender = self._history_log.appender(self._klass)
            appender.append_snapshot(items, run_start_time=crawl_start_time)
0 commit comments