Skip to content

Commit 06a3539

Browse files
authored
Added databricks labs ucx skip --schema ... --table ... command to mark table/schema for skipping in the table migration process (#680)
closes #672
1 parent 46fad99 commit 06a3539

File tree

5 files changed

+132
-2
lines changed

5 files changed

+132
-2
lines changed

labs.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ commands:
2626
{{range .}}{{.user_name}}\t{{.database}}\t{{.warehouse_id}}
2727
{{end}}
2828
29+
- name: skip
30+
description: Create a skip comment on a schema or a table
31+
flags:
32+
- name: schema
33+
description: Schema Name to Skip
34+
- name: table
35+
description: Table Name to Skip
36+
2937
- name: sync-workspace-info
3038
is_account_level: true
3139
description: upload workspace config to all workspaces in the account where ucx is installed

src/databricks/labs/ucx/cli.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import webbrowser
55

66
from databricks.sdk import WorkspaceClient
7+
from databricks.sdk.errors import NotFound
78

89
from databricks.labs.ucx.account import AccountWorkspaces, WorkspaceInfo
910
from databricks.labs.ucx.config import AccountConfig, ConnectConfig
@@ -40,6 +41,31 @@ def list_installations():
4041
print(json.dumps(all_users))
4142

4243

44+
def skip(schema: str, table: str | None = None):
    """Mark a schema, or one table inside it, to be skipped by the table migration.

    Looks up the warehouse id from the current UCX installation config, builds a
    statement-execution backend on it, and delegates to ``TableMapping.skip_table``
    when ``table`` is given or ``TableMapping.skip_schema`` otherwise.

    Args:
        schema: name of the schema to mark; required. A falsy value logs an
            error and returns without touching the workspace.
        table: optional table name; when omitted the whole schema is marked.

    Returns:
        None in every case; failures are reported through the logger.
    """
    logger.info("Running skip command")
    if not schema:
        # The CLI flag is declared in lower case, so report it the way the user types it.
        logger.error("--schema is a required parameter.")
        return None
    ws = WorkspaceClient()
    installation_manager = WorkspaceInstaller(ws)
    logger.info("Fetching installation config.")
    try:
        warehouse_id = installation_manager._current_config.warehouse_id
        sql_backend = StatementExecutionBackend(ws, warehouse_id)
    except NotFound:
        # NotFound while reading the config is reported as a missing UCX installation.
        logger.error(
            "Couldn't find UCX configuration in the user's home folder. "
            "Make sure the current user has configured and installed UCX."
        )
        return None

    mapping = TableMapping(ws)
    if table:
        mapping.skip_table(sql_backend, schema, table)
    else:
        mapping.skip_schema(sql_backend, schema)
67+
68+
4369
def sync_workspace_info():
4470
workspaces = AccountWorkspaces(AccountConfig(connect=ConnectConfig()))
4571
workspaces.sync_workspace_info()
@@ -71,6 +97,7 @@ def create_table_mapping():
7197
"sync-workspace-info": sync_workspace_info,
7298
"manual-workspace-info": manual_workspace_info,
7399
"create-table-mapping": create_table_mapping,
100+
"skip": skip,
74101
}
75102

76103

@@ -86,7 +113,6 @@ def main(raw):
86113
log_level = "info"
87114
databricks_logger = logging.getLogger("databricks")
88115
databricks_logger.setLevel(log_level.upper())
89-
90116
kwargs = {k.replace("-", "_"): v for k, v in flags.items()}
91117
MAPPING[command](**kwargs)
92118

src/databricks/labs/ucx/hive_metastore/mapping.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
import csv
22
import dataclasses
33
import io
4+
import logging
45
import re
56
from dataclasses import dataclass
67

78
from databricks.sdk import WorkspaceClient
8-
from databricks.sdk.errors import NotFound
9+
from databricks.sdk.errors import BadRequest, NotFound
910
from databricks.sdk.service.workspace import ImportFormat
1011

1112
from databricks.labs.ucx.account import WorkspaceInfo
13+
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
1214
from databricks.labs.ucx.hive_metastore import TablesCrawler
1315
from databricks.labs.ucx.hive_metastore.tables import Table
1416

17+
logger = logging.getLogger(__name__)
18+
1519

1620
@dataclass
1721
class Rule:
@@ -35,6 +39,8 @@ def initial(cls, workspace_name: str, catalog_name: str, table: Table) -> "Rule"
3539

3640

3741
class TableMapping:
42+
UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip"
43+
3844
def __init__(self, ws: WorkspaceClient, folder: str | None = None):
3945
if not folder:
4046
folder = f"/Users/{ws.current_user.me().user_name}/.ucx"
@@ -76,3 +82,27 @@ def load(self) -> list[Rule]:
7682
except NotFound:
7783
msg = "Please run: databricks labs ucx table-mapping"
7884
raise ValueError(msg) from None
85+
86+
def skip_table(self, backend: StatementExecutionBackend, schema: str, table: str):
    """Mark a table to be skipped in the migration process by setting a table property.

    Executes ``ALTER TABLE ... SET TBLPROPERTIES`` through ``backend`` with the
    ``UCX_SKIP_PROPERTY`` key set to ``true``. ``NotFound`` and ``BadRequest``
    raised by the backend are logged as errors and are not re-raised.

    Args:
        backend: SQL backend used to execute the statement.
        schema: name of the schema that holds the table.
        table: name of the table to mark.
    """
    # Double embedded backticks so a name cannot close the quoted identifier early.
    quoted_schema = schema.replace("`", "``")
    quoted_table = table.replace("`", "``")
    try:
        backend.execute(
            f"ALTER TABLE `{quoted_schema}`.`{quoted_table}` SET TBLPROPERTIES('{self.UCX_SKIP_PROPERTY}' = true)"
        )
    except NotFound as nf:
        if "[TABLE_OR_VIEW_NOT_FOUND]" in str(nf):
            logger.error(f"Failed to apply skip marker for Table {schema}.{table}. Table not found.")
        else:
            logger.error(nf)
    except BadRequest as br:
        logger.error(br)
