
# COMMAND ----------

+from delta.tables import DeltaTable
+
+# COMMAND ----------
+
dbutils.widgets.removeAll()
dbutils.widgets.text("storageLocation", "/mnt/externallocation", "Storage location for copy")
dbutils.widgets.text("catalogName", "system", "information_schema catalog")
+dbutils.widgets.dropdown("getExternalLocations", "True", ["True", "False"])

# COMMAND ----------

storage_location = dbutils.widgets.get("storageLocation")
catalog_name = dbutils.widgets.get("catalogName")
+get_external_location = dbutils.widgets.get("getExternalLocations")

table_list = spark.catalog.listTables(f"{catalog_name}.information_schema")

# COMMAND ----------

for table in table_list:
-    df = spark.sql(f"SELECT * FROM {table.catalog}.information_schema.{table.name}")
-    df.write.format("delta").mode("overwrite").save(f"{storage_location}/{table.name}")
+    info_schema_table_df = spark.sql(f"SELECT * FROM {table.catalog}.information_schema.{table.name}")
+    info_schema_table_df.write.format("delta").mode("overwrite").save(f"{storage_location}/{table.name}")
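+
+# COMMAND ----------
+
+# Optional sanity check (a sketch, not part of the original change): read one of the copies
+# back from the storage location to confirm the export. The path assumes the loop above has
+# already written information_schema.tables; tables_copy_df is a hypothetical name.
+tables_copy_df = spark.read.format("delta").load(f"{storage_location}/tables")
+display(tables_copy_df)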
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Optional step
+# MAGIC Get external table locations by running DESCRIBE EXTENDED on each table listed in information_schema.tables
+
+# COMMAND ----------
+
+# The widget returns a string, so compare against "True" rather than relying on truthiness
+if get_external_location == "True":
+    table_location_columns = ["table_catalog", "table_schema", "table_name", "table_location"]
+    table_location_storage = "external_table_locations"
+    location_list = []
+
+    # Only external tables are relevant; the UNITY_CATALOG data source also reports as external, so filter it out
+    describe_table_list = spark.read.table(f"{catalog_name}.information_schema.tables").filter("table_type == 'EXTERNAL' AND data_source_format <> 'UNITY_CATALOG'")
+
+    for d_table in describe_table_list.collect():
+        d_location = spark.sql(f"DESCRIBE EXTENDED {d_table.table_catalog}.{d_table.table_schema}.{d_table.table_name}").filter("col_name = 'Location'").select("data_type").head()[0]
+        location_list.append([d_table.table_catalog, d_table.table_schema, d_table.table_name, d_location])
+
+    location_df = spark.createDataFrame(data=location_list, schema=table_location_columns)
+
+    # Merge into the copied information_schema.tables and store each external location in the
+    # storage_sub_directory column (which, as of 03/09, only holds managed table information)
+    table_df = DeltaTable.forPath(spark, f"{storage_location}/tables")
+    #table_df = spark.sql(f"SELECT * FROM {table.catalog}.information_schema.tables")
+    table_df.alias('tables') \
+        .merge(
+            location_df.alias('locations'),
+            'tables.table_catalog = locations.table_catalog and tables.table_schema = locations.table_schema and tables.table_name = locations.table_name'
+        ) \
+        .whenMatchedUpdate(set =
+            {
+                "storage_sub_directory": "locations.table_location"
+            }
+        ) \
+        .execute()
+
+    # DeltaTable is not a DataFrame, so convert it before displaying the merged result
+    display(table_df.toDF())
+    # Alternatively, save the external locations to a separate Delta table instead of merging:
+    #(location_df
+    #    .write
+    #    .mode("overwrite")
+    #    .format("delta")
+    #    .save(f"{storage_location}/{table_location_storage}"))
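+
+# COMMAND ----------
+
+# Example query against the merged copy (a sketch, not part of the original change): list the
+# external tables together with the location now written into storage_sub_directory. The path
+# assumes the same storageLocation widget value used above; merged_tables_df is a hypothetical name.
+merged_tables_df = spark.read.format("delta").load(f"{storage_location}/tables")
+display(merged_tables_df.filter("table_type = 'EXTERNAL'").select("table_catalog", "table_schema", "table_name", "storage_sub_directory"))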