Skip to content

Commit 00c89aa

Browse files
authored
Fix issue with migrating MANAGED hive_metastore table to UC (#2928)
## Changes HMS MANAGED tables, when deleted, also delete their underlying data. If an HMS-managed table is migrated to UC as EXTERNAL, dropping the HMS table deletes the underlying data files and renders the UC table unusable, leading to non-recoverable data loss. Changing a MANAGED table to EXTERNAL may have consequences for regulatory data cleanup, since deleting an EXTERNAL table no longer deletes the underlying data; this can cause data leakage when tables are dropped. As with the data-duplication approach, if new data is added to either the HMS or the UC table, the other goes out of sync and requires re-migration. Resolves #2838 ### Functionality - [ ] added relevant user documentation - [ ] modified existing workflow: `...` ### Tests <!-- How is this tested? Please see the checklist below and also describe any other relevant tests --> - [ ] added unit tests - [ ] added integration tests
1 parent 669b136 commit 00c89aa

File tree

7 files changed

+157
-67
lines changed

7 files changed

+157
-67
lines changed

README.md

Lines changed: 9 additions & 9 deletions
Large diffs are not rendered by default.

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import re
44
from collections import defaultdict
5-
from functools import partial
5+
from functools import partial, cached_property
66

77
from databricks.labs.blueprint.parallel import Threads
88
from databricks.labs.lsql.backends import SqlBackend
@@ -18,6 +18,7 @@
1818
TableMapping,
1919
TableToMigrate,
2020
)
21+
2122
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
2223
from databricks.labs.ucx.hive_metastore.tables import (
2324
MigrationCount,
@@ -44,6 +45,7 @@ def __init__(
4445
migrate_grants: MigrateGrants,
4546
external_locations: ExternalLocations,
4647
):
48+
4749
self._tc = table_crawler
4850
self._backend = backend
4951
self._ws = ws
@@ -71,6 +73,8 @@ def migrate_tables(
7173
hiveserde_in_place_migrate: bool = False,
7274
managed_table_external_storage: str = "CLONE",
7375
):
76+
if managed_table_external_storage == "CONVERT_TO_EXTERNAL":
77+
self._spark = self._spark_session
7478
if what in [What.DB_DATASET, What.UNKNOWN]:
7579
logger.error(f"Can't migrate tables with type {what.name}")
7680
return None
@@ -123,7 +127,19 @@ def _migrate_views(self):
123127
self.index(force_refresh=True)
124128
return all_tasks
125129

130+
@cached_property
131+
def _spark_session(self):
132+
# pylint: disable-next=import-error,import-outside-toplevel
133+
from pyspark.sql.session import SparkSession # type: ignore[import-not-found]
134+
135+
return SparkSession.builder.getOrCreate()
136+
126137
def _migrate_managed_table(self, managed_table_external_storage: str, src_table: TableToMigrate):
138+
if managed_table_external_storage == 'CONVERT_TO_EXTERNAL':
139+
if self._convert_hms_table_to_external(src_table.src):
140+
return self._migrate_external_table(
141+
src_table.src, src_table.rule
142+
) # _migrate_external_table remains unchanged
127143
if managed_table_external_storage == 'SYNC_AS_EXTERNAL':
128144
return self._migrate_managed_as_external_table(src_table.src, src_table.rule) # new method
129145
if managed_table_external_storage == 'CLONE':
@@ -227,8 +243,58 @@ def _sql_migrate_view(self, src_view: ViewToMigrate) -> str:
227243
# this does not require the index to be refreshed because the dependencies have already been validated
228244
return src_view.sql_migrate_view(self.index())
229245

246+
@cached_property
247+
def _catalog(self):
248+
return self._spark._jsparkSession.sessionState().catalog() # pylint: disable=protected-access
249+
250+
@cached_property
251+
def _table_identifier(self):
252+
return self._spark._jvm.org.apache.spark.sql.catalyst.TableIdentifier # pylint: disable=protected-access
253+
254+
@cached_property
255+
def _catalog_type(self):
256+
return (
257+
self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTableType # pylint: disable=protected-access
258+
)
259+
260+
@cached_property
261+
def _catalog_table(self):
262+
return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable # pylint: disable=protected-access
263+
230264
def _convert_hms_table_to_external(self, src_table: Table):
231-
pass
265+
try:
266+
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
267+
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
268+
table_identifier = self._table_identifier(src_table.name, database)
269+
old_table = self._catalog.getTableMetadata(table_identifier)
270+
new_table = self._catalog_table(
271+
old_table.identifier(),
272+
self._catalog_type('EXTERNAL'),
273+
old_table.storage(),
274+
old_table.schema(),
275+
old_table.provider(),
276+
old_table.partitionColumnNames(),
277+
old_table.bucketSpec(),
278+
old_table.owner(),
279+
old_table.createTime(),
280+
old_table.lastAccessTime(),
281+
old_table.createVersion(),
282+
old_table.properties(),
283+
old_table.stats(),
284+
old_table.viewText(),
285+
old_table.comment(),
286+
old_table.unsupportedFeatures(),
287+
old_table.tracksPartitionsInCatalog(),
288+
old_table.schemaPreservesCase(),
289+
old_table.ignoredProperties(),
290+
old_table.viewOriginalText(),
291+
)
292+
self._catalog.alterTable(new_table)
293+
logger.info(f"Converted {src_table.name} to External Table type.")
294+
except Exception as e: # pylint: disable=broad-exception-caught
295+
logger.warning(f"Error converting HMS table {src_table.name} to external: {e}", exc_info=True)
296+
return False
297+
return True
232298

233299
def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
234300
target_table_key = rule.as_uc_table_key

src/databricks/labs/ucx/hive_metastore/workflows.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext):
1313
"""This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
1414
to the Unity Catalog.
1515
"""
16-
ctx.tables_migrator.migrate_tables(what=What.EXTERNAL_SYNC)
16+
ctx.tables_migrator.migrate_tables(
17+
what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage
18+
)
1719

