Skip to content

Commit b249e86

Browse files
authored
Extend error handling of delta issues in crawlers and hive metastore (#795)
1 parent d1439dc commit b249e86

File tree

7 files changed

+101
-16
lines changed

7 files changed

+101
-16
lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88
from typing import Any, ClassVar, Generic, Protocol, TypeVar
99

1010
from databricks.sdk import WorkspaceClient
11-
from databricks.sdk.errors import BadRequest, NotFound, PermissionDenied, Unknown
11+
from databricks.sdk.errors import (
12+
BadRequest,
13+
DataLoss,
14+
NotFound,
15+
PermissionDenied,
16+
Unknown,
17+
)
1218

1319
from databricks.labs.ucx.mixins.sql import Row, StatementExecutionExt
1420

@@ -183,6 +189,10 @@ def _raise_spark_sql_exceptions(error_message: str):
183189
raise NotFound(error_message) from None
184190
elif "TABLE_OR_VIEW_NOT_FOUND" in error_message:
185191
raise NotFound(error_message) from None
192+
elif "DELTA_TABLE_NOT_FOUND" in error_message:
193+
raise NotFound(error_message) from None
194+
elif "DELTA_MISSING_TRANSACTION_LOG" in error_message:
195+
raise DataLoss(error_message) from None
186196
elif "PARSE_SYNTAX_ERROR" in error_message:
187197
raise BadRequest(error_message) from None
188198
elif "Operation not allowed" in error_message:

src/databricks/labs/ucx/hive_metastore/mapping.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,12 @@ def skip_table(self, schema: str, table: str):
102102
f"ALTER TABLE `{schema}`.`{table}` SET TBLPROPERTIES('{self.UCX_SKIP_PROPERTY}' = true)"
103103
)
104104
except NotFound as nf:
105-
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(nf):
105+
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(nf) or "[DELTA_TABLE_NOT_FOUND]" in str(nf):
106106
logger.error(f"Failed to apply skip marker for Table {schema}.{table}. Table not found.")
107107
else:
108-
logger.error(nf)
108+
logger.error(f"Failed to apply skip marker for Table {schema}.{table}: {nf!s}", exc_info=True)
109109
except BadRequest as br:
110-
logger.error(br)
110+
logger.error(f"Failed to apply skip marker for Table {schema}.{table}: {br!s}", exc_info=True)
111111

112112
def skip_schema(self, schema: str):
113113
# Marks a schema to be skipped in the migration process by applying a table property

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,10 +279,10 @@ def _move_table(
279279

280280
return True
281281
except NotFound as err:
282-
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
282+
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err) or "[DELTA_TABLE_NOT_FOUND]" in str(err):
283283
logger.error(f"Could not find table {from_table_name}. Table not found.")
284284
else:
285-
logger.error(err)
285+
logger.error(f"Failed to move table {from_table_name}: {err!s}", exc_info=True)
286286
return False
287287