97+
98+
def skip_schema(self, backend: StatementExecutionBackend, schema: str):
99+
# Marks a schema to be skipped in the migration process by applying a table property
100+
try:
101+
backend.execute(f"ALTER SCHEMA `{schema}` SET DBPROPERTIES('{self.UCX_SKIP_PROPERTY}' = true)")
102+
except NotFound as nf:
103+
if "[SCHEMA_NOT_FOUND]" in str(nf):
104+
logger.error(f"Failed to apply skip marker for Schema {schema}. Schema not found.")
105+
else:
106+
logger.error(nf)
107+
except BadRequest as br:
108+
logger.error(br)

tests/unit/hive_metastore/test_mapping.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,35 @@ def test_load_mapping():
9999
dst_table="bar",
100100
)
101101
] == rules
102+
103+
104+
def test_skip_happy_path(mocker, caplog):
    """Both skip helpers send the expected ALTER statement and log nothing."""
    workspace = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    backend = mocker.patch("databricks.labs.ucx.framework.crawlers.StatementExecutionBackend.__init__")
    table_mapping = TableMapping(workspace)

    # Table-level marker.
    table_mapping.skip_table(backend, schema="schema", table="table")
    expected_table_sql = (
        f"ALTER TABLE `schema`.`table` SET TBLPROPERTIES('{table_mapping.UCX_SKIP_PROPERTY}' = true)"
    )
    backend.execute.assert_called_with(expected_table_sql)
    assert not caplog.records

    # Schema-level marker.
    table_mapping.skip_schema(backend, schema="schema")
    expected_schema_sql = f"ALTER SCHEMA `schema` SET DBPROPERTIES('{table_mapping.UCX_SKIP_PROPERTY}' = true)"
    backend.execute.assert_called_with(expected_schema_sql)
    assert not caplog.records
116+
117+
118+
def test_skip_missing_schema(mocker, caplog):
    """A SCHEMA_NOT_FOUND error from the backend is logged as 'schema not found'."""
    workspace = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    backend = mocker.patch("databricks.labs.ucx.framework.crawlers.StatementExecutionBackend.__init__")
    backend.execute.side_effect = NotFound("[SCHEMA_NOT_FOUND]")

    TableMapping(workspace).skip_schema(backend, schema="schema")

    assert any("schema not found" in record.message.lower() for record in caplog.records)
125+
126+
127+
def test_skip_missing_table(mocker, caplog):
    """A TABLE_OR_VIEW_NOT_FOUND error from the backend is logged as 'table not found'."""
    workspace = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    backend = mocker.patch("databricks.labs.ucx.framework.crawlers.StatementExecutionBackend.__init__")
    backend.execute.side_effect = NotFound("[TABLE_OR_VIEW_NOT_FOUND]")

    TableMapping(workspace).skip_table(backend, schema="schema", table="table")

    assert any("table not found" in record.message.lower() for record in caplog.records)

tests/unit/test_cli.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import pytest
2+
from databricks.sdk.errors import NotFound
3+
from databricks.sdk.service import iam
4+
from databricks.sdk.service.iam import User
5+
6+
from databricks.labs.ucx.cli import skip
7+
8+
9+
@pytest.fixture
def ws(mocker):
    """Provide the patched ``WorkspaceClient.__init__`` mock, preconfigured with a current user."""
    ws = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    # NOTE(review): the user_name literal looks like a redaction placeholder — confirm the intended value.
    ws.current_user.me = lambda: iam.User(user_name="[email protected]", groups=[iam.ComplexValue(display="admins")])
    # A patched __init__ has to return None, otherwise WorkspaceClient() raises TypeError.
    ws.return_value = None
    # Hand the mock to the requesting test; without a return the fixture value would be None.
    return ws
14+
15+
16+
def test_skip_no_schema(mocker, caplog):
    """Calling skip without a schema logs a message that mentions the schema."""
    client_init = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    client_init.users.list.return_value = [User(user_name="foo")]
    client_init.workspace.download.side_effect = NotFound(...)

    skip(schema=None, table="table")

    assert any("schema" in record.message.lower() for record in caplog.records)
22+
23+
24+
def test_skip_no_ucx(caplog, mocker):
    """When building the SQL backend raises NotFound, skip logs the missing-configuration error."""
    backend_init = "databricks.labs.ucx.framework.crawlers.StatementExecutionBackend.__init__"
    mocker.patch("databricks.sdk.WorkspaceClient.__init__", return_value=None)
    mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller.__init__", return_value=None)
    mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller._current_config", return_value="foo")
    # The backend constructor is the call that fails in this scenario.
    mocker.patch(backend_init, return_value=None, side_effect=NotFound("..."))

    skip(schema="schema", table="table")

    assert any("UCX configuration" in record.message for record in caplog.records)

0 commit comments

Comments
 (0)