Skip to content

Commit 56a422e

Browse files
authored
Added migration_status table to capture a snapshot of migrated tables (#1041)
1 parent 7917906 commit 56a422e

File tree

5 files changed

+302
-44
lines changed

5 files changed

+302
-44
lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ def full_name(self) -> str:
4747
"""
4848
return f"{self._catalog}.{self._schema}.{self._table}"
4949

50+
def reset(self):
    """
    Empty the inventory table behind this crawler.

    All rows are removed with a `DELETE FROM` statement issued against `full_name`.
    The next call to `snapshot` will re-populate the table.
    """
    delete_statement = f"DELETE FROM {self.full_name}"
    self._exec(delete_statement)
56+
5057
@staticmethod
5158
def _valid(name: str) -> str:
5259
"""

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import datetime
12
import logging
23
from collections import defaultdict
4+
from collections.abc import Iterable
5+
from dataclasses import dataclass
36
from functools import partial
47

58
from databricks.labs.blueprint.installation import Installation
@@ -15,25 +18,38 @@
1518
)
1619

1720
from databricks.labs.ucx.config import WorkspaceConfig
21+
from databricks.labs.ucx.framework.crawlers import CrawlerBase
1822
from databricks.labs.ucx.hive_metastore import TablesCrawler
1923
from databricks.labs.ucx.hive_metastore.mapping import Rule, TableMapping
2024
from databricks.labs.ucx.hive_metastore.tables import MigrationCount, Table, What
2125

2226
logger = logging.getLogger(__name__)
2327

2428

29+
@dataclass
30+
class MigrationStatus:
31+
src_schema: str
32+
src_table: str
33+
dst_catalog: str | None = None
34+
dst_schema: str | None = None
35+
dst_table: str | None = None
36+
update_ts: str | None = None
37+
38+
2539
class TablesMigrate:
2640
def __init__(
2741
self,
2842
tables_crawler: TablesCrawler,
2943
ws: WorkspaceClient,
3044
backend: SqlBackend,
3145
table_mapping: TableMapping,
46+
migration_status_refresher,
3247
):
3348
self._tc = tables_crawler
3449
self._backend = backend
3550
self._ws = ws
3651
self._tm = table_mapping
52+
self._migration_status_refresher = migration_status_refresher
3753
self._seen_tables: dict[str, str] = {}
3854

3955
@classmethod
@@ -43,7 +59,8 @@ def for_cli(cls, ws: WorkspaceClient, product='ucx'):
4359
sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
4460
table_crawler = TablesCrawler(sql_backend, config.inventory_database)
4561
table_mapping = TableMapping(installation, ws, sql_backend)
46-
return cls(table_crawler, ws, sql_backend, table_mapping)
62+
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, config.inventory_database, table_crawler)
63+
return cls(table_crawler, ws, sql_backend, table_mapping, migration_status_refresher)
4764

4865
def migrate_tables(self, *, what: What | None = None):
4966
self._init_seen_tables()
@@ -93,19 +110,6 @@ def _migrate_view(self, src_table: Table, rule: Rule):
93110
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
94111
return True
95112

96-
def _iter_schemas(self):
97-
for catalog in self._ws.catalogs.list():
98-
yield from self._ws.schemas.list(catalog_name=catalog.name)
99-
100-
def _init_seen_tables(self):
101-
for schema in self._iter_schemas():
102-
for table in self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name):
103-
if table.properties is None:
104-
continue
105-
if "upgraded_from" not in table.properties:
106-
continue
107-
self._seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()
108-
109113
def _table_already_upgraded(self, target) -> bool:
110114
return target in self._seen_tables
111115

@@ -169,13 +173,7 @@ def _get_revert_count(self, schema: str | None = None, table: str | None = None)
169173
return migration_list
170174

171175
def is_upgraded(self, schema: str, table: str) -> bool:
172-
result = self._backend.fetch(f"SHOW TBLPROPERTIES `{schema}`.`{table}`")
173-
for value in result:
174-
if value["key"] == "upgraded_to":
175-
logger.info(f"{schema}.{table} is set as upgraded")
176-
return True
177-
logger.info(f"{schema}.{table} is set as not upgraded")
178-
return False
176+
return self._migration_status_refresher.is_upgraded(schema, table)
179177

180178
def print_revert_report(self, *, delete_managed: bool) -> bool | None:
181179
migrated_count = self._get_revert_count()
@@ -215,6 +213,9 @@ def print_revert_report(self, *, delete_managed: bool) -> bool | None:
215213
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command")
216214
return True
217215

216+
def _init_seen_tables(self):
217+
self._seen_tables = self._migration_status_refresher.get_seen_tables()
218+
218219

