# Databricks notebook source
# MAGIC %md
# MAGIC # External tables to UC managed tables
# MAGIC
# MAGIC This notebook migrates all external tables from a Hive metastore (or another source catalog) to managed tables in a Unity Catalog (UC) catalog.
# MAGIC
# MAGIC **Important:**
# MAGIC - This notebook needs to run on a cluster with `spark.databricks.sql.initial.catalog.name` set to `hive_metastore`, or to the base catalog from which the external tables will be pulled for cloning (a quick sanity check follows below).
# MAGIC - Optional: the table inventory can be written to a temporary Delta table for portability across workspaces.

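# COMMAND ----------

# Optional sanity check (added sketch, not part of the original flow): print the session's
# current catalog so it can be confirmed against the intended source catalog before the
# inventory is built in PART I. current_catalog() is a built-in Spark SQL function.
print(spark.sql("SELECT current_catalog()").first()[0])
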
# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

from pyspark.sql.functions import col

# Optional path for persisting the table inventory as a Delta table (see end of PART I)
delta_table_location = "/tmp/hive-metastore-external-tables-data"
# Catalog the external tables are read from
source_catalog = "hive_metastore"
# UC catalog the managed tables are cloned into
destination_catalog = "dspadotto-uc-work"

# Storage credential used when (optionally) creating external locations in PART II
storage_credential = "dspadotto-uc-work-sc"

# COMMAND ----------

# MAGIC %md
# MAGIC # PART I: Get all tables from the Hive metastore or original catalog
# MAGIC
# MAGIC For this, the initial catalog name must be set to the desired source catalog; the cell below also switches the session catalog explicitly as a fallback.

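# COMMAND ----------

# Fallback sketch (assumes a UC-enabled workspace where USE CATALOG is available): switch the
# session to the source catalog explicitly, in case spark.databricks.sql.initial.catalog.name
# was not set on the cluster.
spark.sql(f"USE CATALOG `{source_catalog}`")
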
# COMMAND ----------

# MAGIC %md
# MAGIC ### Select all EXTERNAL tables from the source metastore

# COMMAND ----------

# Helper: safely read a cell from the collected DESCRIBE output
def get_value(lst, idx, idy, default):
    """Return lst[idx][idy], or `default` when that row/column does not exist."""
    try:
        return lst[idx][idy]
    except IndexError:
        return default

# COMMAND ----------

# Build an inventory of all tables in the source catalog.
# The session's current catalog must point at the source (see the checks above).

# Get all databases/schemas from the source catalog
databases = spark.sql("SHOW DATABASES")
descriptions = []

# Loop through each database/schema
for db in databases.collect():
    # Get all tables in the current database/schema
    tables = spark.sql("SHOW TABLES IN {}".format(db[0])).select("tableName").collect()

    # Describe each table and extract its database, name, type and location
    for table in tables:
        table_name = table.tableName
        try:
            desc_all = (
                spark.sql(f"DESCRIBE FORMATTED {db[0]}.{table_name}")
                .filter("col_name = 'Location' OR col_name = 'Database' OR col_name = 'Table' OR col_name = 'Type'")
                .collect()
            )
            database_name = get_value(desc_all, 0, 1, "NA")
            table_name = get_value(desc_all, 1, 1, "NA")
            table_type = get_value(desc_all, 2, 1, "NA")
            table_location = get_value(desc_all, 3, 1, "NA")

            #print(f"{database_name}.{table_name} is {table_type} and is located at {table_location}")

            descriptions.append((database_name, table_name, table_type, table_location))
        # Handle tables whose metadata cannot be described (e.g. the external data is missing)
        except Exception as e:
            print(f"Error on {db[0]}.{table_name}: {e}")

# Create a DataFrame from the results, keeping only EXTERNAL tables
source_catalog_tables = spark.createDataFrame(
    descriptions, ['database_name', 'table_name', 'table_type', 'table_location']
).filter("table_type = 'EXTERNAL'")

# Optional: write the inventory DataFrame to a Delta table
#source_catalog_tables.write.format("delta").mode("overwrite").save(delta_table_location)

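# COMMAND ----------

# Optional review step (added sketch): show the collected inventory so the external tables can
# be inspected before any cloning happens. display() is the Databricks notebook helper;
# source_catalog_tables.show() works on any Spark cluster.
display(source_catalog_tables)
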
# COMMAND ----------

# MAGIC %md
# MAGIC # PART II: CLONE tables

# COMMAND ----------

# MAGIC %md
# MAGIC ### Optional: Create all external locations with a single storage credential
# MAGIC
# MAGIC The commented-out cell below assumes that:
# MAGIC - the same storage credential can access all external locations
# MAGIC - one external location is created per table; this can be generalized to higher folder levels
# MAGIC - the external location name is the last folder level of the path (here, the table name)
# MAGIC
# MAGIC A read-only check of the external locations that already exist follows that cell.

# COMMAND ----------

#df_external_locations = source_catalog_tables.select('table_location').distinct()
#
#for el in df_external_locations.collect():
#    try:
#        stmt = "CREATE EXTERNAL LOCATION IF NOT EXISTS `{}` URL '{}' WITH (STORAGE CREDENTIAL `{}`)".format(
#            el.table_location.split("/")[-1], el.table_location, storage_credential)
#        spark.sql(stmt)
#        print(stmt)
#    except Exception as e:
#        print('Failure on creating external location for path {}: {}'.format(el.table_location, str(e)))

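# COMMAND ----------

# Optional read-only check (added sketch, assumes a UC-enabled workspace): list the external
# locations that already exist, to spot overlaps before creating one location per table path.
display(spark.sql("SHOW EXTERNAL LOCATIONS"))
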
# COMMAND ----------

# MAGIC %md
# MAGIC ### CLONE all tables

# COMMAND ----------

# Create all missing databases/schemas in the destination catalog
databases = source_catalog_tables.select(col("database_name")).distinct().collect()

for database in databases:
    spark.sql(f"CREATE DATABASE IF NOT EXISTS `{destination_catalog}`.`{database[0]}`")

# COMMAND ----------

# Clone each external table from the inventory into a managed table in the destination catalog
tables = source_catalog_tables.collect()

for table in tables:
    print(f"Converting table {table.database_name}.{table.table_name}...")
    spark.sql(
        f"CREATE OR REPLACE TABLE `{destination_catalog}`.`{table.database_name}`.`{table.table_name}` "
        f"DEEP CLONE `{source_catalog}`.`{table.database_name}`.`{table.table_name}`"
    )

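# COMMAND ----------

# Optional verification (added sketch): count the managed tables now present in each destination
# schema using the catalog's information_schema, and compare against the source inventory.
display(
    spark.sql(
        f"SELECT table_schema, count(*) AS table_count "
        f"FROM `{destination_catalog}`.information_schema.tables "
        f"WHERE table_type = 'MANAGED' "
        f"GROUP BY table_schema"
    )
)
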
# COMMAND ----------
