Skip to content

Commit 56db4ce

Browse files
Detect tables that are not present in the mapping file (#2205)
<!-- REMOVE IRRELEVANT COMMENTS BEFORE CREATING A PULL REQUEST --> ## Changes <!-- Summary of your changes that are easy to understand. Add screenshots when necessary --> ### Linked issues <!-- DOC: Link issue with a keyword: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved. See https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword --> Resolves #1221 ### Functionality - [ ] added relevant user documentation - [ ] added new CLI command - [ ] modified existing command: `databricks labs ucx ...` - [ ] added a new workflow - [x] modified existing workflow: `table-migration` - [ ] added a new table - [ ] modified existing table: `...` ### Tests <!-- How is this tested? Please see the checklist below and also describe any other relevant tests --> - [x] manually tested - [x] added unit tests - [x] added integration tests - [x] verified on staging environment (screenshot attached)
1 parent e2e2706 commit 56db4ce

File tree

6 files changed

+126
-14
lines changed

6 files changed

+126
-14
lines changed

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ def __init__(
5757
self._seen_tables: dict[str, str] = {}
5858
self._principal_grants = principal_grants
5959

60+
def get_remaining_tables(self) -> list[Table]:
61+
self.index_full_refresh()
62+
table_rows = []
63+
for crawled_table in self._tc.snapshot():
64+
if not self._is_migrated(crawled_table.database, crawled_table.name):
65+
table_rows.append(crawled_table)
66+
logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
67+
return table_rows
68+
6069
def index(self):
6170
return self._migration_status_refresher.index()
6271

@@ -490,3 +499,7 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int):
490499
f"('upgraded_from' = '{source}'"
491500
f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');"
492501
)
502+
503+
def _is_migrated(self, schema: str, table: str) -> bool:
504+
index = self._migration_status_refresher.index()
505+
return index.is_migrated(schema, table)

src/databricks/labs/ucx/hive_metastore/workflows.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ def migrate_views(self, ctx: RuntimeContext):
6969
)
7070

7171
@job_task(job_cluster="table_migration", depends_on=[migrate_views])
72-
def refresh_migration_status(self, ctx: RuntimeContext):
72+
def update_migration_status(self, ctx: RuntimeContext):
7373
"""Refresh the migration status to present it in the dashboard."""
74-
ctx.tables_migrator.index_full_refresh()
74+
ctx.tables_migrator.get_remaining_tables()
7575

7676

7777
class MigrateHiveSerdeTablesInPlace(Workflow):
@@ -109,9 +109,9 @@ def migrate_views(self, ctx: RuntimeContext):
109109
)
110110

111111
@job_task(job_cluster="table_migration", depends_on=[migrate_views])
112-
def refresh_migration_status(self, ctx: RuntimeContext):
112+
def update_migration_status(self, ctx: RuntimeContext):
113113
"""Refresh the migration status to present it in the dashboard."""
114-
ctx.tables_migrator.index_full_refresh()
114+
ctx.tables_migrator.get_remaining_tables()
115115

116116

117117
class MigrateExternalTablesCTAS(Workflow):
@@ -159,9 +159,9 @@ def migrate_views(self, ctx: RuntimeContext):
159159
)
160160

161161
@job_task(job_cluster="table_migration", depends_on=[migrate_views])
162-
def refresh_migration_status(self, ctx: RuntimeContext):
162+
def update_migration_status(self, ctx: RuntimeContext):
163163
"""Refresh the migration status to present it in the dashboard."""
164-
ctx.tables_migrator.index_full_refresh()
164+
ctx.tables_migrator.get_remaining_tables()
165165

166166

167167
class ScanTablesInMounts(Workflow):
@@ -176,9 +176,9 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
176176
ctx.tables_in_mounts.snapshot()
177177

178178
@job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental])
179-
def refresh_migration_status(self, ctx: RuntimeContext):
179+
def update_migration_status(self, ctx: RuntimeContext):
180180
"""Refresh the migration status to present it in the dashboard."""
181-
ctx.tables_migrator.index_full_refresh()
181+
ctx.tables_migrator.get_remaining_tables()
182182

183183

184184
class MigrateTablesInMounts(Workflow):
@@ -191,6 +191,6 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
191191
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)
192192

193193
@job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental])
194-
def refresh_migration_status(self, ctx: RuntimeContext):
194+
def update_migration_status(self, ctx: RuntimeContext):
195195
"""Refresh the migration status to present it in the dashboard."""
196-
ctx.tables_migrator.index_full_refresh()
196+
ctx.tables_migrator.get_remaining_tables()
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/* --title 'List of remaining tables in HMS' --type table --width 6 */
2+
SELECT
3+
SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message
4+
FROM inventory.logs
5+
WHERE
6+
message LIKE 'remained-hive-metastore-table: %'

tests/integration/hive_metastore/test_workflows.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22
from databricks.sdk.errors import NotFound
3+
from databricks.labs.ucx.hive_metastore.tables import Table
34

45

