-
Notifications
You must be signed in to change notification settings - Fork 99
Crawlers: append snapshots to history journal, if available #2743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
15dd48d
625f16a
622792e
3d31a1f
4ebf9b6
d82e06b
4d676bb
cf48771
e877532
b321903
02fc40f
6170a63
4b55625
155cda7
260becf
0efb3e5
6e649af
a2dd1d5
f897d98
4a5321c
76aeefb
e321589
4e3e3c8
616b0f1
a52720d
6f38192
af1fff2
724abb2
dd73486
31740dd
00d5d85
57f63ba
3aa2431
4c86ba6
5c27d8b
39f1105
1d50ce1
d431322
95d2a48
4ce4ce4
7441077
07bc711
3b3e5bd
a9dd77f
0f9a4cd
ad482b8
fd2b3ac
9d4ed60
3703848
62390b5
c1fea83
8c70541
6f41fc0
927392a
78f592d
6dfed4c
34b1e72
77a06e1
ac60675
f00a06e
4d6989c
17100d5
78a9a13
21ad44d
febc9ce
383027d
e223612
c938dd8
8a378dc
2e5869d
bf542a4
3aad8bf
a990327
2381d18
23e7720
567d809
f0bd963
5b2ab77
0ff2e95
971cb4c
be16738
2b9b476
f17dc74
823c886
7377727
fdb03b2
b7af858
9b489b9
0a2f4f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| from __future__ import annotations | ||
| import datetime as dt | ||
| import logging | ||
| from abc import ABC, abstractmethod | ||
| from collections.abc import Callable, Iterable, Sequence | ||
|
|
@@ -6,6 +8,7 @@ | |
| from databricks.labs.lsql.backends import SqlBackend | ||
| from databricks.sdk.errors import NotFound | ||
|
|
||
| from databricks.labs.ucx.framework.history import HistoryLog | ||
| from databricks.labs.ucx.framework.utils import escape_sql_identifier | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -21,13 +24,22 @@ class DataclassInstance(Protocol): | |
|
|
||
|
|
||
| class CrawlerBase(ABC, Generic[Result]): | ||
| def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]): | ||
| def __init__( | ||
| self, | ||
| backend: SqlBackend, | ||
| catalog: str, | ||
| schema: str, | ||
| table: str, | ||
| klass: type[Result], | ||
| history_log: HistoryLog | None = None, | ||
|
||
| ): | ||
| """ | ||
| Initializes a CrawlerBase instance. | ||
|
|
||
| Args: | ||
| backend (SqlBackend): The backend that executes SQL queries: | ||
| Statement Execution API or Databricks Runtime. | ||
| history_log: The (optional) history log where (new) snapshots should be saved. | ||
| catalog (str): The catalog name for the inventory persistence. | ||
| schema: The schema name for the inventory persistence. | ||
| table: The table name for the inventory persistence. | ||
|
|
@@ -36,6 +48,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k | |
| self._schema = self._valid(schema) | ||
| self._table = self._valid(table) | ||
| self._backend = backend | ||
| self._history_appender = history_log.appender(klass) if history_log is not None else None | ||
| self._fetch = backend.fetch | ||
| self._exec = backend.execute | ||
| self._klass = klass | ||
|
|
@@ -155,10 +168,15 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool) | |
| except NotFound as e: | ||
| logger.debug("Inventory table not found", exc_info=e) | ||
| logger.debug(f"[{self.full_name}] crawling new set of snapshot data for {self._table}") | ||
| crawl_start_time = dt.datetime.now(tz=dt.timezone.utc) | ||
| loaded_records = list(loader()) | ||
| self._update_snapshot(loaded_records, mode="overwrite") | ||
| self._update_snapshot(loaded_records, crawl_start_time=crawl_start_time, mode="overwrite") | ||
| return loaded_records | ||
|
|
||
| def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None: | ||
| def _update_snapshot( | ||
| self, items: Sequence[Result], *, crawl_start_time: dt.datetime, mode: Literal["append", "overwrite"] | ||
| ) -> None: | ||
| logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}") | ||
| self._backend.save_table(self.full_name, items, self._klass, mode=mode) | ||
| if self._history_appender: | ||
| self._history_appender.append_snapshot(items, run_start_time=crawl_start_time) | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,158 @@ | ||||||
| from __future__ import annotations | ||||||
| import dataclasses | ||||||
| import datetime as dt | ||||||
| import json | ||||||
| import logging | ||||||
| import os | ||||||
| from collections.abc import Callable, Sequence | ||||||
| from dataclasses import dataclass | ||||||
| from functools import cached_property | ||||||
| from typing import ClassVar, Protocol, TypeVar | ||||||
|
|
||||||
| from databricks.labs.lsql.backends import SqlBackend | ||||||
| from databricks.sdk import WorkspaceClient | ||||||
|
|
||||||
|
|
||||||
| logger = logging.getLogger(__name__) | ||||||
|
|
||||||
|
|
||||||
| @dataclass(frozen=True, kw_only=True) | ||||||
| class HistoricalRecord: | ||||||
| workspace_id: int | ||||||
| """The identifier of the workspace where this record was generated.""" | ||||||
|
|
||||||
| run_id: int | ||||||
| """The identifier of the workflow run that generated this record.""" | ||||||
|
|
||||||
| snapshot_id: int | ||||||
| """An identifier that is unique to the records produced for a given snapshot.""" | ||||||
|
|
||||||
| run_start_time: dt.datetime | ||||||
| """When this record was generated.""" | ||||||
|
|
||||||
| object_type: str | ||||||
| """The inventory table for which this record was generated.""" | ||||||
|
|
||||||
| object_type_version: int | ||||||
| """Versioning of inventory table, for forward compatibility.""" | ||||||
|
|
||||||
| object_id: list[str] | ||||||
| """The type-specific identifier for this inventory record.""" | ||||||
|
|
||||||
| object_data: dict[str, str] | ||||||
| """Type-specific data of the inventory record. Keys are top-level attributes, values are their JSON-encoded values.""" | ||||||
|
|
||||||
| failures: list[str] | ||||||
| """The list of problems associated with the object that this inventory record covers.""" | ||||||
|
|
||||||
| owner: str | ||||||
| """The identity of the account that created this inventory record.""" | ||||||
|
|
||||||
|
|
||||||
| class DataclassInstance(Protocol): | ||||||
| __dataclass_fields__: ClassVar[dict] | ||||||
| # TODO: Once all record types provide the property: key_fields: ClassVar[Sequence[str]] | ||||||
|
|
||||||
|
|
||||||
| Record = TypeVar("Record", bound=DataclassInstance) | ||||||
|
|
||||||
|
|
||||||
| class HistoryLog: | ||||||
| __slots__ = ("_ws", "_backend", "_run_id", "_catalog", "_schema", "_table") | ||||||
|
||||||
|
|
||||||
| def __init__( | ||||||
| self, | ||||||
| ws: WorkspaceClient, | ||||||
| backend: SqlBackend, | ||||||
| run_id: int, | ||||||
| catalog: str, | ||||||
| schema: str, | ||||||
| table: str, | ||||||
|
||||||
| table: str, |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| self._table = table | |
| self._table = "history" |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not an object owner; it's defined differently for every object type. A separate PR will probably be needed to fix that and make it consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've created #2761 to track this. That PR blocks progress on this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the new implementation (HistoricalEncoder) the owner property comes from the supplied Ownership implementation.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| snapshot_id = int.from_bytes(os.urandom(7), byteorder="big") |
Rely on the workflow run ID instead.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JCZuurmond In the parse_logs task, create the WorkflowRuns entities, recording the run start time and run end time; otherwise the writer side gets too complicated. parse_logs is guaranteed to be executed at the end of every job run, so it's a good place to do this. Alternatively, add one more dedicated task, which is a bit more explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asnare marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and parse them out of JSON; remove them from object_as_dict, because object_data has to be dict[str, str] so that readers stay simpler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will work for the classes that have failures. (Not all do: 5 of the 9 currently being updated during migration-progress have it, 4 do not.)
Aside from these, ReconResult has an error_message attribute instead of failures, but it fulfils the same purpose. This will need to be special-cased.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation is superseded by a new one. Encoding is handled by HistoricalEncoder: it treats failures as a special property, and does not include it in the object data. If the class doesn't have a failures attribute we set to this an empty list ([]) because it's a mandatory field.
Uh oh!
There was an error while loading. Please reload this page.