Commit bf261ae
Fix issue with migrating MANAGED hive_metastore table to UC for CONVERT_TO_EXTERNAL scenario (#3020)
## Changes

This PR updates the process for the CONVERT_TO_EXTERNAL scenario. It splits the conversion of managed HMS tables to external tables into a separate workflow task, which is executed from the non-UC cluster. Once the tables are converted, the external-sync migration task ensures they are migrated to UC as external tables.

Resolves #2840

### Functionality

- [ ] added a new workflow
- [ ] modified existing workflow: `...`

### Tests

- [ ] added unit tests
- [ ] added integration tests
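In effect, migration now happens in two phases. A minimal sketch of the flow, assuming a `TablesMigrator` instance (`migrator`) wired up as UCX's runtime context does; the `run_convert_to_external` driver is illustrative only, since in practice each call runs as a separate workflow task on a different cluster:

```python
from databricks.labs.ucx.hive_metastore.tables import What

def run_convert_to_external(migrator):
    # Phase 1 (non-UC cluster): flip managed HMS tables to EXTERNAL in place.
    # This path avoids Unity Catalog API calls, which a non-UC cluster cannot make.
    migrator.convert_managed_hms_to_external(
        managed_table_external_storage="CONVERT_TO_EXTERNAL",
    )
    # Phase 2 (UC-enabled cluster): SYNC the now-external tables to UC.
    migrator.migrate_tables(
        what=What.EXTERNAL_SYNC,
        managed_table_external_storage="CONVERT_TO_EXTERNAL",
    )
```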
1 parent 0c2dfe3 commit bf261ae

File tree

8 files changed: +131 additions, −34 deletions

src/databricks/labs/ucx/hive_metastore/mapping.py

Lines changed: 18 additions & 5 deletions
```diff
@@ -195,7 +195,13 @@ def unskip_schema(self, schema: str) -> None:
         except (NotFound, BadRequest) as e:
             logger.error(f"Failed to remove skip marker from schema: {schema}.", exc_info=e)
 
-    def get_tables_to_migrate(self, tables_crawler: TablesCrawler) -> Collection[TableToMigrate]:
+    def get_tables_to_migrate(
+        self,
+        tables_crawler: TablesCrawler,
+        check_uc_table: bool = True,
+    ) -> Collection[TableToMigrate]:
+        # the check_uc_table is added specifically for convert_managed_hms_to_external method
+        # so that it doesn't invoke any UC api which are not supported in non uc cluster
         rules = self.load()
         # Getting all the source tables from the rules
         databases_in_scope = self._get_databases_in_scope({rule.src_schema for rule in rules})
@@ -212,7 +218,14 @@ def get_tables_to_migrate(self, tables_crawler: TablesCrawler) -> Collection[TableToMigrate]:
                 logger.info(f"Table {rule.as_hms_table_key} is a db demo dataset and will not be migrated")
                 continue
             tasks.append(
-                partial(self._get_table_in_scope_task, TableToMigrate(crawled_tables_keys[rule.as_hms_table_key], rule))
+                partial(
+                    self._get_table_in_scope_task,
+                    TableToMigrate(
+                        crawled_tables_keys[rule.as_hms_table_key],
+                        rule,
+                    ),
+                    check_uc_table,
+                )
             )
 
         return Threads.strict("checking all database properties", tasks)
@@ -243,11 +256,11 @@ def _get_database_in_scope_task(self, database: str) -> str | None:
             return None
         return database
 
-    def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToMigrate | None:
+    def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate, check_uc_table: bool) -> TableToMigrate | None:
         table = table_to_migrate.src
         rule = table_to_migrate.rule
 
-        if self.exists_in_uc(table, rule.as_uc_table_key):
+        if check_uc_table and self.exists_in_uc(table, rule.as_uc_table_key):
             logger.info(f"The intended target for {table.key}, {rule.as_uc_table_key}, already exists.")
             return None
         properties = self._get_table_properties(table)
@@ -260,7 +273,7 @@ def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToMigrate | None:
                 return None
             if value["key"] == "upgraded_to":
                 logger.info(f"{table.key} is set as upgraded to {value['value']}")
-                if self.exists_in_uc(table, value["value"]):
+                if check_uc_table and self.exists_in_uc(table, value["value"]):
                     logger.info(
                         f"The table {table.key} was previously migrated to {value['value']}. "
                         f"To revert the table and allow it to be migrated again use the CLI command:"
```

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 46 additions & 10 deletions
```diff
@@ -67,11 +67,39 @@ def get_remaining_tables(self) -> list[Table]:
     def index(self, *, force_refresh: bool = False):
         return self._migration_status_refresher.index(force_refresh=force_refresh)
 
+    def convert_managed_hms_to_external(
+        self,
+        managed_table_external_storage: str = "CLONE",
+    ):
+        # This method contains some of the steps of migrate tables. this was done to separate out the
+        # code for converting managed hms table to external, since this needs to run in non uc cluster,
+        # the functionality to call the UC api are removed
+
+        if managed_table_external_storage != "CONVERT_TO_EXTERNAL":
+            logger.info("Not required to convert managed hms table to external, Skipping this task...")
+            return None
+        self._spark = self._spark_session
+        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, False)
+        tables_in_scope = filter(lambda t: t.src.what == What.EXTERNAL_SYNC, tables_to_migrate)
+        tasks = []
+        for table in tables_in_scope:
+            tasks.append(
+                partial(
+                    self._convert_hms_table_to_external,
+                    table.src,
+                )
+            )
+        Threads.strict("convert tables", tasks)
+        if not tasks:
+            logger.info("No managed hms table found to convert to external")
+        return tasks
+
     def migrate_tables(
         self,
         what: What,
         hiveserde_in_place_migrate: bool = False,
         managed_table_external_storage: str = "CLONE",
+        check_uc_table: bool = True,
     ):
         if managed_table_external_storage == "CONVERT_TO_EXTERNAL":
             self._spark = self._spark_session
@@ -82,18 +110,17 @@ def migrate_tables(
         if what == What.VIEW:
             return self._migrate_views()
         return self._migrate_tables(
-            what,
-            managed_table_external_storage.upper(),
-            hiveserde_in_place_migrate,
+            what, managed_table_external_storage.upper(), hiveserde_in_place_migrate, check_uc_table
         )
 
     def _migrate_tables(
         self,
         what: What,
         managed_table_external_storage: str,
         hiveserde_in_place_migrate: bool = False,
+        check_uc_table: bool = True,
     ):
-        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler)
+        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table)
         tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
         tasks = []
         for table in tables_in_scope:
@@ -134,12 +161,15 @@ def _spark_session(self):
 
         return SparkSession.builder.getOrCreate()
 
-    def _migrate_managed_table(self, managed_table_external_storage: str, src_table: TableToMigrate):
+    def _migrate_managed_table(
+        self,
+        managed_table_external_storage: str,
+        src_table: TableToMigrate,
+    ):
         if managed_table_external_storage == 'CONVERT_TO_EXTERNAL':
-            if self._convert_hms_table_to_external(src_table.src):
-                return self._migrate_external_table(
-                    src_table.src, src_table.rule
-                )  # _migrate_external_table remains unchanged
+            return self._migrate_external_table(
+                src_table.src, src_table.rule
+            )  # _migrate_external_table remains unchanged
         if managed_table_external_storage == 'SYNC_AS_EXTERNAL':
             return self._migrate_managed_as_external_table(src_table.src, src_table.rule)  # new method
         if managed_table_external_storage == 'CLONE':
@@ -262,8 +292,9 @@ def _catalog_table(self):
         return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable  # pylint: disable=protected-access
 
     def _convert_hms_table_to_external(self, src_table: Table):
+        logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
+        inventory_table = self._tables_crawler.full_name
         try:
-            logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
             database = self._spark._jvm.scala.Some(src_table.database)  # pylint: disable=protected-access
             table_identifier = self._table_identifier(src_table.name, database)
             old_table = self._catalog.getTableMetadata(table_identifier)
@@ -290,12 +321,17 @@ def _convert_hms_table_to_external(self, src_table: Table):
                 old_table.viewOriginalText(),
             )
             self._catalog.alterTable(new_table)
+            self._update_table_status(src_table, inventory_table)
             logger.info(f"Converted {src_table.name} to External Table type.")
         except Exception as e:  # pylint: disable=broad-exception-caught
             logger.warning(f"Error converting HMS table {src_table.name} to external: {e}", exc_info=True)
             return False
         return True
 
+    def _update_table_status(self, src_table: Table, inventory_table: str):
+        update_sql = f"UPDATE {escape_sql_identifier(inventory_table)} SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='{src_table.database}' AND name='{src_table.name}';"
+        self._sql_backend.execute(update_sql)
+
     def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
         target_table_key = rule.as_uc_table_key
         table_migrate_sql = src_table.sql_migrate_as_external(target_table_key)
```
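The new inventory update keeps the crawled snapshot consistent with what was just changed in HMS, so the follow-up external-sync task picks the table up as EXTERNAL. A minimal standalone sketch of the same statement, assuming an lsql-style `SqlBackend` with an `execute()` method; the helper name is hypothetical:

```python
def mark_converted(sql_backend, inventory_table: str, database: str, name: str) -> None:
    # Flip the recorded object_type so later migration tasks treat the
    # already-converted table as external rather than managed.
    sql_backend.execute(
        f"UPDATE {inventory_table} SET object_type = 'EXTERNAL' "
        f"WHERE catalog = 'hive_metastore' AND database = '{database}' AND name = '{name}'"
    )
```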

src/databricks/labs/ucx/hive_metastore/workflows.py

Lines changed: 15 additions & 4 deletions
```diff
@@ -8,7 +8,15 @@ class TableMigration(Workflow):
     def __init__(self):
         super().__init__('migrate-tables')
 
-    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
+    @job_task(job_cluster="main", depends_on=[Assessment.crawl_tables])
+    def convert_managed_table(self, ctx: RuntimeContext):
+        """This workflow task converts managed HMS tables to external tables if `managed_table_external_storage` is set to `CONVERT_TO_EXTERNAL`.
+        See documentation for more detail."""
+        ctx.tables_migrator.convert_managed_hms_to_external(
+            managed_table_external_storage=ctx.config.managed_table_external_storage
+        )
+
+    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
     def migrate_external_tables_sync(self, ctx: RuntimeContext):
         """This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
         to the Unity Catalog.
@@ -17,15 +25,18 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext):
             what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage
         )
 
-    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
+    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
     def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):
         """This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
         using deep clone.
         """
         ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA)
 
-    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
-    def migrate_dbfs_root_non_delta_tables(self, ctx: RuntimeContext):
+    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
+    def migrate_dbfs_root_non_delta_tables(
+        self,
+        ctx: RuntimeContext,
+    ):
         """This workflow task migrates non delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
         using CTAS.
         """
```

src/databricks/labs/ucx/install.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -380,7 +380,7 @@ def _config_table_migration(self, spark_conf_dict) -> tuple[int, int, dict, str]
         managed_table_migration_choices = {
             "Migrate MANAGED HMS table as EXTERNAL UC table. This option would require you to convert MANAGED HMS tables to EXTERNAL HMS tables once UC migration is complete, otherwise deleting HMS MANAGED table would delete the migrated UC table": 'SYNC_AS_EXTERNAL',
             "Copy data from MANAGED HMS to MANAGED UC table": 'CLONE',
-            "Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. This risks data leakage, as once the relevant HMS tables are deleted, the underlying data won't get deleted anymore.": 'CONVERT_TO_EXTERNAL',
+            "Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. Once the relevant HMS tables are deleted, the underlying data won't get deleted anymore, consider the impact of this change on your data workloads": 'CONVERT_TO_EXTERNAL',
         }
         managed_table_migration_choice = self.prompts.choice_from_dict(
             "If hive_metastore contains managed table with external"
```

tests/integration/conftest.py

Lines changed: 11 additions & 3 deletions
```diff
@@ -611,14 +611,17 @@ def save_tables(self, is_hiveserde: bool = False):
                 continue
             table_type = table.table_type.value if table.table_type else ""
             table_format = table.data_source_format.value if table.data_source_format else default_table_format
+            storage_location = table.storage_location
+            if table_type == "MANAGED":
+                storage_location = ""
             tables_to_save.append(
                 Table(
                     catalog=table.catalog_name,
                     database=table.schema_name,
                     name=table.name,
                     object_type=table_type,
                     table_format=table_format,
-                    location=str(table.storage_location or ""),
+                    location=str(storage_location or ""),
                     view_text=table.view_definition,
                 )
             )
@@ -1205,13 +1208,18 @@ def prepare_tables_for_migration(
     is_hiveserde = scenario == "hiveserde"
     random = make_random(5).lower()
     # create external and managed tables to be migrated
-    if is_hiveserde:
+    if scenario == "hiveserde":
         schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}")
         table_base_dir = make_storage_dir(
             path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}'
         )
         tables = prepare_hiveserde_tables(installation_ctx, random, schema, table_base_dir)
-    else:
+    elif scenario == "managed":
+        schema_name = f"managed_{random}"
+        schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}'
+        schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location)
+        tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)
+    elif scenario == "regular":
         schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}")
         tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)
```
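The `scenario` string now selects one of three explicit branches ("regular", "hiveserde", "managed"). Branch selection is driven by pytest's indirect parametrization, sketched here in a self-contained form; the fixture and test names are illustrative:

```python
import pytest

@pytest.fixture
def prepare_tables(request):
    # With indirect=True, the parametrize value arrives as request.param.
    scenario = request.param  # "regular", "hiveserde", or "managed"
    return f"tables for {scenario}"

@pytest.mark.parametrize("prepare_tables", ["managed"], indirect=True)
def test_scenario(prepare_tables):
    assert prepare_tables == "tables for managed"
```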

tests/integration/hive_metastore/test_migrate.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -186,7 +186,7 @@ def test_migrate_managed_table_to_external_table_without_conversion(
     env_or_skip,
 ):
     src_schema_name = f"dummy_s{make_random(4)}".lower()
-    src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
+    src_schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/{src_schema_name}'
     src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
     src_external_table = runtime_ctx.make_table(
         schema_name=src_schema.name,
@@ -230,7 +230,7 @@ def test_migrate_managed_table_to_external_table_with_clone(
     env_or_skip,
 ):
     src_schema_name = f"dummy_s{make_random(4)}".lower()
-    src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
+    src_schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/{src_schema_name}'
     src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
     src_external_table = runtime_ctx.make_table(
         schema_name=src_schema.name,
```

tests/integration/hive_metastore/test_workflows.py

Lines changed: 35 additions & 0 deletions
```diff
@@ -1,5 +1,7 @@
 import pytest
 from databricks.sdk.errors import NotFound
+
+from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore.tables import Table
 
 
@@ -60,6 +62,39 @@ def test_table_migration_job_refreshes_migration_status(
     assert len(asserts) == 0, assert_message
 
 
+@pytest.mark.parametrize(
+    "prepare_tables_for_migration,workflow",
+    [
+        ("managed", "migrate-tables"),
+    ],
+    indirect=("prepare_tables_for_migration",),
+)
+def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_for_migration, workflow, sql_backend):
+    # This test case tests the CONVERT_TO_EXTERNAL scenario.
+    tables, dst_schema = prepare_tables_for_migration
+    ctx = installation_ctx.replace(
+        extend_prompts={
+            r"If hive_metastore contains managed table with external.*": "0",
+            r".*Do you want to update the existing installation?.*": 'yes',
+        },
+    )
+
+    ctx.workspace_installation.run()
+    ctx.deployed_workflows.run_workflow(workflow)
+
+    for table in tables.values():
+        try:
+            assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
+        except NotFound:
+            assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
+    managed_table = tables["src_managed_table"]
+
+    for key, value, _ in sql_backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"):
+        if key == "Type":
+            assert value == "EXTERNAL"
+            break
+
+
 @pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True)
 def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_tables_for_migration):
     tables, dst_schema = prepare_tables_for_migration
```
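The `DESCRIBE TABLE EXTENDED` loop above checks the source table's type in the Hive metastore. For the migrated UC table, an equivalent check could go through the SDK instead; a sketch, assuming a workspace client is available:

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import TableType

def is_external(ws: WorkspaceClient, full_name: str) -> bool:
    # TableInfo.table_type reports EXTERNAL for tables migrated with SYNC.
    return ws.tables.get(full_name).table_type == TableType.EXTERNAL
```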

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 3 additions & 9 deletions
```diff
@@ -217,18 +217,12 @@ def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspa
     table_migrate = TablesMigrator(
         table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
     )
-    table_migrate.migrate_tables(what=What.EXTERNAL_SYNC, managed_table_external_storage="CONVERT_TO_EXTERNAL")
+    table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")
 
-    migrate_grants.apply.assert_called()
     external_locations.resolve_mount.assert_not_called()
-
+    migrate_grants.apply.assert_not_called()
     assert backend.queries == [
-        "SYNC TABLE `ucx_default`.`db1_dst`.`managed_other` FROM `hive_metastore`.`db1_src`.`managed_other`;",
-        (
-            f"ALTER TABLE `ucx_default`.`db1_dst`.`managed_other` "
-            f"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_other' , "
-            f"'{Table.UPGRADED_FROM_WS_PARAM}' = '123');"
-        ),
+        "UPDATE `hive_metastore`.`inventory_database`.`tables` SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='db1_src' AND name='managed_other';"
     ]
 
 
```
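The rewritten assertion relies on the mock backend recording every executed statement. A minimal sketch of that pattern, assuming lsql's `MockBackend` (which, to my understanding, appends each statement passed to `execute()` to its `.queries` list):

```python
from databricks.labs.lsql.backends import MockBackend

backend = MockBackend()
backend.execute("UPDATE t SET object_type = 'EXTERNAL'")
# Each executed statement is recorded, so tests can assert on exact SQL.
assert backend.queries == ["UPDATE t SET object_type = 'EXTERNAL'"]
```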