219220
class TableMove:
220221
def __init__(self, ws: WorkspaceClient, backend: SqlBackend):
@@ -458,3 +459,62 @@ def _recreate_view(self, to_view_name, view_text):
458459
create_sql = f"CREATE VIEW {to_view_name} AS {view_text}"
459460
logger.info(f"Creating view {to_view_name}")
460461
self._backend.execute(create_sql)
462+
463+
464+
class MigrationStatusRefresher(CrawlerBase[MigrationStatus]):
    """
    Crawler for the `migration_status` inventory table.

    It emits one `MigrationStatus` record per table reported by the tables crawler and
    fills in the destination columns when a migrated target is found for that table.
    """

    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema, table_crawler: TablesCrawler):
        """
        :param ws: workspace client used to list catalogs, schemas and tables.
        :param sbe: SQL backend handed to the base crawler.
        :param schema: inventory schema holding the `migration_status` table.
        :param table_crawler: crawler whose snapshot lists the source tables.
        """
        super().__init__(sbe, "hive_metastore", schema, "migration_status", MigrationStatus)
        self._table_crawler = table_crawler
        self._ws = ws

    def snapshot(self) -> Iterable[MigrationStatus]:
        """
        Return the migration status records.

        Delegates to the base-class `_snapshot`, passing `_try_fetch` as the reader of
        stored rows and `_crawl` as the producer of fresh ones.
        """
        fetcher, loader = self._try_fetch, self._crawl
        return self._snapshot(fetcher, loader)

    def get_seen_tables(self) -> dict[str, str]:
        """
        Map every table carrying an `upgraded_from` property to the value of that property.

        All tables of all schemas of all catalogs are listed through the workspace client.
        Tables without properties or without `upgraded_from` are ignored; tables
        without a full name are skipped with a warning.

        :return: lower-cased full table name -> lower-cased `upgraded_from` value.
        """
        upgraded: dict[str, str] = {}
        for schema in self._iter_schemas():
            listing = self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name)
            for candidate in listing:
                properties = candidate.properties
                if not properties or "upgraded_from" not in properties:
                    continue
                if not candidate.full_name:
                    logger.warning(f"The table {candidate.name} in {schema.name} has no full name")
                    continue
                upgraded[candidate.full_name.lower()] = properties["upgraded_from"].lower()
        return upgraded

    def is_upgraded(self, schema: str, table: str) -> bool:
        """
        Check whether a table has an `upgraded_to` table property.

        :param schema: schema of the table to inspect.
        :param table: name of the table to inspect.
        :return: True when `SHOW TBLPROPERTIES` lists an `upgraded_to` key, else False.
        """
        # NOTE(review): the statement names no catalog, so it resolves against the
        # session's current catalog -- confirm that is the intended one.
        properties = self._backend.fetch(f"SHOW TBLPROPERTIES `{schema}`.`{table}`")
        upgraded = any(entry["key"] == "upgraded_to" for entry in properties)
        if upgraded:
            logger.info(f"{schema}.{table} is set as upgraded")
        else:
            logger.info(f"{schema}.{table} is set as not upgraded")
        return upgraded

    def _crawl(self) -> Iterable[MigrationStatus]:
        """
        Produce one `MigrationStatus` per table from the tables crawler snapshot.

        A destination is recorded only when the table's key is a known `upgraded_from`
        source, the table has an `upgraded_to` property, and the target name has
        exactly three dot-separated parts. All records share one UTC timestamp.
        """
        source_tables = self._table_crawler.snapshot()
        # Invert target -> source into source -> target; for a source with several
        # targets the last one listed wins.
        target_by_source = {source: target for target, source in self.get_seen_tables().items()}
        crawled_at = str(datetime.datetime.now(datetime.timezone.utc).timestamp())
        for source in source_tables:
            status = MigrationStatus(
                src_schema=source.database,
                src_table=source.name,
                update_ts=crawled_at,
            )
            # The dictionary lookup comes first so the SQL round-trip in `is_upgraded`
            # only happens for tables that have a candidate target.
            if source.key in target_by_source and self.is_upgraded(source.database, source.name):
                parts = target_by_source[source.key].split(".")
                if len(parts) == 3:
                    status.dst_catalog, status.dst_schema, status.dst_table = parts
            yield status

    def _try_fetch(self) -> Iterable[MigrationStatus]:
        """Yield the stored rows of the inventory table as `MigrationStatus`, built positionally."""
        # NOTE(review): the query omits the catalog part of the table name -- confirm
        # the session's current catalog is the one holding the inventory schema.
        query = f"SELECT * FROM {self._schema}.{self._table}"
        for record in self._fetch(query):
            yield MigrationStatus(*record)

    def _iter_schemas(self):
        """Yield every schema of every catalog listed by the workspace client."""
        for catalog in self._ws.catalogs.list():
            catalog_schemas = self._ws.schemas.list(catalog_name=catalog.name)
            yield from catalog_schemas

