|
| 1 | +import json |
| 2 | +import uuid |
1 | 3 | from dataclasses import dataclass |
2 | 4 |
|
3 | 5 | from databricks.labs.blueprint.installation import Installation |
| 6 | +from databricks.labs.blueprint.tui import Prompts |
4 | 7 | from databricks.sdk import WorkspaceClient |
| 8 | +from databricks.sdk.errors import NotFound, ResourceAlreadyExists |
5 | 9 | from databricks.sdk.service.catalog import Privilege |
6 | 10 |
|
7 | 11 | from databricks.labs.ucx.assessment.crawlers import logger |
8 | | -from databricks.labs.ucx.azure.resources import AzureResource, AzureResources |
| 12 | +from databricks.labs.ucx.azure.resources import ( |
| 13 | + AzureAPIClient, |
| 14 | + AzureResource, |
| 15 | + AzureResources, |
| 16 | + PrincipalSecret, |
| 17 | +) |
9 | 18 | from databricks.labs.ucx.config import WorkspaceConfig |
10 | 19 | from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend |
11 | 20 | from databricks.labs.ucx.hive_metastore.locations import ExternalLocations |
@@ -46,7 +55,12 @@ def for_cli(cls, ws: WorkspaceClient, product='ucx', include_subscriptions=None) |
46 | 55 | installation = Installation.current(ws, product) |
47 | 56 | config = installation.load(WorkspaceConfig) |
48 | 57 | sql_backend = StatementExecutionBackend(ws, config.warehouse_id) |
49 | | - azurerm = AzureResources(ws, include_subscriptions=include_subscriptions) |
| 58 | + azure_mgmt_client = AzureAPIClient( |
| 59 | + ws.config.arm_environment.resource_manager_endpoint, |
| 60 | + ws.config.arm_environment.service_management_endpoint, |
| 61 | + ) |
| 62 | + graph_client = AzureAPIClient("https://graph.microsoft.com", "https://graph.microsoft.com") |
| 63 | + azurerm = AzureResources(azure_mgmt_client, graph_client, include_subscriptions) |
50 | 64 | locations = ExternalLocations(ws, sql_backend, config.inventory_database) |
51 | 65 | return cls(installation, ws, azurerm, locations) |
52 | 66 |
|
@@ -91,6 +105,121 @@ def save_spn_permissions(self) -> str | None: |
91 | 105 | return None |
92 | 106 | return self._installation.save(storage_account_infos, filename=self._filename) |
93 | 107 |
|
| 108 | + def _update_cluster_policy_definition( |
| 109 | + self, |
| 110 | + policy_definition: str, |
| 111 | + storage_accounts: list[AzureResource], |
| 112 | + uber_principal: PrincipalSecret, |
| 113 | + inventory_database: str, |
| 114 | + ) -> str: |
| 115 | + policy_dict = json.loads(policy_definition) |
| 116 | + tenant_id = self._azurerm.tenant_id() |
| 117 | + endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token" |
| 118 | + for storage in storage_accounts: |
| 119 | + policy_dict[ |
| 120 | + f"spark_conf.fs.azure.account.oauth2.client.id.{storage.storage_account}.dfs.core.windows.net" |
| 121 | + ] = self._policy_config(uber_principal.client.client_id) |
| 122 | + policy_dict[ |
| 123 | + f"spark_conf.fs.azure.account.oauth.provider.type.{storage.storage_account}.dfs.core.windows.net" |
| 124 | + ] = self._policy_config("org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") |
| 125 | + policy_dict[ |
| 126 | + f"spark_conf.fs.azure.account.oauth2.client.endpoint.{storage.storage_account}.dfs.core.windows.net" |
| 127 | + ] = self._policy_config(endpoint) |
| 128 | + policy_dict[f"spark_conf.fs.azure.account.auth.type.{storage.storage_account}.dfs.core.windows.net"] = ( |
| 129 | + self._policy_config("OAuth") |
| 130 | + ) |
| 131 | + policy_dict[ |
| 132 | + f"spark_conf.fs.azure.account.oauth2.client.secret.{storage.storage_account}.dfs.core.windows.net" |
| 133 | + ] = self._policy_config(f"{{secrets/{inventory_database}/uber_principal_secret}}") |
| 134 | + return json.dumps(policy_dict) |
| 135 | + |
| 136 | + @staticmethod |
| 137 | + def _policy_config(value: str): |
| 138 | + return {"type": "fixed", "value": value} |
| 139 | + |
| 140 | + def _update_cluster_policy_with_spn( |
| 141 | + self, |
| 142 | + policy_id: str, |
| 143 | + storage_accounts: list[AzureResource], |
| 144 | + uber_principal: PrincipalSecret, |
| 145 | + inventory_database: str, |
| 146 | + ): |
| 147 | + try: |
| 148 | + policy_definition = "" |
| 149 | + cluster_policy = self._ws.cluster_policies.get(policy_id) |
| 150 | + |
| 151 | + self._installation.save(cluster_policy, filename="policy-backup.json") |
| 152 | + |
| 153 | + if cluster_policy.definition is not None: |
| 154 | + policy_definition = self._update_cluster_policy_definition( |
| 155 | + cluster_policy.definition, storage_accounts, uber_principal, inventory_database |
| 156 | + ) |
| 157 | + if cluster_policy.name is not None: |
| 158 | + self._ws.cluster_policies.edit(policy_id, cluster_policy.name, definition=policy_definition) |
| 159 | + except NotFound: |
| 160 | + msg = f"cluster policy {policy_id} not found, please run UCX installation to create UCX cluster policy" |
| 161 | + raise NotFound(msg) from None |
| 162 | + |
| 163 | + def create_uber_principal(self, prompts: Prompts): |
| 164 | + config = self._installation.load(WorkspaceConfig) |
| 165 | + inventory_database = config.inventory_database |
| 166 | + display_name = f"unity-catalog-migration-{inventory_database}-{self._ws.get_workspace_id()}" |
| 167 | + uber_principal_name = prompts.question( |
| 168 | + "Enter a name for the uber service principal to be created", default=display_name |
| 169 | + ) |
| 170 | + policy_id = config.policy_id |
| 171 | + if policy_id is None: |
| 172 | + msg = "UCX cluster policy not found in config. Please run latest UCX installation to set cluster policy" |
| 173 | + logger.error(msg) |
| 174 | + raise ValueError(msg) from None |
| 175 | + if config.uber_spn_id is not None: |
| 176 | + logger.warning("Uber service principal already created for this workspace.") |
| 177 | + return |
| 178 | + used_storage_accounts = self._get_storage_accounts() |
| 179 | + if len(used_storage_accounts) == 0: |
| 180 | + logger.warning( |
| 181 | + "There are no external table present with azure storage account. " |
| 182 | + "Please check if assessment job is run" |
| 183 | + ) |
| 184 | + return |
| 185 | + storage_account_info = [] |
| 186 | + for storage in self._azurerm.storage_accounts(): |
| 187 | + if storage.storage_account in used_storage_accounts: |
| 188 | + storage_account_info.append(storage) |
| 189 | + logger.info("Creating service principal") |
| 190 | + uber_principal = self._azurerm.create_service_principal(uber_principal_name) |
| 191 | + self._create_scope(uber_principal, inventory_database) |
| 192 | + config.uber_spn_id = uber_principal.client.client_id |
| 193 | + logger.info( |
| 194 | + f"Created service principal of client_id {config.uber_spn_id}. " f"Applying permission on storage accounts" |
| 195 | + ) |
| 196 | + try: |
| 197 | + self._apply_storage_permission(storage_account_info, uber_principal) |
| 198 | + self._installation.save(config) |
| 199 | + self._update_cluster_policy_with_spn(policy_id, storage_account_info, uber_principal, inventory_database) |
| 200 | + except PermissionError: |
| 201 | + self._azurerm.delete_service_principal(uber_principal.client.object_id) |
| 202 | + logger.info(f"Update UCX cluster policy {policy_id} with spn connection details for storage accounts") |
| 203 | + |
| 204 | + def _apply_storage_permission(self, storage_account_info: list[AzureResource], uber_principal: PrincipalSecret): |
| 205 | + for storage in storage_account_info: |
| 206 | + role_name = str(uuid.uuid4()) |
| 207 | + self._azurerm.apply_storage_permission( |
| 208 | + uber_principal.client.object_id, storage, "STORAGE_BLOB_DATA_READER", role_name |
| 209 | + ) |
| 210 | + logger.debug( |
| 211 | + f"Storage Data Blob Reader permission applied for spn {uber_principal.client.client_id} " |
| 212 | + f"to storage account {storage.storage_account}" |
| 213 | + ) |
| 214 | + |
| 215 | + def _create_scope(self, uber_principal: PrincipalSecret, inventory_database: str): |
| 216 | + logger.info(f"Creating secret scope {inventory_database}.") |
| 217 | + try: |
| 218 | + self._ws.secrets.create_scope(inventory_database) |
| 219 | + except ResourceAlreadyExists: |
| 220 | + logger.warning(f"Secret scope {inventory_database} already exists, using the same") |
| 221 | + self._ws.secrets.put_secret(inventory_database, "uber_principal_secret", string_value=uber_principal.secret) |
| 222 | + |
94 | 223 | def load(self): |
95 | 224 | return self._installation.load(list[StoragePermissionMapping], filename=self._filename) |
96 | 225 |
|
|
0 commit comments