Skip to content

Commit ff10e67

Browse files
HariGS-DBFastLeemwojtyczkaprajin-29nfx
authored
Added databricks labs ucx move command (#756)
closes #675 --------- Co-authored-by: Liran Bareket <[email protected]> Co-authored-by: Marcin Wojtyczka <[email protected]> Co-authored-by: prajin-29 <[email protected]> Co-authored-by: Serge Smertin <[email protected]>
1 parent e9c7f5d commit ff10e67

File tree

6 files changed

+439
-3
lines changed

6 files changed

+439
-3
lines changed

labs.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,17 @@ commands:
6666
- name: delete_managed
6767
description: Revert and delete managed tables
6868

69+
70+
- name: move
71+
description: move tables across schema/catalog withing a UC metastore
72+
flags:
73+
- name: from-catalog
74+
description: from catalog name
75+
- name: from-schema
76+
description: schema name to migrate.
77+
- name: from-table
78+
description: table names to migrate. enter * to migrate all tables
79+
- name: to-catalog
80+
description: target catalog to migrate schema to
81+
- name: to-schema
82+
description: target schema to migrate tables to

src/databricks/labs/ucx/cli.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
1212
from databricks.labs.ucx.hive_metastore import ExternalLocations, TablesCrawler
1313
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
14-
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
14+
from databricks.labs.ucx.hive_metastore.table_migrate import TableMove, TablesMigrate
1515
from databricks.labs.ucx.install import WorkspaceInstaller
1616
from databricks.labs.ucx.installer import InstallationManager
1717

@@ -165,5 +165,38 @@ def revert_migrated_tables(w: WorkspaceClient, schema: str, table: str, *, delet
165165
tm.revert_migrated_tables(schema, table, delete_managed=delete_managed)
166166

167167

168+
@ucx.command
169+
def move(
170+
w: WorkspaceClient,
171+
from_catalog: str,
172+
from_schema: str,
173+
from_table: str,
174+
to_catalog: str,
175+
to_schema: str,
176+
):
177+
"""move a uc table/tables from one schema to another schema in same or different catalog"""
178+
logger.info("Running move command")
179+
prompts = Prompts()
180+
installation_manager = InstallationManager(w)
181+
installation = installation_manager.for_user(w.current_user.me())
182+
if not installation:
183+
logger.error(CANT_FIND_UCX_MSG)
184+
return
185+
sql_backend = StatementExecutionBackend(w, installation.config.warehouse_id)
186+
tables = TableMove(w, sql_backend)
187+
if from_catalog == "" or to_catalog == "":
188+
logger.error("Please enter from_catalog and to_catalog details")
189+
return
190+
if from_schema == "" or to_schema == "" or from_table == "":
191+
logger.error("Please enter from_schema, to_schema and from_table(enter * for migrating all tables) details.")
192+
return
193+
if from_catalog == to_catalog and from_schema == to_schema:
194+
logger.error("please select a different schema or catalog to migrate to")
195+
return
196+
del_table = prompts.confirm(f"should we delete tables/view after moving to new schema {to_catalog}.{to_schema}")
197+
logger.info(f"migrating tables {from_table} from {from_catalog}.{from_schema} to {to_catalog}.{to_schema}")
198+
tables.move_tables(from_catalog, from_schema, from_table, to_catalog, to_schema, del_table)
199+
200+
168201
if "__main__" == __name__:
169202
ucx()

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from databricks.labs.blueprint.parallel import Threads
66
from databricks.sdk import WorkspaceClient
7+
from databricks.sdk.errors import NotFound
8+
from databricks.sdk.service.catalog import PermissionsChange, SecurableType, TableType
79

810
from databricks.labs.ucx.framework.crawlers import SqlBackend
911
from databricks.labs.ucx.hive_metastore import TablesCrawler
@@ -182,3 +184,135 @@ def print_revert_report(self, *, delete_managed: bool) -> bool | None:
182184
print("Migrated Manged Tables (targets) will be left intact.")
183185
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command.")
184186
return True
187+
188+
189+
class TableMove:
190+
def __init__(self, ws: WorkspaceClient, backend: SqlBackend):
191+
self._backend = backend
192+
self._ws = ws
193+
194+
def move_tables(
195+
self,
196+
from_catalog: str,
197+
from_schema: str,
198+
from_table: str,
199+
to_catalog: str,
200+
to_schema: str,
201+
del_table: bool, # noqa: FBT001
202+
):
203+
try:
204+
self._ws.schemas.get(f"{from_catalog}.{from_schema}")
205+
except NotFound:
206+
logger.error(f"schema {from_schema} not found in catalog {from_catalog}, enter correct schema details.")
207+
return
208+
try:
209+
self._ws.schemas.get(f"{to_catalog}.{to_schema}")
210+
except NotFound:
211+
logger.warning(f"schema {to_schema} not found in {to_catalog}, creating...")
212+
self._ws.schemas.create(to_schema, to_catalog)
213+
214+
tables = self._ws.tables.list(from_catalog, from_schema)
215+
table_tasks = []
216+
view_tasks = []
217+
filtered_tables = [table for table in tables if from_table in [table.name, "*"]]
218+
for table in filtered_tables:
219+
try:
220+
self._ws.tables.get(f"{to_catalog}.{to_schema}.{table.name}")
221+
logger.warning(
222+
f"table {from_table} already present in {from_catalog}.{from_schema}. skipping this table..."
223+
)
224+
continue
225+
except NotFound:
226+
if table.table_type and table.table_type in (TableType.EXTERNAL, TableType.MANAGED):
227+
table_tasks.append(
228+
partial(
229+
self._move_table, from_catalog, from_schema, table.name, to_catalog, to_schema, del_table
230+
)
231+
)
232+
else:
233+
view_tasks.append(
234+
partial(
235+
self._move_view,
236+
from_catalog,
237+
from_schema,
238+
table.name,
239+
to_catalog,
240+
to_schema,
241+
del_table,
242+
table.view_definition,
243+
)
244+
)
245+
Threads.strict("creating tables", table_tasks)
246+
logger.info(f"moved {len(list(table_tasks))} tables to the new schema {to_schema}.")
247+
Threads.strict("creating views", view_tasks)
248+
logger.info(f"moved {len(list(view_tasks))} views to the new schema {to_schema}.")
249+
250+
def _move_table(
251+
self,
252+
from_catalog: str,
253+
from_schema: str,
254+
from_table: str,
255+
to_catalog: str,
256+
to_schema: str,
257+
del_table: bool, # noqa: FBT001
258+
) -> bool:
259+
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
260+
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
261+
try:
262+
create_sql = str(next(self._backend.fetch(f"SHOW CREATE TABLE {from_table_name}"))[0])
263+
create_table_sql = create_sql.replace(f"CREATE TABLE {from_table_name}", f"CREATE TABLE {to_table_name}")
264+
logger.debug(f"Creating table {to_table_name}.")
265+
self._backend.execute(create_table_sql)
266+
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
267+
if grants.privilege_assignments is None:
268+
return True
269+
grants_changes = [
270+
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
271+
]
272+
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
273+
if del_table:
274+
logger.info(f"dropping source table {from_table_name}")
275+
drop_sql = f"DROP TABLE {from_table_name}"
276+
self._backend.execute(drop_sql)
277+
return True
278+
except NotFound as err:
279+
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
280+
logger.error(f"Could not find table {from_table_name}. Table not found.")
281+
else:
282+
logger.error(err)
283+
return False
284+
285+
def _move_view(
286+
self,
287+
from_catalog: str,
288+
from_schema: str,
289+
from_table: str,
290+
to_catalog: str,
291+
to_schema: str,
292+
del_view: bool, # noqa: FBT001
293+
view_text: str | None = None,
294+
) -> bool:
295+
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
296+
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
297+
try:
298+
create_sql = f"CREATE VIEW {to_table_name} AS {view_text}"
299+
logger.debug(f"Creating view {to_table_name}.")
300+
self._backend.execute(create_sql)
301+
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
302+
if grants.privilege_assignments is None:
303+
return True
304+
grants_changes = [
305+
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
306+
]
307+
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
308+
if del_view:
309+
logger.info(f"dropping source view {from_table_name}")
310+
drop_sql = f"DROP VIEW {from_table_name}"
311+
self._backend.execute(drop_sql)
312+
return True
313+
except NotFound as err:
314+
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
315+
logger.error(f"Could not find view {from_table_name}. View not found.")
316+
else:
317+
logger.error(err)
318+
return False
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import logging
2+
from datetime import timedelta
3+
4+
from databricks.sdk.errors import NotFound
5+
from databricks.sdk.retries import retried
6+
from databricks.sdk.service.catalog import Privilege, PrivilegeAssignment, SecurableType
7+
8+
from databricks.labs.ucx.hive_metastore.table_migrate import TableMove
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
@retried(on=[NotFound], timeout=timedelta(minutes=2))
14+
def test_move_tables_no_from_schema(ws, sql_backend, make_random, make_catalog, caplog):
15+
from_catalog = make_catalog()
16+
from_schema = make_random(4)
17+
to_catalog = make_catalog()
18+
tm = TableMove(ws, sql_backend)
19+
tm.move_tables(from_catalog.name, from_schema, "*", to_catalog.name, from_schema, False)
20+
rec_results = [
21+
rec.message
22+
for rec in caplog.records
23+
if f"schema {from_schema} not found in catalog {from_catalog.name}" in rec.message
24+
]
25+
assert len(rec_results) == 1
26+
27+
28+
@retried(on=[NotFound], timeout=timedelta(minutes=2))
29+
def test_move_tables(ws, sql_backend, make_catalog, make_schema, make_table, make_acc_group):
30+
tm = TableMove(ws, sql_backend)
31+
group_a = make_acc_group()
32+
group_b = make_acc_group()
33+
from_catalog = make_catalog()
34+
from_schema = make_schema(catalog_name=from_catalog.name)
35+
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
36+
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
37+
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
38+
from_view_1 = make_table(
39+
catalog_name=from_catalog.name,
40+
schema_name=from_schema.name,
41+
view=True,
42+
ctas=f"select * from {from_table_2.full_name}",
43+
)
44+
to_catalog = make_catalog()
45+
to_schema = make_schema(catalog_name=to_catalog.name)
46+
# creating a table in target schema to test skipping
47+
to_table_3 = make_table(catalog_name=to_catalog.name, schema_name=to_schema.name, name=from_table_3.name)
48+
sql_backend.execute(f"GRANT SELECT ON TABLE {from_table_1.full_name} TO `{group_a.display_name}`")
49+
sql_backend.execute(f"GRANT SELECT,MODIFY ON TABLE {from_table_2.full_name} TO `{group_b.display_name}`")
50+
sql_backend.execute(f"GRANT SELECT ON VIEW {from_view_1.full_name} TO `{group_b.display_name}`")
51+
sql_backend.execute(f"GRANT SELECT ON TABLE {to_table_3.full_name} TO `{group_a.display_name}`")
52+
tm.move_tables(from_catalog.name, from_schema.name, "*", to_catalog.name, to_schema.name, False)
53+
tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema.name)
54+
table_1_grant = ws.grants.get(
55+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_1.name}"
56+
)
57+
table_2_grant = ws.grants.get(
58+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_2.name}"
59+
)
60+
table_3_grant = ws.grants.get(
61+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_3.name}"
62+
)
63+
view_1_grant = ws.grants.get(
64+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_view_1.name}"
65+
)
66+
for t in tables:
67+
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name, from_view_1.name]
68+
expected_table_1_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
69+
expected_table_2_grant = [
70+
PrivilegeAssignment(group_b.display_name, [Privilege.MODIFY, Privilege.SELECT]),
71+
]
72+
expected_table_3_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
73+
expected_view_1_grant = [PrivilegeAssignment(group_b.display_name, [Privilege.SELECT])]
74+
assert table_1_grant.privilege_assignments == expected_table_1_grant
75+
assert table_2_grant.privilege_assignments == expected_table_2_grant
76+
assert table_3_grant.privilege_assignments == expected_table_3_grant
77+
assert view_1_grant.privilege_assignments == expected_view_1_grant
78+
79+
80+
@retried(on=[NotFound], timeout=timedelta(minutes=2))
81+
def test_move_tables_no_to_schema(ws, sql_backend, make_catalog, make_schema, make_table, make_random):
82+
tm = TableMove(ws, sql_backend)
83+
from_catalog = make_catalog()
84+
from_schema = make_schema(catalog_name=from_catalog.name)
85+
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
86+
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
87+
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
88+
to_catalog = make_catalog()
89+
to_schema = make_random(4)
90+
tm.move_tables(from_catalog.name, from_schema.name, from_table_1.name, to_catalog.name, to_schema, True)
91+
tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema)
92+
dropped_tables = ws.tables.list(catalog_name=from_catalog.name, schema_name=from_schema.name)
93+
for t in tables:
94+
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name]
95+
for t in dropped_tables:
96+
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name]

0 commit comments

Comments
 (0)