Skip to content

Commit 44ad51c

Browse files
authored
Added databricks labs ucx alias command to create a view of tables from one schema/catalog in another schema/catalog (#837)
1 parent aeda4c9 commit 44ad51c

File tree

6 files changed

+495
-13
lines changed

6 files changed

+495
-13
lines changed

labs.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,23 @@ commands:
8080
- name: to-schema
8181
description: target schema to migrate tables to
8282

83+
- name: alias
84+
description: |
85+
alias tables across schema/catalog withing a UC metastore
86+
create a view pointing to the "from" table
87+
if a view is aliased, recreates the same view in the target schema/catalog
88+
flags:
89+
- name: from-catalog
90+
description: from catalog name
91+
- name: from-schema
92+
description: from schema
93+
- name: from-table
94+
description: table names to alias. enter * to migrate all tables
95+
- name: to-catalog
96+
description: target catalog to migrate schema to
97+
- name: to-schema
98+
description: target schema to migrate tables to
99+
83100
- name: save-azure-storage-accounts
84101
description: Identifies all storage account used by tables, identify spn and its permission on each storage accounts
85102
flags:

src/databricks/labs/ucx/cli.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,36 @@ def move(
237237
tables.move_tables(from_catalog, from_schema, from_table, to_catalog, to_schema, del_table)
238238

239239

240+
@ucx.command
241+
def alias(
242+
w: WorkspaceClient,
243+
from_catalog: str,
244+
from_schema: str,
245+
from_table: str,
246+
to_catalog: str,
247+
to_schema: str,
248+
):
249+
"""move a uc table/tables from one schema to another schema in same or different catalog"""
250+
installation_manager = InstallationManager(w)
251+
installation = installation_manager.for_user(w.current_user.me())
252+
if not installation:
253+
logger.error(CANT_FIND_UCX_MSG)
254+
return
255+
sql_backend = StatementExecutionBackend(w, installation.config.warehouse_id)
256+
tables = TableMove(w, sql_backend)
257+
if from_catalog == "" or to_catalog == "":
258+
logger.error("Please enter from_catalog and to_catalog details")
259+
return
260+
if from_schema == "" or to_schema == "" or from_table == "":
261+
logger.error("Please enter from_schema, to_schema and from_table(enter * for migrating all tables) details.")
262+
return
263+
if from_catalog == to_catalog and from_schema == to_schema:
264+
logger.error("please select a different schema or catalog to migrate to")
265+
return
266+
logger.info(f"aliasing table {from_table} from {from_catalog}.{from_schema} to {to_catalog}.{to_schema}")
267+
tables.alias_tables(from_catalog, from_schema, from_table, to_catalog, to_schema)
268+
269+
240270
@ucx.command
241271
def save_azure_storage_accounts(w: WorkspaceClient, subscription_id: str):
242272
"""identifies all azure storage account used by external tables

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 108 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
from databricks.labs.blueprint.parallel import Threads
66
from databricks.sdk import WorkspaceClient
77
from databricks.sdk.errors import NotFound
8-
from databricks.sdk.service.catalog import PermissionsChange, SecurableType, TableType
8+
from databricks.sdk.service.catalog import (
9+
PermissionsChange,
10+
Privilege,
11+
SecurableType,
12+
TableType,
13+
)
914

1015
from databricks.labs.ucx.framework.crawlers import SqlBackend
1116
from databricks.labs.ucx.hive_metastore import TablesCrawler
@@ -229,7 +234,8 @@ def move_tables(
229234
self._move_table, from_catalog, from_schema, table.name, to_catalog, to_schema, del_table
230235
)
231236
)
232-
else:
237+
continue
238+
if table.view_definition:
233239
view_tasks.append(
234240
partial(
235241
self._move_view,
@@ -242,11 +248,70 @@ def move_tables(
242248
table.view_definition,
243249
)
244250
)
251+
continue
252+
logger.warning(
253+
f"table {from_table} was not identified as a valid table or view. skipping this table..."
254+
)
245255
Threads.strict("Creating tables", table_tasks)
246256
logger.info(f"Moved {len(list(table_tasks))} tables to the new schema {to_schema}.")
247257
Threads.strict("Creating views", view_tasks)
248258
logger.info(f"Moved {len(list(view_tasks))} views to the new schema {to_schema}.")
249259

260+
def alias_tables(
261+
self,
262+
from_catalog: str,
263+
from_schema: str,
264+
from_table: str,
265+
to_catalog: str,
266+
to_schema: str,
267+
):
268+
try:
269+
self._ws.schemas.get(f"{from_catalog}.{from_schema}")
270+
except NotFound:
271+
logger.error(f"schema {from_schema} not found in catalog {from_catalog}, enter correct schema details.")
272+
return
273+
try:
274+
self._ws.schemas.get(f"{to_catalog}.{to_schema}")
275+
except NotFound:
276+
logger.warning(f"schema {to_schema} not found in {to_catalog}, creating...")
277+
self._ws.schemas.create(to_schema, to_catalog)
278+
279+
tables = self._ws.tables.list(from_catalog, from_schema)
280+
alias_tasks = []
281+
filtered_tables = [table for table in tables if from_table in [table.name, "*"]]
282+
for table in filtered_tables:
283+
try:
284+
self._ws.tables.get(f"{to_catalog}.{to_schema}.{table.name}")
285+
logger.warning(
286+
f"table {from_table} already present in {from_catalog}.{from_schema}. skipping this table..."
287+
)
288+
continue
289+
except NotFound:
290+
if table.table_type and table.table_type in (TableType.EXTERNAL, TableType.MANAGED):
291+
alias_tasks.append(
292+
partial(self._alias_table, from_catalog, from_schema, table.name, to_catalog, to_schema)
293+
)
294+
continue
295+
if table.view_definition:
296+
alias_tasks.append(
297+
partial(
298+
self._move_view,
299+
from_catalog,
300+
from_schema,
301+
table.name,
302+
to_catalog,
303+
to_schema,
304+
False,
305+
table.view_definition,
306+
)
307+
)
308+
continue
309+
logger.warning(
310+
f"table {from_table} was not identified as a valid table or view. skipping this table..."
311+
)
312+
Threads.strict("Creating aliases", alias_tasks)
313+
logger.info(f"Created {len(list(alias_tasks))} table and view aliases in the new schema {to_schema}.")
314+
250315
def _move_table(
251316
self,
252317
from_catalog: str,
@@ -273,13 +338,45 @@ def _move_table(
273338
logger.error(f"Failed to move table {from_table_name}: {err!s}", exc_info=True)
274339
return False
275340

276-
def _reapply_grants(self, from_table_name, to_table_name):
341+
def _alias_table(
342+
self,
343+
from_catalog: str,
344+
from_schema: str,
345+
from_table: str,
346+
to_catalog: str,
347+
to_schema: str,
348+
) -> bool:
349+
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
350+
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
351+
try:
352+
self._create_alias_view(from_table_name, to_table_name)
353+
self._reapply_grants(from_table_name, to_table_name, target_view=True)
354+
return True
355+
except NotFound as err:
356+
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err) or "[DELTA_TABLE_NOT_FOUND]" in str(err):
357+
logger.error(f"Could not find table {from_table_name}. Table not found.")
358+
else:
359+
logger.error(f"Failed to alias table {from_table_name}: {err!s}", exc_info=True)
360+
return False
361+
362+
def _reapply_grants(self, from_table_name, to_table_name, *, target_view: bool = False):
277363
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
278364
if grants.privilege_assignments is not None:
279365
logger.info(f"Applying grants on table {to_table_name}")
280-
grants_changes = [
281-
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
282-
]
366+
grants_changes = []
367+
for permission in grants.privilege_assignments:
368+
if not permission.privileges:
369+
continue
370+
if not target_view:
371+
grants_changes.append(PermissionsChange(list(permission.privileges), permission.principal))
372+
continue
373+
privileges = set()
374+
for privilege in permission.privileges:
375+
if privilege != Privilege.MODIFY:
376+
privileges.add(privilege)
377+
if privileges:
378+
grants_changes.append(PermissionsChange(list(privileges), permission.principal))
379+
283380
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
284381

285382
def _recreate_table(self, from_table_name, to_table_name):
@@ -288,6 +385,11 @@ def _recreate_table(self, from_table_name, to_table_name):
288385
logger.info(f"Creating table {to_table_name}")
289386
self._backend.execute(create_table_sql)
290387

388+
def _create_alias_view(self, from_table_name, to_table_name):
389+
create_view_sql = f"CREATE VIEW {to_table_name} AS SELECT * FROM {from_table_name}"
390+
logger.info(f"Creating view {to_table_name} on {from_table_name}")
391+
self._backend.execute(create_view_sql)
392+
291393
def _move_view(
292394
self,
293395
from_catalog: str,

tests/integration/hive_metastore/test_table_move.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,57 @@ def test_move_views(ws, sql_backend, make_catalog, make_schema, make_table, make
136136

137137
for t in from_views:
138138
assert t.name in [from_view_not_to_migrate.name, from_table.name]
139+
140+
141+
@retried(on=[NotFound], timeout=timedelta(minutes=2))
142+
def test_alias_tables(ws, sql_backend, make_catalog, make_schema, make_table, make_acc_group):
143+
tm = TableMove(ws, sql_backend)
144+
group_a = make_acc_group()
145+
group_b = make_acc_group()
146+
from_catalog = make_catalog()
147+
from_schema = make_schema(catalog_name=from_catalog.name)
148+
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
149+
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
150+
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
151+
from_view_1 = make_table(
152+
catalog_name=from_catalog.name,
153+
schema_name=from_schema.name,
154+
view=True,
155+
ctas=f"select * from {from_table_2.full_name} where 1=2",
156+
)
157+
to_catalog = make_catalog()
158+
to_schema = make_schema(catalog_name=to_catalog.name)
159+
# creating a table in target schema to test skipping
160+
to_table_3 = make_table(catalog_name=to_catalog.name, schema_name=to_schema.name, name=from_table_3.name)
161+
sql_backend.execute(f"GRANT SELECT ON TABLE {from_table_1.full_name} TO `{group_a.display_name}`")
162+
sql_backend.execute(f"GRANT SELECT,MODIFY ON TABLE {from_table_2.full_name} TO `{group_b.display_name}`")
163+
sql_backend.execute(f"GRANT SELECT ON VIEW {from_view_1.full_name} TO `{group_b.display_name}`")
164+
sql_backend.execute(f"GRANT SELECT ON TABLE {to_table_3.full_name} TO `{group_a.display_name}`")
165+
166+
tm.alias_tables(from_catalog.name, from_schema.name, "*", to_catalog.name, to_schema.name)
167+
168+
to_tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema.name)
169+
table_1_grant = ws.grants.get(
170+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_1.name}"
171+
)
172+
table_2_grant = ws.grants.get(
173+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_2.name}"
174+
)
175+
table_3_grant = ws.grants.get(
176+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_3.name}"
177+
)
178+
view_1_grant = ws.grants.get(
179+
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_view_1.name}"
180+
)
181+
for t in to_tables:
182+
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name, from_view_1.name]
183+
184+
expected_table_1_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
185+
expected_table_2_grant = [PrivilegeAssignment(group_b.display_name, [Privilege.SELECT])]
186+
expected_table_3_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
187+
expected_view_1_grant = [PrivilegeAssignment(group_b.display_name, [Privilege.SELECT])]
188+
189+
assert table_1_grant.privilege_assignments == expected_table_1_grant
190+
assert table_2_grant.privilege_assignments == expected_table_2_grant
191+
assert table_3_grant.privilege_assignments == expected_table_3_grant
192+
assert view_1_grant.privilege_assignments == expected_view_1_grant

0 commit comments

Comments
 (0)