288288
def _move_view(
@@ -320,5 +320,5 @@ def _move_view(
320320
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
321321
logger.error(f"Could not find view {from_view_name}. View not found.")
322322
else:
323-
logger.error(err)
323+
logger.error(f"Failed to move view {from_view_name}: {err!s}", exc_info=True)
324324
return False

src/databricks/labs/ucx/hive_metastore/table_size.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,13 @@ def _safe_get_table_size(self, table_full_name: str) -> int | None:
7070
try:
7171
return self._spark._jsparkSession.table(table_full_name).queryExecution().analyzed().stats().sizeInBytes()
7272
except Exception as e:
73-
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(e):
73+
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(e) or "[DELTA_TABLE_NOT_FOUND]" in str(e):
7474
logger.warning(f"Failed to evaluate {table_full_name} table size. Table not found.")
7575
return None
7676
if "[DELTA_MISSING_TRANSACTION_LOG]" in str(e):
7777
logger.warning(f"Delta table {table_full_name} is corrupted: missing transaction log.")
7878
return None
79-
raise RuntimeError(str(e)) from e
79+
except: # noqa: E722
80+
logger.error(f"Failed to evaluate {table_full_name} table size: ", exc_info=True)
81+
82+
return None

src/databricks/labs/ucx/mixins/sql.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from typing import Any
88

99
from databricks.sdk import errors
10-
from databricks.sdk.errors import NotFound
10+
from databricks.sdk.errors import DataLoss, NotFound
1111
from databricks.sdk.service.sql import (
1212
ColumnInfoTypeName,
1313
Disposition,
@@ -106,6 +106,10 @@ def _raise_if_needed(status: StatementStatus):
106106
raise NotFound(error_message)
107107
if "TABLE_OR_VIEW_NOT_FOUND" in error_message:
108108
raise NotFound(error_message)
109+
if "DELTA_TABLE_NOT_FOUND" in error_message:
110+
raise NotFound(error_message)
111+
if "DELTA_MISSING_TRANSACTION_LOG" in error_message:
112+
raise DataLoss(error_message)
109113
mapping = {
110114
ServiceErrorCode.ABORTED: errors.Aborted,
111115
ServiceErrorCode.ALREADY_EXISTS: errors.AlreadyExists,

tests/unit/framework/test_crawlers.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
from unittest import mock
55

66
import pytest
7-
from databricks.sdk.errors import BadRequest, NotFound, PermissionDenied, Unknown
7+
from databricks.sdk.errors import (
8+
BadRequest,
9+
DataLoss,
10+
NotFound,
11+
PermissionDenied,
12+
Unknown,
13+
)
814
from databricks.sdk.service import sql
915

1016
from databricks.labs.ucx.framework.crawlers import (
@@ -260,6 +266,14 @@ def test_raise_spark_sql_exceptions(mocker):
260266
with pytest.raises(NotFound):
261267
rb._raise_spark_sql_exceptions(error_message_invalid_table)
262268

269+
error_message_invalid_table = "DELTA_TABLE_NOT_FOUND foo table does not exist"
270+
with pytest.raises(NotFound):
271+
rb._raise_spark_sql_exceptions(error_message_invalid_table)
272+
273+
error_message_invalid_table = "DELTA_MISSING_TRANSACTION_LOG foo table does not exist"
274+
with pytest.raises(DataLoss):
275+
rb._raise_spark_sql_exceptions(error_message_invalid_table)
276+
263277
error_message_invalid_syntax = "PARSE_SYNTAX_ERROR foo"
264278
with pytest.raises(BadRequest):
265279
rb._raise_spark_sql_exceptions(error_message_invalid_syntax)
@@ -293,6 +307,25 @@ def test_execute(mocker):
293307
with pytest.raises(NotFound):
294308
rb.execute(sql_query)
295309

310+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
311+
"DELTA_TABLE_NOT_FOUND"
312+
)
313+
with pytest.raises(NotFound):
314+
rb.execute(sql_query)
315+
316+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
317+
"Unexpected exception"
318+
)
319+
320+
with pytest.raises(BaseException): # noqa: B017
321+
rb.execute(sql_query)
322+
323+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
324+
"DELTA_MISSING_TRANSACTION_LOG"
325+
)
326+
with pytest.raises(DataLoss):
327+
rb.execute(sql_query)
328+
296329
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
297330
"PARSE_SYNTAX_ERROR"
298331
)
@@ -332,6 +365,18 @@ def test_fetch(mocker):
332365
with pytest.raises(NotFound):
333366
rb.fetch(sql_query)
334367

368+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
369+
"DELTA_TABLE_NOT_FOUND"
370+
)
371+
with pytest.raises(NotFound):
372+
rb.fetch(sql_query)
373+
374+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
375+
"DELTA_MISSING_TRANSACTION_LOG"
376+
)
377+
with pytest.raises(DataLoss):
378+
rb.fetch(sql_query)
379+
335380
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
336381
"PARSE_SYNTAX_ERROR"
337382
)

tests/unit/hive_metastore/test_table_size.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import sys
22

3-
import pytest
4-
53
from databricks.labs.ucx.hive_metastore.table_size import TableSize, TableSizeCrawler
64
from tests.unit.framework.mocks import MockBackend
75

@@ -38,7 +36,7 @@ def test_table_size_crawler(mocker):
3836
assert TableSize("hive_metastore", "db1", "table2", 200) in results
3937

4038

41-
def test_table_size_table_not_found_unknown_message(mocker):
39+
def test_table_size_unknown_error(mocker):
4240
errors = {}
4341
rows = {
4442
"table_size": [],
@@ -53,8 +51,9 @@ def test_table_size_table_not_found_unknown_message(mocker):
5351
tsc = TableSizeCrawler(backend, "inventory_database")
5452
tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception(...)
5553

56-
with pytest.raises(RuntimeError):
57-
tsc.snapshot()
54+
results = tsc.snapshot()
55+
56+
assert len(results) == 0
5857

5958

6059
def test_table_size_table_or_view_not_found(mocker):
@@ -81,6 +80,30 @@ def test_table_size_table_or_view_not_found(mocker):
8180
assert len(results) == 0
8281

8382

83+
def test_table_size_delta_table_not_found(mocker):
84+
errors = {}
85+
rows = {
86+
"table_size": [],
87+
"hive_metastore.inventory_database.tables": [
88+
("hive_metastore", "db1", "table1", "MANAGED", "DELTA", "dbfs:/location/table", None),
89+
],
90+
"SHOW DATABASES": [("db1",)],
91+
}
92+
backend = MockBackend(fails_on_first=errors, rows=rows)
93+
pyspark_sql_session = mocker.Mock()
94+
sys.modules["pyspark.sql.session"] = pyspark_sql_session
95+
tsc = TableSizeCrawler(backend, "inventory_database")
96+
97+
# table removed after crawling
98+
tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception(
99+
"[DELTA_TABLE_NOT_FOUND]"
100+
)
101+
102+
results = tsc.snapshot()
103+
104+
assert len(results) == 0
105+
106+
84107
def test_table_size_when_table_corrupted(mocker):
85108
errors = {}
86109
rows = {

0 commit comments

Comments
 (0)