Commit f766bb6

dougspadotto committed
Add support to sync managed tables
1 parent c78b16f commit f766bb6

File tree

1 file changed: +57, -0


metastore_export_import/02_recreate_catalog.py

Lines changed: 57 additions & 0 deletions
@@ -116,4 +116,61 @@ def return_schema(df):

# COMMAND ----------

# MAGIC %md
# MAGIC #### Create managed tables
# MAGIC
# MAGIC For this to work you'll have to:
# MAGIC
# MAGIC 1) create a managed identity with the Storage Blob Data Contributor role on the storage account, or create an IAM role following the same process used for the primary metastore storage
# MAGIC 2) create a storage credential on the secondary site using this managed identity/IAM role
# MAGIC 3) create an external location pointing to the storage account/bucket used by the primary metastore (a sketch follows in the next cell)
# MAGIC 4) pass the primary storage location in as a parameter
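
# COMMAND ----------

# Hypothetical sketch (not part of the original commit) of step 3 above: register the primary
# metastore's root storage as an external location on the secondary workspace. It assumes the
# storage credential from step 2 is named `primary_metastore_cred`; the location name and URL
# below are placeholders to replace with your own values.
spark.sql("""
CREATE EXTERNAL LOCATION IF NOT EXISTS primary_metastore_root
URL 'abfss://<container>@<storage-account>.dfs.core.windows.net/'
WITH (STORAGE CREDENTIAL primary_metastore_cred)
""")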

# COMMAND ----------

import requests

#TODO: Move to secrets
databricks_url = "<SECRET>"
my_token = "<SECRET>"
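# A possible follow-up for the TODO above (hypothetical secret scope/key names):
# databricks_url = dbutils.secrets.get(scope="metastore-sync", key="databricks-url")
# my_token = dbutils.secrets.get(scope="metastore-sync", key="token")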

#TODO: move as parameter
metastore_id = "<METASTORE ID>"
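# A possible follow-up for the TODO above (hypothetical widget name):
# dbutils.widgets.text("metastore_id", "")
# metastore_id = dbutils.widgets.get("metastore_id")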

header = {'Authorization': 'Bearer {}'.format(my_token)}

endpoint = f"/api/2.0/unity-catalog/metastores/{metastore_id}"

resp = requests.get(
    databricks_url + endpoint,
    headers=header
)

base_metastore_url = resp.json().get("storage_root")

# COMMAND ----------

# If the catalog uses its own storage location, override base_metastore_url with it

endpoint = f"/api/2.0/unity-catalog/catalogs/{catalog_name}"

resp = requests.get(
    databricks_url + endpoint,
    headers=header
)

if resp.json().get("storage_location"):
    base_metastore_url = resp.json().get("storage_location")

# COMMAND ----------

# Get only managed tables
tables_df = spark.read.format("delta").load(f"{storage_location}/tables") \
    .filter("table_schema <> 'information_schema' AND table_type = 'MANAGED'")

for table in tables_df.collect():
    columns_df = spark.read.format("delta").load(f"{storage_location}/columns") \
        .filter((col("table_schema") == table.table_schema) & (col("table_name") == table.table_name))
    columns = return_schema(columns_df)

    # Recreate the managed table by cloning it from its extracted storage path, then restore its owner
    spark.sql(f"CREATE OR REPLACE TABLE {catalog_name}.{table.table_schema}.{table.table_name} CLONE delta.`{base_metastore_url}{table.storage_sub_directory}`")
    spark.sql(f"ALTER TABLE {catalog_name}.{table.table_schema}.{table.table_name} SET OWNER TO `{table.table_owner}`")
