Skip to content

Commit 74245ae

Browse files
authored
Handle Databricks errors during workspace listings in the table migration status refresher (#3378)
## Changes Handle Databricks errors during workspace listings in the table migration status refresher ### Linked issues Resolves #3262 ### Functionality - [x] modified existing workflow: `assessment` ### Tests - [x] added unit tests
1 parent b422e78 commit 74245ae

File tree

3 files changed

+87
-7
lines changed

3 files changed

+87
-7
lines changed

src/databricks/labs/ucx/hive_metastore/table_migration_status.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66

77
from databricks.labs.lsql.backends import SqlBackend
88
from databricks.sdk import WorkspaceClient
9-
from databricks.sdk.errors import NotFound
9+
from databricks.sdk.errors import DatabricksError, NotFound
10+
from databricks.sdk.service.catalog import CatalogInfo
1011

1112
from databricks.labs.ucx.framework.crawlers import CrawlerBase
1213
from databricks.labs.ucx.framework.utils import escape_sql_identifier
13-
from databricks.labs.ucx.hive_metastore import TablesCrawler
14+
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
1415

1516
logger = logging.getLogger(__name__)
1617

@@ -93,9 +94,10 @@ def get_seen_tables(self) -> dict[str, str]:
9394
# ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
9495
tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name))
9596
except NotFound:
96-
logger.warning(
97-
f"Schema {schema.catalog_name}.{schema.name} no longer exists. Skipping checking its migration status."
98-
)
97+
logger.warning(f"Schema {schema.full_name} no longer exists. Skipping checking its migration status.")
98+
continue
99+
except DatabricksError as e:
100+
logger.warning(f"Error while listing tables in schema: {schema.full_name}", exc_info=e)
99101
continue
100102
for table in tables:
101103
if not table.properties:
@@ -153,10 +155,19 @@ def _try_fetch(self) -> Iterable[TableMigrationStatus]:
153155
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
154156
yield TableMigrationStatus(*row)
155157

158+
def _iter_catalogs(self) -> Iterable[CatalogInfo]:
159+
try:
160+
yield from self._ws.catalogs.list()
161+
except DatabricksError as e:
162+
logger.error("Cannot list catalogs", exc_info=e)
163+
156164
def _iter_schemas(self):
157-
for catalog in self._ws.catalogs.list():
165+
for catalog in self._iter_catalogs():
158166
try:
159167
yield from self._ws.schemas.list(catalog_name=catalog.name)
160168
except NotFound:
161169
logger.warning(f"Catalog {catalog.name} no longer exists. Skipping checking its migration status.")
162170
continue
171+
except DatabricksError as e:
172+
logger.warning(f"Error while listing schemas in catalog: {catalog.name}", exc_info=e)
173+
continue

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,10 @@ def test_table_status_seen_tables(caplog):
10441044
client = create_autospec(WorkspaceClient)
10451045
client.catalogs.list.return_value = [CatalogInfo(name="cat1"), CatalogInfo(name="deleted_cat")]
10461046
client.schemas.list.side_effect = [
1047-
[SchemaInfo(catalog_name="cat1", name="schema1"), SchemaInfo(catalog_name="cat1", name="deleted_schema")],
1047+
[
1048+
SchemaInfo(catalog_name="cat1", name="schema1", full_name="cat1.schema1"),
1049+
SchemaInfo(catalog_name="cat1", name="deleted_schema", full_name="cat1.deleted_schema"),
1050+
],
10481051
NotFound(),
10491052
]
10501053
client.tables.list.side_effect = [
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from unittest.mock import create_autospec
2+
3+
import pytest
4+
from databricks.sdk import WorkspaceClient
5+
from databricks.sdk.errors import BadRequest, DatabricksError, NotFound
6+
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo
7+
8+
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
9+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
10+
11+
12+
def test_table_migration_status_refresher_get_seen_tables_handles_errors_on_catalogs_list(mock_backend) -> None:
13+
ws = create_autospec(WorkspaceClient)
14+
ws.catalogs.list.side_effect = BadRequest()
15+
tables_crawler = create_autospec(TablesCrawler)
16+
17+
refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler)
18+
19+
seen_tables = refresher.get_seen_tables()
20+
21+
assert not seen_tables
22+
ws.catalogs.list.assert_called_once()
23+
ws.schemas.list.assert_not_called()
24+
ws.tables.list.assert_not_called()
25+
tables_crawler.snapshot.assert_not_called()
26+
27+
28+
@pytest.mark.parametrize("error", [BadRequest(), NotFound()])
29+
def test_table_migration_status_refresher_get_seen_tables_handles_errors_on_schemas_list(
30+
mock_backend, error: DatabricksError
31+
) -> None:
32+
ws = create_autospec(WorkspaceClient)
33+
ws.catalogs.list.return_value = [CatalogInfo(name="test")]
34+
ws.schemas.list.side_effect = error
35+
tables_crawler = create_autospec(TablesCrawler)
36+
37+
refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler)
38+
39+
seen_tables = refresher.get_seen_tables()
40+
41+
assert not seen_tables
42+
ws.catalogs.list.assert_called_once()
43+
ws.schemas.list.assert_called_once()
44+
ws.tables.list.assert_not_called()
45+
tables_crawler.snapshot.assert_not_called()
46+
47+
48+
@pytest.mark.parametrize("error", [BadRequest(), NotFound()])
49+
def test_table_migration_status_refresher_get_seen_tables_handles_errors_on_tables_list(
50+
mock_backend, error: DatabricksError
51+
) -> None:
52+
ws = create_autospec(WorkspaceClient)
53+
ws.catalogs.list.return_value = [CatalogInfo(name="test")]
54+
ws.schemas.list.return_value = [SchemaInfo(catalog_name="test", name="test")]
55+
ws.tables.list.side_effect = error
56+
tables_crawler = create_autospec(TablesCrawler)
57+
58+
refresher = TableMigrationStatusRefresher(ws, mock_backend, "test", tables_crawler)
59+
60+
seen_tables = refresher.get_seen_tables()
61+
62+
assert not seen_tables
63+
ws.catalogs.list.assert_called_once()
64+
ws.schemas.list.assert_called_once()
65+
ws.tables.list.assert_called_once()
66+
tables_crawler.snapshot.assert_not_called()

0 commit comments

Comments
 (0)