Skip to content

Commit 2532625

Browse files
authored
Improved error handling for all queries executed during table migration (#1679)
## Changes Because multiple tables and views are migrated in parallel, an error in any single SQL execution should not fail the whole workflow. Added error handling around all `sql_backend.execute` commands. ### Linked issues Resolves #1676 ### Tests - [x] manually tested - [x] added unit tests - [x] verified on staging environment (screenshot attached)
1 parent 636b903 commit 2532625

File tree

2 files changed

+146
-24
lines changed

2 files changed

+146
-24
lines changed

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
ViewToMigrate,
3030
)
3131
from databricks.labs.ucx.workspace_access.groups import GroupManager, MigratedGroup
32-
from databricks.sdk.errors.platform import BadRequest, NotFound
32+
from databricks.sdk.errors.platform import DatabricksError
3333

3434
logger = logging.getLogger(__name__)
3535

@@ -210,9 +210,15 @@ def _view_can_be_migrated(self, view: ViewToMigrate):
210210
def _migrate_view_table(self, src_view: ViewToMigrate, grants: list[Grant] | None = None):
211211
view_migrate_sql = self._sql_migrate_view(src_view)
212212
logger.debug(f"Migrating view {src_view.src.key} to using SQL query: {view_migrate_sql}")
213-
self._backend.execute(view_migrate_sql)
214-
self._backend.execute(src_view.src.sql_alter_to(src_view.rule.as_uc_table_key))
215-
self._backend.execute(src_view.src.sql_alter_from(src_view.rule.as_uc_table_key, self._ws.get_workspace_id()))
213+
try:
214+
self._backend.execute(view_migrate_sql)
215+
self._backend.execute(src_view.src.sql_alter_to(src_view.rule.as_uc_table_key))
216+
self._backend.execute(
217+
src_view.src.sql_alter_from(src_view.rule.as_uc_table_key, self._ws.get_workspace_id())
218+
)
219+
except DatabricksError as e:
220+
logger.warning(f"Failed to migrate view {src_view.src.key} to {src_view.rule.as_uc_table_key}: {e}")
221+
return False
216222
return self._migrate_acl(src_view.src, src_view.rule, grants)
217223