56
@pytest.mark.parametrize(
@@ -97,3 +98,35 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables
9798
assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
9899
except NotFound:
99100
assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
101+
102+
103+
@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True)
104+
def test_table_migration_job_publishes_remianed_tables(
105+
ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog
106+
):
107+
tables, dst_schema = prepare_tables_for_migration
108+
installation_ctx.workspace_installation.run()
109+
second_table = list(tables.values())[1]
110+
table = Table(
111+
"hive_metastore",
112+
dst_schema.name,
113+
second_table.name,
114+
object_type="UNKNOWN",
115+
table_format="UNKNOWN",
116+
)
117+
installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table)
118+
installation_ctx.deployed_workflows.run_workflow("migrate-tables")
119+
assert installation_ctx.deployed_workflows.validate_step("migrate-tables")
120+
121+
remained_tables = list(
122+
sql_backend.fetch(
123+
f"""
124+
SELECT
125+
SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1)
126+
AS message
127+
FROM {installation_ctx.inventory_database}.logs
128+
WHERE message LIKE 'remained-hive-metastore-table: %'
129+
"""
130+
)
131+
)
132+
assert remained_tables[0].message == f'hive_metastore.{dst_schema.name}.{second_table.name}'

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,3 +1349,65 @@ def test_revert_migrated_tables_failed(caplog):
13491349
table_migrate = get_table_migrator(backend)
13501350
table_migrate.revert_migrated_tables(schema="test_schema1")
13511351
assert "Failed to revert table hive_metastore.test_schema1.test_table1: error" in caplog.text
1352+
1353+
1354+
def test_refresh_migration_status_published_remained_tables(caplog):
1355+
backend = MockBackend()
1356+
table_crawler = create_autospec(TablesCrawler)
1357+
grant_crawler = create_autospec(GrantsCrawler)
1358+
client = mock_workspace_client()
1359+
table_crawler.snapshot.return_value = [
1360+
Table(
1361+
object_type="EXTERNAL",
1362+
table_format="DELTA",
1363+
catalog="hive_metastore",
1364+
database="schema1",
1365+
name="table1",
1366+
location="s3://some_location/table1",
1367+
upgraded_to="ucx_default.db1_dst.dst_table1",
1368+
),
1369+
Table(
1370+
object_type="EXTERNAL",
1371+
table_format="DELTA",
1372+
catalog="hive_metastore",
1373+
database="schema1",
1374+
name="table2",
1375+
location="s3://some_location/table2",
1376+
upgraded_to="ucx_default.db1_dst.dst_table2",
1377+
),
1378+
Table(
1379+
object_type="EXTERNAL",
1380+
table_format="DELTA",
1381+
catalog="hive_metastore",
1382+
database="schema1",
1383+
name="table3",
1384+
location="s3://some_location/table3",
1385+
),
1386+
]
1387+
group_manager = GroupManager(backend, client, "inventory_database")
1388+
table_mapping = mock_table_mapping()
1389+
migration_status_refresher = create_autospec(MigrationStatusRefresher)
1390+
migration_index = MigrationIndex(
1391+
[
1392+
MigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"),
1393+
MigrationStatus("schema1", "table2", "ucx_default", "db1_dst", "dst_table2"),
1394+
]
1395+
)
1396+
migration_status_refresher.index.return_value = migration_index
1397+
principal_grants = create_autospec(PrincipalACL)
1398+
table_migrate = TablesMigrator(
1399+
table_crawler,
1400+
grant_crawler,
1401+
client,
1402+
backend,
1403+
table_mapping,
1404+
group_manager,
1405+
migration_status_refresher,
1406+
principal_grants,
1407+
)
1408+
with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore"):
1409+
tables = table_migrate.get_remaining_tables()
1410+
assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages
1411+
assert len(tables) == 1 and tables[0].key == "hive_metastore.schema1.table3"
1412+
grant_crawler.assert_not_called()
1413+
principal_grants.assert_not_called()

tests/unit/hive_metastore/test_workflows.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import pytest
2-
32
from databricks.labs.ucx.hive_metastore.workflows import (
43
TableMigration,
54
MigrateExternalTablesCTAS,
@@ -64,9 +63,8 @@ def test_migrate_ctas_views(run_workflow):
6463
MigrateTablesInMounts,
6564
],
6665
)
67-
def test_refresh_migration_status_is_refreshed(run_workflow, workflow):
66+
def test_update_migration_status(run_workflow, workflow):
6867
"""Migration status is refreshed by deleting and showing new tables"""
69-
ctx = run_workflow(getattr(workflow, "refresh_migration_status"))
68+
ctx = run_workflow(getattr(workflow, "update_migration_status"))
7069
assert "TRUNCATE TABLE hive_metastore.ucx.migration_status" in ctx.sql_backend.queries
7170
assert "SHOW DATABASES" in ctx.sql_backend.queries
72-
# No "SHOW TABLE FROM" query as table are not mocked

0 commit comments

Comments
 (0)