@@ -1,3 +1,4 @@
+import io
 import logging
 import os
 import re
@@ -6,6 +7,7 @@
 from typing import ClassVar

 from databricks.sdk import WorkspaceClient
+from databricks.sdk.service.workspace import ImportFormat

 from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
 from databricks.labs.ucx.mixins.sql import Row
@@ -16,6 +18,7 @@
 @dataclass
 class ExternalLocation:
     location: str
+    table_count: int


 @dataclass
@@ -30,6 +33,7 @@ class ExternalLocations(CrawlerBase[ExternalLocation]):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
         super().__init__(sbe, "hive_metastore", schema, "external_locations", ExternalLocation)
         self._ws = ws
+        self._folder = f"/Users/{ws.current_user.me().user_name}/.ucx"

     def _external_locations(self, tables: list[Row], mounts) -> Iterable[ExternalLocation]:
         min_slash = 2
@@ -57,12 +61,14 @@ def _external_locations(self, tables: list[Row], mounts) -> Iterable[ExternalLoc
                             + "/"
                         )
                         if common.count("/") > min_slash:
-                            external_locations[loc] = ExternalLocation(common)
+                            table_count = external_locations[loc].table_count
+                            external_locations[loc] = ExternalLocation(common, table_count + 1)
                             dupe = True
                         loc += 1
                     if not dupe:
-                        external_locations.append(ExternalLocation(os.path.dirname(location) + "/"))
+                        external_locations.append(ExternalLocation(os.path.dirname(location) + "/", 1))
                 if location.startswith("jdbc"):
+                    dupe = False
                     pattern = r"(\w+)=(.*?)(?=\s*,|\s*\])"

                     # Find all matches in the input string
@@ -93,7 +99,13 @@ def _external_locations(self, tables: list[Row], mounts) -> Iterable[ExternalLoc
                         jdbc_location = f"jdbc:{provider.lower()}://{host}:{port}/{database}"
                     else:
                         jdbc_location = f"{location.lower()}/{host}:{port}/{database}"
-                    external_locations.append(ExternalLocation(jdbc_location))
+                    for ext_loc in external_locations:
+                        if ext_loc.location == jdbc_location:
+                            ext_loc.table_count += 1
+                            dupe = True
+                            break
+                    if not dupe:
+                        external_locations.append(ExternalLocation(jdbc_location, 1))

         return external_locations

@@ -113,6 +125,81 @@ def _try_fetch(self) -> Iterable[ExternalLocation]:
         for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
             yield ExternalLocation(*row)

+    def _get_ext_location_definitions(self, missing_locations: list[ExternalLocation]) -> list:
+        tf_script = []
+        cnt = 1
+        for loc in missing_locations:
+            if loc.location.startswith("s3://"):
+                res_name = loc.location[5:].rstrip("/").replace("/", "_")
+            elif loc.location.startswith("gcs://"):
+                res_name = loc.location[6:].rstrip("/").replace("/", "_")
+            elif loc.location.startswith("abfss://"):
+                container_name = loc.location[8 : loc.location.index("@")]
+                res_name = (
+                    loc.location[loc.location.index("@") + 1 :]
+                    .replace(".dfs.core.windows.net", "")
+                    .rstrip("/")
+                    .replace("/", "_")
+                )
+                res_name = f"{container_name}_{res_name}"
+            else:
+                # the cloud storage url doesn't match any scheme handled above or is malformed (for example wasb://),
+                # so skip it and don't generate a Terraform definition for it
+                logger.warning(f"unsupported storage format {loc.location}")
+                continue
+            script = f'resource "databricks_external_location" "{res_name}" {{\n'
+            script += f'    name = "{res_name}"\n'
+            script += f'    url  = "{loc.location.rstrip("/")}"\n'
+            script += "    credential_name = databricks_storage_credential.<storage_credential_reference>.id\n"
+            script += "}\n"
+            tf_script.append(script)
+            cnt += 1
+        return tf_script
+
+    def _match_table_external_locations(self) -> tuple[list[list], list[ExternalLocation]]:
+        external_locations = list(self._ws.external_locations.list())
+        location_path = [_.url.lower() for _ in external_locations]
+        table_locations = self.snapshot()
+        matching_locations = []
+        missing_locations = []
+        for loc in table_locations:
+            # external_locations.list() returns the url without a trailing "/", while ExternalLocation.snapshot()
+            # keeps it, so strip the trailing slash (and lower-case) before comparing
+            if loc.location.rstrip("/").lower() in location_path:
+                # identify the index of the matching external location
+                iloc = location_path.index(loc.location.rstrip("/").lower())
+                matching_locations.append([external_locations[iloc].name, loc.table_count])
+                continue
+            missing_locations.append(loc)
+        return matching_locations, missing_locations
+
+    def save_as_terraform_definitions_on_workspace(self, folder: str | None = None) -> str:
+        if folder:
+            self._folder = folder
+        matching_locations, missing_locations = self._match_table_external_locations()
+        if len(matching_locations) > 0:
+            logger.info("following external locations are already configured.")
+            logger.info("sharing details of # tables that can be migrated for each location")
+            for _ in matching_locations:
+                logger.info(f"{_[1]} tables can be migrated using external location {_[0]}.")
+        if len(missing_locations) > 0:
+            logger.info("following external locations need to be created.")
+            for _ in missing_locations:
+                logger.info(f"{_.table_count} tables can be migrated using external location {_.location}.")
+            buffer = io.StringIO()
+            for script in self._get_ext_location_definitions(missing_locations):
+                buffer.write(script)
+            buffer.seek(0)
+            return self._overwrite_mapping(buffer)
+        else:
+            logger.info("no additional external locations need to be created.")
+            return ""
+
+    def _overwrite_mapping(self, buffer) -> str:
+        path = f"{self._folder}/external_locations.tf"
+        self._ws.workspace.upload(path, buffer, overwrite=True, format=ImportFormat.AUTO)
+        return path
+

 class Mounts(CrawlerBase[Mount]):
     def __init__(self, backend: SqlBackend, ws: WorkspaceClient, inventory_database: str):
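Below is a minimal usage sketch of the new save_as_terraform_definitions_on_workspace() flow added by this change. It is illustrative only: the warehouse id is a placeholder, StatementExecutionBackend is assumed to be a SqlBackend implementation available from databricks.labs.ucx.framework.crawlers, and "ucx" is assumed to be the inventory schema the crawlers write to.

    from databricks.sdk import WorkspaceClient

    from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend  # assumed SqlBackend implementation
    from databricks.labs.ucx.hive_metastore.locations import ExternalLocations

    ws = WorkspaceClient()  # credentials resolved from the environment / .databrickscfg
    sql_backend = StatementExecutionBackend(ws, "<warehouse-id>")  # placeholder warehouse id
    locations = ExternalLocations(ws, sql_backend, "ucx")  # "ucx" = assumed inventory schema

    # Compares crawled table locations against the external locations already defined in
    # Unity Catalog; for the missing ones it uploads external_locations.tf (one
    # databricks_external_location resource per location) to /Users/<me>/.ucx by default.
    tf_path = locations.save_as_terraform_definitions_on_workspace()
    if tf_path:
        print(f"review and apply the generated definitions at {tf_path}")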