Skip to content

Commit 7e6414b

Browse files
authored
Create UC external locations in Azure based on migrated storage credentials (#992)
1 parent e0da29a commit 7e6414b

File tree

8 files changed

+999
-15
lines changed

8 files changed

+999
-15
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import logging
2+
from urllib.parse import urlparse
3+
4+
from databricks.labs.blueprint.installation import Installation
5+
from databricks.sdk import WorkspaceClient
6+
from databricks.sdk.errors.platform import InvalidParameterValue, PermissionDenied
7+
8+
from databricks.labs.ucx.azure.access import AzureResourcePermissions
9+
from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources
10+
from databricks.labs.ucx.config import WorkspaceConfig
11+
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
12+
from databricks.labs.ucx.hive_metastore import ExternalLocations
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class ExternalLocationsMigration:
18+
def __init__(
19+
self,
20+
ws: WorkspaceClient,
21+
hms_locations: ExternalLocations,
22+
resource_permissions: AzureResourcePermissions,
23+
azurerm: AzureResources,
24+
):
25+
self._ws = ws
26+
self._hms_locations = hms_locations
27+
self._resource_permissions = resource_permissions
28+
self._azurerm = azurerm
29+
30+
@classmethod
31+
def for_cli(cls, ws: WorkspaceClient, installation: Installation):
32+
config = installation.load(WorkspaceConfig)
33+
sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
34+
hms_locations = ExternalLocations(ws, sql_backend, config.inventory_database)
35+
36+
azure_mgmt_client = AzureAPIClient(
37+
ws.config.arm_environment.resource_manager_endpoint,
38+
ws.config.arm_environment.service_management_endpoint,
39+
)
40+
graph_client = AzureAPIClient("https://graph.microsoft.com", "https://graph.microsoft.com")
41+
azurerm = AzureResources(azure_mgmt_client, graph_client)
42+
43+
resource_permissions = AzureResourcePermissions(installation, ws, azurerm, hms_locations)
44+
45+
return cls(ws, hms_locations, resource_permissions, azurerm)
46+
47+
def _app_id_credential_name_mapping(self) -> tuple[dict[str, str], dict[str, str]]:
48+
# list all storage credentials.
49+
# generate the managed identity/service principal application id to credential name mapping.
50+
# return one mapping for all non read-only credentials and one mapping for all read-only credentials
51+
# TODO: considering put this logic into the StorageCredentialManager
52+
app_id_mapping_write = {}
53+
app_id_mapping_read = {}
54+
all_credentials = self._ws.storage_credentials.list(max_results=0)
55+
for credential in all_credentials:
56+
name = credential.name
57+
# cannot have none credential name, it's required for external location
58+
if not name:
59+
continue
60+
61+
read_only = credential.read_only
62+
service_principal = credential.azure_service_principal
63+
managed_identity = credential.azure_managed_identity
64+
65+
application_id = None
66+
if service_principal:
67+
# if service principal based credential, use service principal's application_id directly
68+
application_id = service_principal.application_id
69+
if managed_identity:
70+
# if managed identity based credential, fetch the application_id of the managed identity
71+
application_id = self._azurerm.managed_identity_client_id(
72+
managed_identity.access_connector_id,
73+
managed_identity.managed_identity_id,
74+
)
75+
if not application_id:
76+
continue
77+
78+
if read_only:
79+
app_id_mapping_read[application_id] = name
80+
continue
81+
app_id_mapping_write[application_id] = name
82+
83+
return app_id_mapping_write, app_id_mapping_read
84+
85+
def _prefix_credential_name_mapping(self) -> tuple[dict[str, str], dict[str, str]]:
86+
# get managed identity/service principal's application id to storage credential name mapping
87+
# for all non read-only and read-only credentials
88+
app_id_mapping_write, app_id_mapping_read = self._app_id_credential_name_mapping()
89+
90+
# use the application id to storage credential name mapping to create prefix to storage credential name mapping
91+
prefix_mapping_write = {}
92+
prefix_mapping_read = {}
93+
for permission_mapping in self._resource_permissions.load():
94+
if permission_mapping.client_id in app_id_mapping_write:
95+
prefix_mapping_write[permission_mapping.prefix] = app_id_mapping_write[permission_mapping.client_id]
96+
continue
97+
if permission_mapping.client_id in app_id_mapping_read:
98+
prefix_mapping_read[permission_mapping.prefix] = app_id_mapping_read[permission_mapping.client_id]
99+
return prefix_mapping_write, prefix_mapping_read
100+
101+
def _create_location_name(self, location_url: str) -> str:
102+
# generate the UC external location name
103+
before_at, _, after_at = location_url.partition('@')
104+
container_name = before_at.removeprefix("abfss://")
105+
res_name = after_at.replace(".dfs.core.windows.net", "").rstrip("/").replace("/", "_")
106+
return f"{container_name}_{res_name}"
107+
108+
def _create_external_location_helper(
109+
self, name, url, credential, comment="Created by UCX", read_only=False, skip_validation=False
110+
) -> str | None:
111+
try:
112+
self._ws.external_locations.create(
113+
name, url, credential, comment=comment, read_only=read_only, skip_validation=skip_validation
114+
)
115+
return url
116+
except InvalidParameterValue as invalid:
117+
if "overlaps with an existing external location" in str(invalid):
118+
logger.warning(f"Skip creating external location, see details: {str(invalid)}")
119+
return None
120+
raise invalid
121+
122+
def _create_external_location(
123+
self, location_url: str, prefix_mapping_write: dict[str, str], prefix_mapping_read: dict[str, str]
124+
) -> str | None:
125+
location_name = self._create_location_name(location_url)
126+
127+
# get container url as the prefix
128+
parsed_url = urlparse(location_url)
129+
container_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
130+
131+
# try to create external location with write privilege first
132+
if container_url in prefix_mapping_write:
133+
url = self._create_external_location_helper(
134+
location_name, location_url, prefix_mapping_write[container_url], comment="Created by UCX"
135+
)
136+
return url
137+
# if no matched write privilege credential, try to create read-only external location
138+
if container_url in prefix_mapping_read:
139+
try:
140+
url = self._create_external_location_helper(
141+
location_name,
142+
location_url,
143+
prefix_mapping_read[container_url],
144+
comment="Created by UCX",
145+
read_only=True,
146+
)
147+
return url
148+
except PermissionDenied as denied:
149+
if "No file available under the location to read" in str(denied):
150+
# Empty location will cause failed READ permission check with read-only credential
151+
# Skip skip_validation in this case
152+
url = self._create_external_location_helper(
153+
location_name,
154+
location_url,
155+
prefix_mapping_read[container_url],
156+
comment="Created by UCX",
157+
read_only=True,
158+
skip_validation=True,
159+
)
160+
return url
161+
raise denied
162+
# if no credential found
163+
return None
164+
165+
def run(self):
166+
# list missing external locations in UC
167+
_, missing_locations = self._hms_locations.match_table_external_locations()
168+
# Extract the location URLs from the missing locations
169+
missing_loc_urls = [loc.location for loc in missing_locations]
170+
171+
# get prefix to storage credential name mapping
172+
prefix_mapping_write, prefix_mapping_read = self._prefix_credential_name_mapping()
173+
174+
# if missing external location is in prefix to storage credential name mapping
175+
# create a UC external location with mapped storage credential name
176+
migrated_loc_urls = []
177+
for location_url in missing_loc_urls:
178+
migrated_loc_url = self._create_external_location(location_url, prefix_mapping_write, prefix_mapping_read)
179+
if migrated_loc_url:
180+
migrated_loc_urls.append(migrated_loc_url)
181+
182+
leftover_loc_urls = [url for url in missing_loc_urls if url not in migrated_loc_urls]
183+
if leftover_loc_urls:
184+
logger.info(
185+
"External locations below are not created in UC. You may check following cases and rerun this command:"
186+
"1. Please check the output of 'migrate_credentials' command for storage credentials migration failure."
187+
"2. If you use service principal in extra_config when create dbfs mount or use service principal "
188+
"in your code directly for storage access, UCX cannot automatically migrate them to storage credential."
189+
"Please manually create those storage credentials first."
190+
"3. You may have overlapping external location already in UC."
191+
)
192+
for loc_url in leftover_loc_urls:
193+
logger.info(f"Not created external location: {loc_url}")
194+
return leftover_loc_urls
195+
196+
logger.info("All UC external location are created.")
197+
return leftover_loc_urls

src/databricks/labs/ucx/azure/resources.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,3 +341,36 @@ def _role_name(self, role_definition_id) -> str | None:
341341
return None
342342
self._role_definitions[role_definition_id] = role_name
343343
return self._role_definitions.get(role_definition_id)
344+
345+
def managed_identity_client_id(
346+
self, access_connector_id: str, user_assigned_identity_id: str | None = None
347+
) -> str | None:
348+
# get te client_id/application_id of the managed identity used in the access connector
349+
try:
350+
identity = self._mgmt.get(access_connector_id, "2023-05-01").get("identity")
351+
except NotFound:
352+
logger.warning(f"Access connector {access_connector_id} no longer exists")
353+
return None
354+
if not identity:
355+
return None
356+
357+
if identity.get("type") == "UserAssigned":
358+
if not user_assigned_identity_id:
359+
return None
360+
identities = identity.get("userAssignedIdentities")
361+
if user_assigned_identity_id in identities:
362+
return identities.get(user_assigned_identity_id).get("clientId")
363+
# sometimes we see "resourceGroups" instead of "resourcegroups" in the response from Azure RM API
364+
# but "resourcegroups" is in response from storage credential's managed_identity_id
365+
alternative_identity_id = user_assigned_identity_id.replace("resourcegroups", "resourceGroups")
366+
if alternative_identity_id in identities:
367+
return identities.get(alternative_identity_id).get("clientId")
368+
return None
369+
if identity.get("type") == "SystemAssigned":
370+
# SystemAssigned managed identity does not have client_id in get access connector response
371+
# need to call graph api directoryObjects to fetch the client_id
372+
principal = self._get_principal(identity.get("principalId"))
373+
if not principal:
374+
return None
375+
return principal.client_id
376+
return None

src/databricks/labs/ucx/hive_metastore/locations.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,30 +160,39 @@ def _get_ext_location_definitions(self, missing_locations: list[ExternalLocation
160160
cnt += 1
161161
return tf_script
162162

163-
def _match_table_external_locations(self) -> tuple[list[list], list[ExternalLocation]]:
164-
external_locations = list(self._ws.external_locations.list())
165-
location_path = [_.url.lower() for _ in external_locations]
163+
def match_table_external_locations(self) -> tuple[dict[str, int], list[ExternalLocation]]:
164+
uc_external_locations = list(self._ws.external_locations.list())
166165
table_locations = self.snapshot()
167-
matching_locations = []
166+
matching_locations = {}
168167
missing_locations = []
169-
for loc in table_locations:
168+
for table_loc in table_locations:
170169
# external_location.list returns url without trailing "/" but ExternalLocation.snapshot
171170
# does so removing the trailing slash before comparing
172-
if loc.location.rstrip("/").lower() in location_path:
173-
# identify the index of the matching external_locations
174-
iloc = location_path.index(loc.location.rstrip("/"))
175-
matching_locations.append([external_locations[iloc].name, loc.table_count])
176-
continue
177-
missing_locations.append(loc)
171+
matched = False
172+
for uc_loc in uc_external_locations:
173+
if not uc_loc.url:
174+
continue
175+
if not uc_loc.name:
176+
continue
177+
uc_loc_path = uc_loc.url.lower()
178+
if uc_loc_path in table_loc.location.rstrip("/").lower():
179+
if uc_loc.name not in matching_locations:
180+
matching_locations[uc_loc.name] = table_loc.table_count
181+
else:
182+
matching_locations[uc_loc.name] = matching_locations[uc_loc.name] + table_loc.table_count
183+
matched = True
184+
break
185+
if not matched:
186+
missing_locations.append(table_loc)
178187
return matching_locations, missing_locations
179188

180189
def save_as_terraform_definitions_on_workspace(self, installation: Installation):
181-
matching_locations, missing_locations = self._match_table_external_locations()
190+
matching_locations, missing_locations = self.match_table_external_locations()
182191
if len(matching_locations) > 0:
183192
logger.info("following external locations are already configured.")
184193
logger.info("sharing details of # tables that can be migrated for each location")
185-
for _ in matching_locations:
186-
logger.info(f"{_[1]} tables can be migrated using external location {_[0]}.")
194+
for location, table_count in matching_locations.items():
195+
logger.info(f"{table_count} tables can be migrated using UC external location: {location}.")
187196
if len(missing_locations) > 0:
188197
logger.info("following external location need to be created.")
189198
for _ in missing_locations:

0 commit comments

Comments
 (0)