1820
@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
1921
def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):

src/databricks/labs/ucx/install.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ def _config_table_migration(self, spark_conf_dict) -> tuple[int, int, dict, str]
379379
managed_table_migration_choices = {
380380
"Migrate MANAGED HMS table as EXTERNAL UC table. This option would require you to convert MANAGED HMS tables to EXTERNAL HMS tables once UC migration is complete, otherwise deleting HMS MANAGED table would delete the migrated UC table": 'SYNC_AS_EXTERNAL',
381381
"Copy data from MANAGED HMS to MANAGED UC table": 'CLONE',
382+
"Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. This risks data leakage, as once the relevant HMS tables are deleted, the underlying data won't get deleted anymore.": 'CONVERT_TO_EXTERNAL',
382383
}
383384
managed_table_migration_choice = self.prompts.choice_from_dict(
384385
"If hive_metastore contains managed table with external"

tests/integration/hive_metastore/test_migrate.py

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,14 @@ def test_migrate_external_table(
172172

173173
@retried(on=[NotFound], timeout=timedelta(minutes=2))
174174
def test_migrate_managed_table_to_external_table_without_conversion(
175-
ws,
176-
sql_backend,
177-
runtime_ctx,
178-
make_catalog,
179-
make_mounted_location,
175+
ws, sql_backend, runtime_ctx, make_catalog, make_mounted_location, make_random, env_or_skip
180176
):
181-
# TODO: update pytest fixture for make_schema to take location as parameter to create managed schema
182-
# TODO: update azure blueprint to add spn in sql warehouse data access config
183-
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
177+
src_schema_name = f"dummy_s{make_random(4)}".lower()
178+
src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
179+
src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
184180
src_external_table = runtime_ctx.make_table(
185181
schema_name=src_schema.name,
186-
external_csv=make_mounted_location,
182+
external=False,
187183
columns=[("`back`ticks`", "STRING")], # Test with column that needs escaping
188184
)
189185
dst_catalog = make_catalog()
@@ -214,18 +210,14 @@ def test_migrate_managed_table_to_external_table_without_conversion(
214210

215211
@retried(on=[NotFound], timeout=timedelta(minutes=2))
216212
def test_migrate_managed_table_to_external_table_with_clone(
217-
ws,
218-
sql_backend,
219-
runtime_ctx,
220-
make_catalog,
221-
make_mounted_location,
213+
ws, sql_backend, runtime_ctx, make_catalog, make_mounted_location, make_random, env_or_skip
222214
):
223-
# TODO: update pytest fixture for make_schema to take location as parameter to create managed schema
224-
# TODO: update azure blueprint to add spn in sql warehouse data access config
225-
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
215+
src_schema_name = f"dummy_s{make_random(4)}".lower()
216+
src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
217+
src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
226218
src_external_table = runtime_ctx.make_table(
227219
schema_name=src_schema.name,
228-
external_csv=make_mounted_location,
220+
external=False,
229221
columns=[("`back`ticks`", "STRING")], # Test with column that needs escaping
230222
)
231223
dst_catalog = make_catalog()

0 commit comments

Comments
 (0)