Commit f1db173

william-continfx and Serge Smertin authored
Added a migration state to skip already migrated tables (#325)
Add a migration state to skip already migrated tables using an in-memory state.

Co-authored-by: Serge Smertin <[email protected]>
1 parent b81abea, commit f1db173
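
The gist of the change, as a standalone sketch rather than code from this commit (the helper names below are made up): keep an in-memory map of Unity Catalog tables that already carry an upgraded_from property, and consult it before issuing any migration DDL.

# Standalone sketch of the skip cache; helper names are illustrative, not the commit's code.
seen_tables: dict[str, str] = {}

def record_upgraded(target_full_name: str, upgraded_from: str) -> None:
    # Mirrors _init_seen_tables: map the lowercased target to its source table key.
    seen_tables[target_full_name.lower()] = upgraded_from.lower()

def already_upgraded(target_full_name: str) -> bool:
    # Mirrors _table_already_upgraded: a plain membership test on the cache.
    return target_full_name.lower() in seen_tables

record_upgraded("main.db1.orders", "hive_metastore.db1.orders")
assert already_upgraded("MAIN.db1.ORDERS")       # matching is case-insensitive
assert not already_upgraded("main.db1.clients")  # not migrated yet, so it would be processed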

File tree: 4 files changed, +91 -16 lines changed

src/databricks/labs/ucx/hive_metastore/tables.py
tests/integration/conftest.py
tests/integration/hive_metastore/test_migrate.py
tests/unit/hive_metastore/test_migrate.py

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 17 additions & 4 deletions
@@ -152,17 +152,15 @@ def __init__(
         tc: TablesCrawler,
         ws: WorkspaceClient,
         backend: SqlBackend,
-        inventory_database: str,
         default_catalog=None,
         database_to_catalog_mapping: dict[str, str] | None = None,
     ):
         self._tc = tc
         self._backend = backend
         self._ws = ws
-        self._inventory_database = inventory_database
         self._database_to_catalog_mapping = database_to_catalog_mapping
-        self._seen_tables = {}
         self._default_catalog = self._init_default_catalog(default_catalog)
+        self._seen_tables = {}

     @staticmethod
     def _init_default_catalog(default_catalog):
@@ -172,6 +170,7 @@ def _init_default_catalog(default_catalog):
         return "ucx_default"  # TODO : Fetch current workspace name and append it to the default catalog.

     def migrate_tables(self):
+        self._init_seen_tables()
         tasks = []
         for table in self._tc.snapshot():
             target_catalog = self._default_catalog
@@ -184,12 +183,26 @@ def _migrate_table(self, target_catalog, table):
         try:
             sql = table.uc_create_sql(target_catalog)
             logger.debug(f"Migrating table {table.key} to using SQL query: {sql}")
+            target = f"{target_catalog}.{table.database}.{table.name}".lower()

-            if table.object_type == "MANAGED":
+            if self._table_already_upgraded(target):
+                logger.info(f"Table {table.key} already upgraded to {self._seen_tables[target]}")
+            elif table.object_type == "MANAGED":
                 self._backend.execute(sql)
                 self._backend.execute(table.sql_alter_to(target_catalog))
                 self._backend.execute(table.sql_alter_from(target_catalog))
+                self._seen_tables[target] = table.key
             else:
                 logger.info(f"Table {table.key} is a {table.object_type} and is not supported for migration yet ")
         except Exception as e:
             logger.error(f"Could not create table {table.name} because: {e}")
+
+    def _init_seen_tables(self):
+        for catalog in self._ws.catalogs.list():
+            for schema in self._ws.schemas.list(catalog_name=catalog.name):
+                for table in self._ws.tables.list(catalog_name=catalog.name, schema_name=schema.name):
+                    if table.properties is not None and "upgraded_from" in table.properties:
+                        self._seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()
+
+    def _table_already_upgraded(self, target) -> bool:
+        return target in self._seen_tables
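
For orientation only, this is roughly how the reworked class is constructed after this change; the SQL backend is mocked and the inventory schema and default catalog names are placeholders, not values from this commit.

from unittest.mock import MagicMock

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, TablesMigrate

ws = WorkspaceClient()  # used by _init_seen_tables() to list UC catalogs, schemas and tables
backend = MagicMock()   # stand-in for a SqlBackend in this sketch
crawler = TablesCrawler(backend, "ucx_inventory")  # "ucx_inventory" is a placeholder schema name

# The inventory_database argument is gone; the skip cache lives in memory and is
# rebuilt at the start of every migrate_tables() call.
tm = TablesMigrate(crawler, ws, backend, default_catalog="main")
tm.migrate_tables()  # tables whose target already carries upgraded_from are logged and skipped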

tests/integration/conftest.py

Lines changed: 10 additions & 2 deletions
@@ -90,7 +90,7 @@ def create(*, catalog: str = "hive_metastore", schema: str | None = None):
         return schema

     yield from factory(  # noqa: F405
-        "schema", create, lambda schema_name: sql_exec(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
+        "schema", create, lambda schema: sql_exec(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
     )


@@ -104,15 +104,18 @@ def make_table(sql_exec, make_schema, make_random):
     def create(
         *,
         catalog="hive_metastore",
+        name: str | None = None,
         schema: str | None = None,
         ctas: str | None = None,
         non_delta: bool = False,
         external: bool = False,
         view: bool = False,
+        tbl_properties: dict[str, str] | None = None,
     ):
         if schema is None:
             schema = make_schema(catalog=catalog)
-        name = f"{schema}.ucx_T{make_random(4)}".lower()
+        if name is None:
+            name = f"{schema}.ucx_T{make_random(4)}".lower()
         ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}'
         if ctas is not None:
             # temporary (if not view)
@@ -128,6 +131,10 @@ def create(
         else:
             # managed table
             ddl = f"{ddl} (id INT, value STRING)"
+        if tbl_properties:
+            tbl_properties = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()])
+            ddl = f"{ddl} TBLPROPERTIES ({tbl_properties})"
+
         sql_exec(ddl)
         return name

@@ -150,6 +157,7 @@ def test_table_fixture(make_table):
     logger.info(f"Created new external JSON table in new schema: {make_table(non_delta=True)}")
     logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
     logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')
+    logger.info(f'Created table with properties: {make_table(tbl_properties={"test": "tableproperty"})}')


 @pytest.fixture
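
As a quick illustration of what the new tbl_properties parameter does (the table and property names are made up), the fixture folds the dict into a TBLPROPERTIES clause with the same join shown above:

# Reproduces the fixture's property formatting in isolation; names are made up.
tbl_properties = {"upgraded_from": "hive_metastore.db1.src"}
props = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()])
ddl = f"CREATE TABLE hive_metastore.db1.ucx_tabcd (id INT, value STRING) TBLPROPERTIES ({props})"
print(ddl)
# CREATE TABLE hive_metastore.db1.ucx_tabcd (id INT, value STRING) TBLPROPERTIES ( 'upgraded_from' = 'hive_metastore.db1.src' )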

tests/integration/hive_metastore/test_migrate.py

Lines changed: 37 additions & 3 deletions
@@ -19,14 +19,14 @@ def test_migrate_managed_tables(ws, make_catalog, make_schema, make_table):

     managed_table = make_table(schema=schema_a)

-    logger.info(f"target catalog={target_catalog}, managed_table={managed_table}")
+    logger.info(f"target_catalog={target_catalog}, managed_table={managed_table}")

     inventory_schema = make_schema(catalog="hive_metastore")
     _, inventory_schema = inventory_schema.split(".")

     backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
     crawler = TablesCrawler(backend, inventory_schema)
-    tm = TablesMigrate(crawler, ws, backend, target_catalog, inventory_schema)
+    tm = TablesMigrate(crawler, ws, backend, target_catalog)
     tm.migrate_tables()

     target_tables = list(backend.fetch(f"SHOW TABLES IN {target_catalog}.{target_schema}"))
@@ -38,13 +38,47 @@ def test_migrate_managed_tables(ws, make_catalog, make_schema, make_table):
     assert target_table_properties["upgraded_from"] == managed_table


+def test_migrate_tables_with_cache_should_not_create_table(ws, make_random, make_catalog, make_schema, make_table):
+    target_catalog = make_catalog()
+    schema_a = make_schema(catalog="hive_metastore")
+    _, target_schema = schema_a.split(".")
+
+    make_schema(catalog=target_catalog, schema=target_schema)
+
+    table_name = make_random().lower()
+    target_table = f"{target_catalog}.{target_schema}.{table_name}"
+    source_table = f"hive_metastore.{target_schema}.{table_name}"
+    target_managed_table = make_table(name=target_table, tbl_properties={"upgraded_from": f"{source_table}"})
+    source_managed_table = make_table(name=source_table, tbl_properties={"upgraded_from": f"{target_table}"})
+
+    logger.info(
+        f"target_catalog={target_catalog}, "
+        f"source_managed_table={source_managed_table}"
+        f"target_managed_table={target_managed_table}"
+        f""
+    )
+
+    inventory_schema = make_schema(catalog="hive_metastore")
+    _, inventory_schema = inventory_schema.split(".")
+
+    backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
+    crawler = TablesCrawler(backend, inventory_schema)
+    tm = TablesMigrate(crawler, ws, backend, target_catalog)
+    tm.migrate_tables()
+
+    target_tables = list(backend.fetch(f"SHOW TABLES IN {target_catalog}.{target_schema}"))
+    assert len(target_tables) == 1
+    assert target_tables[0]["database"] == target_schema
+    assert target_tables[0]["tableName"] == table_name
+
+
 @pytest.mark.skip(reason="Needs Storage credential + External Location in place")
 def test_migrate_external_table(ws, make_catalog, make_schema, make_table):
     target_catalog = make_catalog()
     schema_a = make_schema(catalog="hive_metastore")
     _, target_schema = schema_a.split(".")

-    make_schema(catalog=target_catalog, schema_name=target_schema)
+    make_schema(catalog=target_catalog, schema=target_schema)

     external_table = make_table(schema=schema_a, external=True)

tests/unit/hive_metastore/test_migrate.py

Lines changed: 27 additions & 7 deletions
@@ -1,7 +1,7 @@
 import logging
 from unittest.mock import MagicMock

-import pytest
+from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo

 from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, TablesMigrate

@@ -28,7 +28,7 @@ def test_migrate_managed_tables_should_produce_proper_queries():
     backend = MockBackend(fails_on_first=errors, rows=rows)
     tc = TablesCrawler(backend, "inventory_database")
     client = MagicMock()
-    tm = TablesMigrate(tc, client, backend, "")
+    tm = TablesMigrate(tc, client, backend)
     tm.migrate_tables()

     assert (list(backend.queries)) == [
@@ -39,18 +39,22 @@ def test_migrate_managed_tables_should_produce_proper_queries():
     ]


-@pytest.mark.skip(reason="Not implemented yet")
 def test_migrate_managed_tables_should_do_nothing_if_upgrade_tag_is_present():
     errors = {}
     rows = {
         "SELECT": [
-            ("hive_metastore", "db1", "managed", "MANAGED", "DELTA", None, None, "[upgraded_to=target]"),
+            ("hive_metastore", "db1", "managed", "MANAGED", "DELTA", None, None),
         ]
     }
     backend = MockBackend(fails_on_first=errors, rows=rows)
     tc = TablesCrawler(backend, "inventory_database")
     client = MagicMock()
-    tm = TablesMigrate(tc, client, backend, "")
+    client.catalogs.list.return_value = [CatalogInfo(name="catalog_1")]
+    client.schemas.list.return_value = [SchemaInfo(name="db1")]
+    client.tables.list.return_value = [
+        TableInfo(full_name="catalog_1.db1.managed", properties={"upgraded_from": "hive_metastore.db1.managed"})
+    ]
+    tm = TablesMigrate(tc, client, backend, default_catalog="catalog_1")
     tm.migrate_tables()

     assert (list(backend.queries)) == ["SELECT * FROM hive_metastore.inventory_database.tables"]
@@ -67,7 +71,7 @@ def test_migrate_tables_should_migrate_tables_to_default_catalog_if_not_found_in
     tc = TablesCrawler(backend, "inventory_database")
     client = MagicMock()
     database_to_catalog_mapping = {"db1": "catalog_1", "db2": "catalog_2"}
-    tm = TablesMigrate(tc, client, backend, "", database_to_catalog_mapping=database_to_catalog_mapping)
+    tm = TablesMigrate(tc, client, backend, database_to_catalog_mapping=database_to_catalog_mapping)
     tm.migrate_tables()

     assert (list(backend.queries)) == [
@@ -88,7 +92,7 @@ def test_migrate_tables_should_migrate_tables_to_default_catalog_if_specified():
     backend = MockBackend(fails_on_first=errors, rows=rows)
     tc = TablesCrawler(backend, "inventory_database")
     client = MagicMock()
-    tm = TablesMigrate(tc, client, backend, "", default_catalog="test_catalog")
+    tm = TablesMigrate(tc, client, backend, default_catalog="test_catalog")
     tm.migrate_tables()

     assert (list(backend.queries)) == [
@@ -97,3 +101,19 @@ def test_migrate_tables_should_migrate_tables_to_default_catalog_if_specified():
         "ALTER TABLE hive_metastore.db1.managed SET TBLPROPERTIES ('upgraded_to' = 'test_catalog.db1.managed');",
         "ALTER TABLE test_catalog.db1.managed SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1.managed');",
     ]
+
+
+def test_migrate_tables_should_add_table_to_cache_when_migrated():
+    errors = {}
+    rows = {
+        "SELECT": [
+            ("hive_metastore", "db1", "managed", "MANAGED", "DELTA", None, None),
+        ]
+    }
+    backend = MockBackend(fails_on_first=errors, rows=rows)
+    tc = TablesCrawler(backend, "inventory_database")
+    client = MagicMock()
+    tm = TablesMigrate(tc, client, backend, default_catalog="test_catalog")
+    tm.migrate_tables()
+
+    assert tm._seen_tables == {"test_catalog.db1.managed": "hive_metastore.db1.managed"}
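
To make the mocking in the re-enabled test above a bit more concrete, here is a standalone illustration (not code from the commit) of the data _init_seen_tables reads from the mocked WorkspaceClient and the mapping it derives:

from unittest.mock import MagicMock

from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo

client = MagicMock()
client.catalogs.list.return_value = [CatalogInfo(name="catalog_1")]
client.schemas.list.return_value = [SchemaInfo(name="db1")]
client.tables.list.return_value = [
    TableInfo(full_name="catalog_1.db1.managed", properties={"upgraded_from": "hive_metastore.db1.managed"})
]

# The same mapping _init_seen_tables builds from these listings (one catalog and one schema here):
seen = {
    t.full_name.lower(): t.properties["upgraded_from"].lower()
    for t in client.tables.list(catalog_name="catalog_1", schema_name="db1")
    if t.properties is not None and "upgraded_from" in t.properties
}
assert seen == {"catalog_1.db1.managed": "hive_metastore.db1.managed"}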