218224
def _sql_migrate_view(self, src_view: ViewToMigrate) -> str:
@@ -231,7 +237,8 @@ def _migrate_external_table(self, src_table: Table, rule: Rule, grants: list[Gra
231237
sync_result = next(iter(self._backend.fetch(table_migrate_sql)))
232238
if sync_result.status_code != "SUCCESS":
233239
logger.warning(
234-
f"SYNC command failed to migrate {src_table.key} to {target_table_key}. Status code: {sync_result.status_code}. Description: {sync_result.description}"
240+
f"SYNC command failed to migrate table {src_table.key} to {target_table_key}. "
241+
f"Status code: {sync_result.status_code}. Description: {sync_result.description}"
235242
)
236243
return False
237244
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
@@ -271,9 +278,13 @@ def _migrate_external_table_hiveserde_in_place(
271278
logger.debug(
272279
f"Migrating external table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}"
273280
)
274-
self._backend.execute(table_migrate_sql)
275-
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
276-
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
281+
try:
282+
self._backend.execute(table_migrate_sql)
283+
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
284+
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
285+
except DatabricksError as e:
286+
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
287+
return False
277288
return self._migrate_acl(src_table, rule, grants)
278289

279290
def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
@@ -282,9 +293,13 @@ def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Gr
282293
logger.debug(
283294
f"Migrating managed table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}"
284295
)
285-
self._backend.execute(table_migrate_sql)
286-
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
287-
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
296+
try:
297+
self._backend.execute(table_migrate_sql)
298+
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
299+
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
300+
except DatabricksError as e:
301+
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
302+
return False
288303
return self._migrate_acl(src_table, rule, grants)
289304

290305
def _migrate_table_create_ctas(self, src_table: Table, rule: Rule, grants: list[Grant], mounts: list[Mount]):
@@ -299,9 +314,13 @@ def _migrate_table_create_ctas(self, src_table: Table, rule: Rule, grants: list[
299314
dst_table_location = ExternalLocations.resolve_mount(src_table.location, mounts) + "_ctas_migrated"
300315
table_migrate_sql = src_table.sql_migrate_ctas_external(rule.as_uc_table_key, dst_table_location)
301316
logger.debug(f"Migrating table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}")
302-
self._backend.execute(table_migrate_sql)
303-
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
304-
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
317+
try:
318+
self._backend.execute(table_migrate_sql)
319+
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
320+
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
321+
except DatabricksError as e:
322+
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
323+
return False
305324
return self._migrate_acl(src_table, rule, grants)
306325

307326
def _migrate_acl(self, src: Table, rule: Rule, grants: list[Grant] | None):
@@ -315,7 +334,7 @@ def _migrate_acl(self, src: Table, rule: Rule, grants: list[Grant] | None):
315334
logger.debug(f"Migrating acls on {rule.as_uc_table_key} using SQL query: {acl_migrate_sql}")
316335
try:
317336
self._backend.execute(acl_migrate_sql)
318-
except (BadRequest, NotFound) as e:
337+
except DatabricksError as e:
319338
logger.warning(f"Failed to migrate ACL for {src.key} to {rule.as_uc_table_key}: {e}")
320339
return True
321340

@@ -360,8 +379,11 @@ def _revert_migrated_table(self, table: Table, target_table_key: str):
360379
logger.info(
361380
f"Reverting {table.object_type} table {table.database}.{table.name} upgraded_to {table.upgraded_to}"
362381
)
363-
self._backend.execute(table.sql_unset_upgraded_to())
364-
self._backend.execute(f"DROP {table.kind} IF EXISTS {target_table_key}")
382+
try:
383+
self._backend.execute(table.sql_unset_upgraded_to())
384+
self._backend.execute(f"DROP {table.kind} IF EXISTS {target_table_key}")
385+
except DatabricksError as e:
386+
logger.warning(f"Failed to revert table {table.key}: {e}")
365387

366388
def _get_revert_count(self, schema: str | None = None, table: str | None = None) -> list[MigrationCount]:
367389
self._init_seen_tables()

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 107 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141

4242
@pytest.fixture
43-
def ws() -> WorkspaceClient:
43+
def ws():
4444
client = create_autospec(WorkspaceClient)
4545
client.get_workspace_id.return_value = "12345"
4646
return client
@@ -230,7 +230,7 @@ def test_migrate_external_table_failed_sync(ws, caplog):
230230

231231

232232
@pytest.mark.parametrize(
233-
'hiveserde_in_place_migrate, describe, ddl, migrated, expected_value',
233+
'hiveserde_in_place_migrate, describe, ddl, errors, migrated, expected_value',
234234
[
235235
# test migrate parquet hiveserde table in place
236236
(
@@ -245,6 +245,7 @@ def test_migrate_external_table_failed_sync(ws, caplog):
245245
"CREATE TABLE hive_metastore.schema.test_parquet (id INT) USING PARQUET LOCATION 'dbfs:/mnt/test/table1'"
246246
),
247247
],
248+
{},
248249
True,
249250
"CREATE TABLE ucx_default.db1_dst.external_dst (id INT) USING PARQUET LOCATION 's3://test/folder/table1'",
250251
),
@@ -253,6 +254,7 @@ def test_migrate_external_table_failed_sync(ws, caplog):
253254
True,
254255
MockBackend.rows("col_name", "data_type", "comment")[("dummy", "dummy", None)],
255256
MockBackend.rows("createtab_stmt")[("dummy"),],
257+
{},
256258
False,
257259
"hive_metastore.db1_src.external_src table can only be migrated using CTAS.",
258260
),
@@ -265,6 +267,7 @@ def test_migrate_external_table_failed_sync(ws, caplog):
265267
("OutputFormat", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", None),
266268
],
267269
MockBackend.rows("createtab_stmt")[("dummy"),],
270+
{},
268271
False,
269272
"Failed to generate in-place migration DDL for hive_metastore.db1_src.external_src, skip the in-place migration. It can be migrated in CTAS workflow",
270273
),
@@ -273,20 +276,39 @@ def test_migrate_external_table_failed_sync(ws, caplog):
273276
False,
274277
None,
275278
None,
279+
{},
276280
False,
277281
"",
278282
),
283+
# test failed migration
284+
(
285+
True,
286+
MockBackend.rows("col_name", "data_type", "comment")[
287+
("Serde Library", "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", None),
288+
("InputFormat", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", None),
289+
("OutputFormat", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", None),
290+
],
291+
MockBackend.rows("createtab_stmt")[
292+
(
293+
"CREATE TABLE hive_metastore.schema.test_parquet (id INT) USING PARQUET LOCATION 'dbfs:/mnt/test/table1'"
294+
),
295+
],
296+
{"CREATE TABLE ucx": "error"},
297+
False,
298+
"Failed to migrate table hive_metastore.db1_src.external_src to ucx_default.db1_dst.external_dst: error",
299+
),
279300
],
280301
)
281302
def test_migrate_external_hiveserde_table_in_place(
282-
ws, caplog, hiveserde_in_place_migrate, describe, ddl, migrated, expected_value
303+
ws, caplog, hiveserde_in_place_migrate, describe, ddl, errors, migrated, expected_value
283304
):
284305
caplog.set_level(logging.INFO)
285306
backend = MockBackend(
286307
rows={
287308
"DESCRIBE TABLE EXTENDED *": describe,
288309
"SHOW CREATE TABLE *": ddl,
289-
}
310+
},
311+
fails_on_first=errors,
290312
)
291313
table_crawler = TablesCrawler(backend, "inventory_database")
292314
udf_crawler = UdfsCrawler(backend, "inventory_database")
@@ -632,7 +654,7 @@ def get_table_migrator(backend: SqlBackend) -> TablesMigrator:
632654
return table_migrate
633655

634656

635-
def test_revert_migrated_tables_skip_managed(ws):
657+
def test_revert_migrated_tables_skip_managed():
636658
errors = {}
637659
rows = {}
638660
backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -651,7 +673,7 @@ def test_revert_migrated_tables_skip_managed(ws):
651673
assert "DROP VIEW IF EXISTS cat1.schema1.dest_view1" in revert_queries
652674

653675

654-
def test_revert_migrated_tables_including_managed(ws):
676+
def test_revert_migrated_tables_including_managed():
655677
errors = {}
656678
rows = {}
657679
backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -682,7 +704,6 @@ def test_no_migrated_tables(ws):
682704
backend = MockBackend(fails_on_first=errors, rows=rows)
683705
table_crawler = create_autospec(TablesCrawler)
684706
grant_crawler = create_autospec(GrantsCrawler)
685-
ws = create_autospec(WorkspaceClient)
686707
table_mapping = create_autospec(TableMapping)
687708
table_mapping.load.return_value = [
688709
Rule("workspace", "catalog_1", "db1", "db1", "managed", "managed"),
@@ -1144,3 +1165,82 @@ def test_migrate_views_should_be_properly_sequenced(ws):
11441165
assert table_keys.index("hive_metastore.db1_src.v1_src") > table_keys.index("hive_metastore.db1_src.v3_src")
11451166
assert table_keys.index("hive_metastore.db1_src.v3_src") > table_keys.index("hive_metastore.db1_src.v2_src")
11461167
assert next((key for key in table_keys if key == "hive_metastore.db1_src.t1_src"), None) is None
1168+
1169+
1170+
def test_migrate_view_failed(ws, caplog):
1171+
errors = {"CREATE OR REPLACE VIEW": "error"}
1172+
create = "CREATE OR REPLACE VIEW hive_metastore.db1_src.view_src (a,b) AS SELECT * FROM db1_src.managed_dbfs"
1173+
rows = {"SHOW CREATE TABLE": [{"createtab_stmt": create}]}
1174+
backend = MockBackend(fails_on_first=errors, rows=rows)
1175+
table_crawler = TablesCrawler(backend, "inventory_database")
1176+
udf_crawler = UdfsCrawler(backend, "inventory_database")
1177+
grant_crawler = GrantsCrawler(table_crawler, udf_crawler)
1178+
table_mapping = table_mapping_mock(["managed_dbfs", "view"])
1179+
group_manager = GroupManager(backend, ws, "inventory_database")
1180+
migration_status_refresher = create_autospec(MigrationStatusRefresher)
1181+
migration_status_refresher.get_seen_tables.return_value = {
1182+
"ucx_default.db1_dst.managed_dbfs": "hive_metastore.db1_src.managed_dbfs"
1183+
}
1184+
migration_index = MigrationIndex(
1185+
[
1186+
MigrationStatus("db1_src", "managed_dbfs", "ucx_default", "db1_dst", "new_managed_dbfs"),
1187+
MigrationStatus("db1_src", "view_src", "ucx_default", "db1_dst", "view_dst"),
1188+
]
1189+
)
1190+
migration_status_refresher.index.return_value = migration_index
1191+
principal_grants = create_autospec(PrincipalACL)
1192+
table_migrate = TablesMigrator(
1193+
table_crawler,
1194+
grant_crawler,
1195+
ws,
1196+
backend,
1197+
table_mapping,
1198+
group_manager,
1199+
migration_status_refresher,
1200+
principal_grants,
1201+
)
1202+
table_migrate.migrate_tables(what=What.VIEW)
1203+
1204+
assert (
1205+
"Failed to migrate view hive_metastore.db1_src.view_src to ucx_default.db1_dst.view_dst: error" in caplog.text
1206+
)
1207+
principal_grants.get_interactive_cluster_grants.assert_not_called()
1208+
1209+
1210+
def test_migrate_dbfs_root_tables_failed(ws, caplog):
1211+
errors = {"CREATE TABLE IF NOT EXISTS": "error"}
1212+
backend = MockBackend(fails_on_first=errors, rows={})
1213+
table_crawler = TablesCrawler(backend, "inventory_database")
1214+
udf_crawler = UdfsCrawler(backend, "inventory_database")
1215+
grant_crawler = GrantsCrawler(table_crawler, udf_crawler)
1216+
table_mapping = table_mapping_mock(["managed_dbfs"])
1217+
group_manager = GroupManager(backend, ws, "inventory_database")
1218+
migration_status_refresher = MigrationStatusRefresher(ws, backend, "inventory_database", table_crawler)
1219+
principal_grants = create_autospec(PrincipalACL)
1220+
table_migrate = TablesMigrator(
1221+
table_crawler,
1222+
grant_crawler,
1223+
ws,
1224+
backend,
1225+
table_mapping,
1226+
group_manager,
1227+
migration_status_refresher,
1228+
principal_grants,
1229+
)
1230+
table_migrate.migrate_tables(what=What.DBFS_ROOT_DELTA)
1231+
1232+
principal_grants.get_interactive_cluster_grants.assert_not_called()
1233+
1234+
assert (
1235+
"Failed to migrate table hive_metastore.db1_src.managed_dbfs to ucx_default.db1_dst.managed_dbfs: error"
1236+
in caplog.text
1237+
)
1238+
1239+
1240+
def test_revert_migrated_tables_failed(caplog):
1241+
errors = {"ALTER TABLE": "error"}
1242+
rows = {}
1243+
backend = MockBackend(fails_on_first=errors, rows=rows)
1244+
table_migrate = get_table_migrator(backend)
1245+
table_migrate.revert_migrated_tables(schema="test_schema1")
1246+
assert "Failed to revert table hive_metastore.test_schema1.test_table1: error" in caplog.text

0 commit comments

Comments
 (0)