# Databricks notebook source
# MAGIC %md
# MAGIC ## Re-create catalog, schemas and external tables at a remote UC
# MAGIC
# MAGIC This notebook reads a backup of a source catalog's information_schema from an external container and re-creates the catalog, its schemas and its external tables on the target Unity Catalog metastore.
# MAGIC
# MAGIC When run on a DR site, the information_schema catalog parameter (```catalogName```) should be the same as the source catalog name.
# MAGIC
# MAGIC Assumptions:
# MAGIC - The storage credential(s) and the parent external location(s) need to be created on the target UC beforehand.
# MAGIC - The external location is the same for all schemas, formed as ```<storage location root>/<schema name>/<table name>```
# MAGIC - All tables are Delta

# COMMAND ----------
dbutils.widgets.removeAll()
dbutils.widgets.text("storageLocation", "/mnt/externallocation", "Storage with source catalog info")
dbutils.widgets.text("catalogName", "system", "information_schema catalog")
dbutils.widgets.text("rootExternalStorage", "abfss://[email protected]/root/", "Root of external tables' path")

# COMMAND ----------

storage_location = dbutils.widgets.get("storageLocation")
catalog_name = dbutils.widgets.get("catalogName")
root_externalstorage = dbutils.widgets.get("rootExternalStorage")

#table_list = dbutils.fs.ls(storage_location)

# COMMAND ----------
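#Optional check (illustrative only, not part of the original flow): the backup is expected to
#contain one Delta folder per information_schema table read below (catalogs, schemata, tables,
#columns); dbutils.fs.ls raises an error if any of them is missing.
for folder in ["catalogs", "schemata", "tables", "columns"]:
    dbutils.fs.ls(f"{storage_location}/{folder}")

# COMMAND ----------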
import re
from pyspark.sql.functions import upper

#Build a comma-separated "<column_name> <DATA_TYPE>" column list from the backed-up columns table
def return_schema(df):
    column_names = df.orderBy(df.ordinal_position.asc()).select("column_name", upper("full_data_type")).collect()
    schema = ""
    for x, y in column_names:
        schema += f" {x} {y},"

    #Strip the trailing comma
    p = re.compile(',$')
    schema_no_ending_comma = p.sub('', schema)
    return schema_no_ending_comma

# COMMAND ----------
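#Illustrative example only (hypothetical data, safe to remove): shows the column list string
#that return_schema builds for the CREATE TABLE statements further down.
example_columns_df = spark.createDataFrame(
    [("id", "int", 1), ("name", "string", 2)],
    ["column_name", "full_data_type", "ordinal_position"],
)
print(return_schema(example_columns_df))  #-> " id INT, name STRING"

# COMMAND ----------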
# MAGIC %md
# MAGIC #### Create catalog

# COMMAND ----------

catalog_df = spark.read.format("delta").load(f"{storage_location}/catalogs")

#Use this loop when running on a remote UC: re-create each backed-up catalog with its original name and owner
for catalog in catalog_df.collect():
    spark.sql(f"CREATE CATALOG {catalog.catalog_name} COMMENT '{catalog.comment}'")
    spark.sql(f"ALTER CATALOG {catalog.catalog_name} SET OWNER TO `{catalog.catalog_owner}`")

#Local testing with a different catalog name
#spark.sql(f"CREATE CATALOG {catalog_name} COMMENT '{catalog_df.collect()[0].comment}'")
#spark.sql(f"ALTER CATALOG {catalog_name} SET OWNER TO `{catalog_df.collect()[0].catalog_owner}`")

# COMMAND ----------
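#Optional check (illustrative only, not part of the original flow): confirm the catalog now exists
#on the target metastore before creating schemas in it.
display(spark.sql(f"SHOW CATALOGS LIKE '{catalog_name}'"))

# COMMAND ----------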
# MAGIC %md
# MAGIC #### Create schemas

# COMMAND ----------

from pyspark.sql.functions import col, upper

#Get only user schemas
schemas_df = spark.read.format("delta").load(f"{storage_location}/schemata").filter("schema_name<>'information_schema'")

#Drop the default schema
spark.sql(f"DROP SCHEMA {catalog_name}.default")

#Create all user schemas on the target catalog
for schema in schemas_df.collect():
    spark.sql(f"CREATE SCHEMA {catalog_name}.{schema.schema_name} COMMENT '{schema.comment}'")
    spark.sql(f"ALTER SCHEMA {catalog_name}.{schema.schema_name} SET OWNER TO `{schema.schema_owner}`")

# COMMAND ----------
# MAGIC %md
# MAGIC #### Create external tables

# COMMAND ----------

#Get only external user tables
tables_df = spark.read.format("delta").load(f"{storage_location}/tables").filter("table_schema<>'information_schema' and table_type='EXTERNAL'")

for table in tables_df.collect():
    #Rebuild this table's column definitions from the backed-up columns table
    columns_df = spark.read.format("delta").load(f"{storage_location}/columns").filter((col("table_schema") == table.table_schema) & (col("table_name") == table.table_name))
    columns = return_schema(columns_df)

    #Create the table at its external location and restore its owner
    spark.sql(f"CREATE OR REPLACE TABLE {catalog_name}.{table.table_schema}.{table.table_name}({columns}) COMMENT '{table.comment}' LOCATION '{root_externalstorage}{table.table_schema}/{table.table_name}'")
    spark.sql(f"ALTER TABLE {catalog_name}.{table.table_schema}.{table.table_name} SET OWNER TO `{table.table_owner}`")

# COMMAND ----------
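#Optional sanity check (illustrative only, not part of the original backup flow):
#list the re-created tables in each schema of the target catalog.
for schema in schemas_df.collect():
    display(spark.sql(f"SHOW TABLES IN {catalog_name}.{schema.schema_name}"))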