|
1 | 1 | import json |
| 2 | +import logging |
2 | 3 | import re |
3 | 4 | from dataclasses import dataclass |
4 | 5 |
|
5 | 6 | from databricks.sdk import WorkspaceClient |
| 7 | +from databricks.sdk.core import DatabricksError |
6 | 8 | from databricks.sdk.service.compute import ClusterSource |
7 | 9 | from databricks.sdk.service.jobs import BaseJob |
8 | 10 |
|
9 | 11 | from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend |
10 | 12 |
|
| 13 | +logger = logging.getLogger(__name__) |
| 14 | + |
11 | 15 | INCOMPATIBLE_SPARK_CONFIG_KEYS = [ |
12 | 16 | "spark.databricks.passthrough.enabled", |
13 | 17 | "spark.hadoop.javax.jdo.option.ConnectionURL", |
@@ -145,12 +149,15 @@ def _assess_clusters(self, all_clusters): |
145 | 149 |
|
146 | 150 | # Checking if Azure cluster config is present in cluster policies |
147 | 151 | if cluster.policy_id: |
148 | | - policy = self._ws.cluster_policies.get(cluster.policy_id) |
149 | | - if _azure_sp_conf_present_check(json.loads(policy.definition)): |
150 | | - failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") |
151 | | - if policy.policy_family_definition_overrides: |
152 | | - if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)): |
| 152 | + try: |
| 153 | + policy = self._ws.cluster_policies.get(cluster.policy_id) |
| 154 | + if _azure_sp_conf_present_check(json.loads(policy.definition)): |
153 | 155 | failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") |
| 156 | + if policy.policy_family_definition_overrides: |
| 157 | + if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)): |
| 158 | + failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") |
| 159 | + except DatabricksError as err: |
| 160 | + logger.warning(f"Error retrieving cluster policy {cluster.policy_id}. Error: {err}") |
154 | 161 |
|
155 | 162 | cluster_info.failures = json.dumps(failures) |
156 | 163 | if len(failures) > 0: |
@@ -220,12 +227,15 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobI |
220 | 227 |
|
221 | 228 | # Checking if Azure cluster config is present in cluster policies |
222 | 229 | if cluster_config.policy_id: |
223 | | - policy = self._ws.cluster_policies.get(cluster_config.policy_id) |
224 | | - if _azure_sp_conf_present_check(json.loads(policy.definition)): |
225 | | - job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") |
226 | | - if policy.policy_family_definition_overrides: |
227 | | - if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)): |
| 230 | + try: |
| 231 | + policy = self._ws.cluster_policies.get(cluster_config.policy_id) |
| 232 | + if _azure_sp_conf_present_check(json.loads(policy.definition)): |
228 | 233 | job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") |
| 234 | + if policy.policy_family_definition_overrides: |
| 235 | + if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)): |
| 236 | + job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") |
| 237 | + except DatabricksError as err: |
| 238 | + logger.warning(f"Error retrieving cluster policy {cluster_config.policy_id}. Error: {err}") |
229 | 239 |
|
230 | 240 | for job_key in job_details.keys(): |
231 | 241 | job_details[job_key].failures = json.dumps(list(job_assessment[job_key])) |
|
0 commit comments