src/databricks/labs/ucx/install.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
from databricks.labs.ucx.framework.tasks import _TASKS, Task
6767
from databricks.labs.ucx.hive_metastore.grants import Grant
6868
from databricks.labs.ucx.hive_metastore.locations import ExternalLocation, Mount
69+
from databricks.labs.ucx.hive_metastore.table_migrate import MigrationStatus
6970
from databricks.labs.ucx.hive_metastore.table_size import TableSize
7071
from databricks.labs.ucx.hive_metastore.tables import Table, TableError
7172
from databricks.labs.ucx.installer.hms_lineage import HiveMetastoreLineageEnabler
@@ -159,6 +160,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
159160
functools.partial(table, "workspace_objects", WorkspaceObjectInfo),
160161
functools.partial(table, "permissions", Permissions),
161162
functools.partial(table, "submit_runs", SubmitRunInfo),
163+
functools.partial(table, "migration_status", MigrationStatus),
162164
],
163165
)
164166
deployer.deploy_view("objects", "queries/views/objects.sql")

tests/integration/hive_metastore/test_migrate.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
from databricks.sdk.retries import retried
77

88
from databricks.labs.ucx.hive_metastore.mapping import Rule
9-
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
9+
from databricks.labs.ucx.hive_metastore.table_migrate import (
10+
MigrationStatusRefresher,
11+
TablesMigrate,
12+
)
1013
from databricks.labs.ucx.hive_metastore.tables import Table
1114

1215
from ..conftest import StaticTableMapping, StaticTablesCrawler
@@ -38,7 +41,8 @@ def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog,
3841
),
3942
]
4043
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
41-
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping)
44+
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
45+
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping, migration_status_refresher)
4246

4347
table_migrate.migrate_tables()
4448

@@ -94,7 +98,8 @@ def test_migrate_tables_with_cache_should_not_create_table(
9498
),
9599
]
96100
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
97-
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping)
101+
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
102+
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping, migration_status_refresher)
98103

99104
# FIXME: flaky: databricks.sdk.errors.platform.NotFound: Catalog 'ucx_cjazg' does not exist.
100105
table_migrate.migrate_tables()
@@ -110,16 +115,11 @@ def test_migrate_external_table(ws, sql_backend, inventory_schema, make_catalog,
110115
if not ws.config.is_azure:
111116
pytest.skip("temporary: only works in azure test env")
112117
src_schema = make_schema(catalog_name="hive_metastore")
113-
114118
mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
115119
src_external_table = make_table(schema_name=src_schema.name, external_csv=mounted_location)
116-
117120
dst_catalog = make_catalog()
118121
dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
119-
120122
logger.info(f"dst_catalog={dst_catalog.name}, external_table={src_external_table.full_name}")
121-
122-
# crawler = TablesCrawler(sql_backend, inventory_schema)
123123
table_crawler = StaticTablesCrawler(sql_backend, inventory_schema, [src_external_table])
124124
rules = [
125125
Rule(
@@ -131,8 +131,10 @@ def test_migrate_external_table(ws, sql_backend, inventory_schema, make_catalog,
131131
src_external_table.name,
132132
),
133133
]
134-
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
135-
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping)
134+
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
135+
table_migrate = TablesMigrate(
136+
table_crawler, ws, sql_backend, StaticTableMapping(ws, sql_backend, rules=rules), migration_status_refresher
137+
)
136138

137139
table_migrate.migrate_tables()
138140

@@ -142,6 +144,14 @@ def test_migrate_external_table(ws, sql_backend, inventory_schema, make_catalog,
142144
assert target_table_properties["upgraded_from"] == src_external_table.full_name
143145
assert target_table_properties[Table.UPGRADED_FROM_WS_PARAM] == str(ws.get_workspace_id())
144146

147+
migration_status = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler).snapshot()
148+
assert len(migration_status) == 1
149+
assert migration_status[0].src_schema == src_external_table.schema_name
150+
assert migration_status[0].src_table == src_external_table.name
151+
assert migration_status[0].dst_catalog == dst_catalog.name
152+
assert migration_status[0].dst_schema == dst_schema.name
153+
assert migration_status[0].dst_table == src_external_table.name
154+
145155

146156
@retried(on=[NotFound], timeout=timedelta(minutes=5))
147157
def test_revert_migrated_table(
@@ -178,7 +188,8 @@ def test_revert_migrated_table(
178188
),
179189
]
180190
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
181-
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping)
191+
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
192+
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping, migration_status_refresher)
182193
table_migrate.migrate_tables()
183194

184195
table_migrate.revert_migrated_tables(src_schema1.name, delete_managed=True)
@@ -284,7 +295,8 @@ def test_mapping_reverts_table(
284295
),
285296
]
286297
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
287-
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping)
298+
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
299+
table_migrate = TablesMigrate(table_crawler, ws, sql_backend, table_mapping, migration_status_refresher)
288300
table_migrate.migrate_tables()
289301

290302
target_table_properties = ws.tables.get(f"{dst_schema.full_name}.{table_to_skip.name}").properties

0 commit comments

Comments
 (0)