
Commit 3a43ccf

Added Dashboard widget to show the list of cluster policies along with DBR version (#1013)
## Changes

Adding a new table called `policies` and loading cluster policies, along with the DBR version, into it.

### Linked issues

#830

Resolves #..

### Functionality

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [ ] modified existing workflow: `...`
- [x] added a new table
- [x] modified existing table: `...`

### Tests

<img width="749" alt="image" src="https://github.com/databrickslabs/ucx/assets/127273819/117b8a78-9c9e-46dc-a838-7329c68c0bcb">

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
- [ ] verified on staging environment (screenshot attached)
1 parent 3c5f003 commit 3a43ccf

11 files changed, +225 −7 lines changed


src/databricks/labs/ucx/assessment/clusters.py

Lines changed: 59 additions & 0 deletions
```diff
@@ -37,6 +37,8 @@ class ClusterInfo:
     cluster_id: str
     success: int
     failures: str
+    spark_version: str | None = None
+    policy_id: str | None = None
     cluster_name: str | None = None
     creator: str | None = None
@@ -156,6 +158,8 @@ def _assess_clusters(self, all_clusters):
             cluster_info = ClusterInfo(
                 cluster_id=cluster.cluster_id if cluster.cluster_id else "",
                 cluster_name=cluster.cluster_name,
+                policy_id=cluster.policy_id,
+                spark_version=cluster.spark_version,
                 creator=cluster.creator_user_name,
                 success=1,
                 failures="[]",
@@ -172,3 +176,58 @@ def snapshot(self) -> Iterable[ClusterInfo]:
     def _try_fetch(self) -> Iterable[ClusterInfo]:
         for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
             yield ClusterInfo(*row)
+
+
+@dataclass
+class PolicyInfo:
+    policy_id: str
+    policy_name: str
+    success: int
+    failures: str
+    spark_version: str | None = None
+    policy_description: str | None = None
+    creator: str | None = None
+
+
+class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
+    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
+        super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo)
+        self._ws = ws
+
+    def _crawl(self) -> Iterable[PolicyInfo]:
+        all_policices = list(self._ws.cluster_policies.list())
+        return list(self._assess_policies(all_policices))
+
+    def _assess_policies(self, all_policices):
+        for policy in all_policices:
+            failures: list[str] = []
+            if policy.policy_id is None:
+                continue
+            failures.extend(self._check_cluster_policy(policy.policy_id, "policy"))
+            try:
+                spark_version = json.dumps(json.loads(policy.definition)["spark_version"])
+            except KeyError:
+                spark_version = None
+            policy_name = policy.name
+            creator_name = policy.creator_user_name
+
+            policy_info = PolicyInfo(
+                policy_id=policy.policy_id,
+                policy_description=policy.description,
+                policy_name=policy_name,
+                spark_version=spark_version,
+                success=1,
+                failures="[]",
+                creator=creator_name,
+            )
+            if len(failures) > 0:
+                policy_info.success = 0
+                policy_info.failures = json.dumps(failures)
+            yield policy_info
+
+    def snapshot(self) -> Iterable[PolicyInfo]:
+        return self._snapshot(self._try_fetch, self._crawl)
+
+    def _try_fetch(self) -> Iterable[PolicyInfo]:
+        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+            yield PolicyInfo(*row)
```
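For context, a minimal usage sketch of the new crawler (not part of the commit): `sql_backend` stands for any ucx `SqlBackend` implementation, and the helper name `failing_policies` is illustrative only.

```python
from collections.abc import Iterable

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment.clusters import PoliciesCrawler, PolicyInfo


def failing_policies(ws: WorkspaceClient, sql_backend, inventory_schema: str) -> Iterable[PolicyInfo]:
    """Yield cluster policies that failed at least one compatibility check."""
    crawler = PoliciesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema)
    # snapshot() reads cached rows back from $inventory.policies, or crawls
    # the workspace cluster policies on the first run.
    for policy in crawler.snapshot():
        if policy.success == 0:
            yield policy
```

Note that `PolicyInfo.spark_version` holds the JSON-serialized `spark_version` rule from the policy definition (or `None` when the definition does not set one), not a bare DBR version string.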

src/databricks/labs/ucx/install.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -56,7 +56,7 @@

 from databricks.labs.ucx.__about__ import __version__
 from databricks.labs.ucx.assessment.azure import AzureServicePrincipalInfo
-from databricks.labs.ucx.assessment.clusters import ClusterInfo
+from databricks.labs.ucx.assessment.clusters import ClusterInfo, PolicyInfo
 from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptInfo
 from databricks.labs.ucx.assessment.jobs import JobInfo, SubmitRunInfo
 from databricks.labs.ucx.assessment.pipelines import PipelineInfo
@@ -160,6 +160,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
             functools.partial(table, "workspace_objects", WorkspaceObjectInfo),
             functools.partial(table, "permissions", Permissions),
             functools.partial(table, "submit_runs", SubmitRunInfo),
+            functools.partial(table, "policies", PolicyInfo),
             functools.partial(table, "migration_status", MigrationStatus),
         ],
     )
```

src/databricks/labs/ucx/mixins/fixtures.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -647,7 +647,9 @@ def create(*, name: str | None = None, **kwargs):
             name = f"sdk-{make_random(4)}"
         if "definition" not in kwargs:
             kwargs["definition"] = json.dumps(
-                {"spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"}}
+                {
+                    "spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"},
+                }
             )
         cluster_policy = ws.cluster_policies.create(name, **kwargs)
         logger.info(
```
Lines changed: 9 additions & 0 deletions
```diff
@@ -0,0 +1,9 @@
+-- viz type=table, name=Cluster Policies, columns=policy_name,cluster_dbr_version,policy_spark_version
+-- widget title=Cluster Policies, row=8, col=0, size_x=3, size_y=8
+SELECT
+    distinct policy_name,
+    cluster.spark_version as cluster_dbr_version,
+    policy.spark_version as policy_spark_version
+FROM $inventory.clusters as cluster
+JOIN $inventory.policies as policy
+ON cluster.policy_id=policy.policy_id
```
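`$inventory` is substituted with the configured inventory database when the dashboard is installed. The `policy_spark_version` column shown by this widget is the JSON value that `_assess_policies` stores; a small illustration of that extraction, using a hypothetical policy definition:

```python
import json

# A policy definition as returned by the Cluster Policies API (a JSON string).
definition = '{"spark_version": {"type": "fixed", "value": "14.3.x-scala2.12"}}'

# Same extraction as PoliciesCrawler._assess_policies: parse the definition,
# pull out the spark_version rule, and re-serialize it for the policies table.
spark_version = json.dumps(json.loads(definition)["spark_version"])
print(spark_version)  # {"type": "fixed", "value": "14.3.x-scala2.12"}
```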

src/databricks/labs/ucx/runtime.py

Lines changed: 15 additions & 1 deletion
```diff
@@ -6,7 +6,7 @@
 from databricks.sdk import WorkspaceClient

 from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
-from databricks.labs.ucx.assessment.clusters import ClustersCrawler
+from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler
 from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler
 from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler
 from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
@@ -154,6 +154,19 @@ def assess_incompatible_submit_runs(cfg: WorkspaceConfig, ws: WorkspaceClient, s
     crawler.snapshot()


+@task("assessment")
+def crawl_cluster_policies(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
+    """This module scans through all the Cluster Policies and get the necessary information
+
+    It looks for:
+      - Clusters Policies with Databricks Runtime (DBR) version earlier than 11.3
+
+    Subsequently, a list of all the policies with matching configurations are stored in the
+    `$inventory.policies` table."""
+    crawler = PoliciesCrawler(ws, sql_backend, cfg.inventory_database)
+    crawler.snapshot()
+
+
 @task("assessment", cloud="azure")
 def assess_azure_service_principals(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
     """This module scans through all the clusters configurations, cluster policies, job cluster configurations,
@@ -237,6 +250,7 @@ def crawl_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBack
     assess_jobs,
     assess_incompatible_submit_runs,
     assess_clusters,
+    crawl_cluster_policies,
     assess_azure_service_principals,
     assess_pipelines,
     assess_global_init_scripts,
```
Lines changed: 17 additions & 0 deletions
```diff
@@ -0,0 +1,17 @@
+# pylint: disable=invalid-name
+
+import logging
+
+from databricks.labs.blueprint.installation import Installation
+from databricks.sdk import WorkspaceClient
+
+from databricks.labs.ucx.config import WorkspaceConfig
+
+logger = logging.getLogger(__name__)
+
+
+def upgrade(installation: Installation, ws: WorkspaceClient):
+    config = installation.load(WorkspaceConfig)
+    warehouse_id = str(config.warehouse_id)
+    sql = f"ALTER TABLE hive_metastore.{config.inventory_database}.clusters ADD COLUMNS(policy_id string,spark_version string)"
+    ws.statement_execution.execute_statement(sql, warehouse_id=warehouse_id)
```
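This migration brings existing installations in line with the two columns added to `ClusterInfo`. A hedged way to check that it applied, reusing the same Statement Execution API; the warehouse id and the `ucx` inventory database are placeholders:

```python
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
response = ws.statement_execution.execute_statement(
    "DESCRIBE TABLE hive_metastore.ucx.clusters",  # 'ucx': placeholder inventory database
    warehouse_id="<warehouse-id>",  # placeholder: id of a running SQL warehouse
)
# DESCRIBE TABLE returns one row per column; the first field is the column name.
columns = {row[0] for row in response.result.data_array}
assert {"policy_id", "spark_version"} <= columns
```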

tests/integration/assessment/test_clusters.py

Lines changed: 30 additions & 1 deletion
```diff
@@ -1,10 +1,11 @@
+import json
 from datetime import timedelta

 from databricks.sdk.errors import NotFound
 from databricks.sdk.retries import retried
 from databricks.sdk.service.compute import DataSecurityMode

-from databricks.labs.ucx.assessment.clusters import ClustersCrawler
+from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler

 from .test_assessment import _SPARK_CONF

@@ -36,3 +37,31 @@ def test_cluster_crawler_no_isolation(ws, make_cluster, inventory_schema, sql_ba

     assert len(results) == 1
     assert results[0].failures == '["No isolation shared clusters not supported in UC"]'
+
+
+@retried(on=[NotFound], timeout=timedelta(minutes=6))
+def test_policy_crawler(ws, make_cluster_policy, inventory_schema, sql_backend):
+    created_policy = make_cluster_policy(
+        name="test_policy_check",
+        definition=json.dumps({"spark_version": {'type': 'fixed', 'value': '14.3.x-scala2.12'}}),
+    )
+    policy_definition = {
+        "spark_version": {'type': 'fixed', 'value': '14.3.x-scala2.12'},
+        "spark_conf.fs.azure.account.auth.type": {"type": "fixed", "value": "OAuth", "hidden": True},
+    }
+    created_policy_2 = make_cluster_policy(name="test_policy_check2", definition=json.dumps(policy_definition))
+    policy_crawler = PoliciesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema)
+    policies = policy_crawler.snapshot()
+    results = []
+    for policy in policies:
+        if policy.policy_id in (created_policy.policy_id, created_policy_2.policy_id):
+            results.append(policy)
+
+    assert results[0].policy_name == "test_policy_check"
+    assert results[0].success == 1
+    assert results[0].failures == "[]"
+    assert results[0].spark_version == json.dumps({'type': 'fixed', 'value': '14.3.x-scala2.12'})
+
+    assert results[1].policy_name == "test_policy_check2"
+    assert results[1].success == 0
+    assert results[1].failures == '["Uses azure service principal credentials config in policy."]'
```

tests/unit/assessment/__init__.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -40,6 +40,7 @@ def _load_fixture(filename: str):
     BaseRun: '../assessment/jobruns',
     ClusterDetails: '../assessment/clusters',
     PipelineStateInfo: '../assessment/pipelines',
+    Policy: '../assessment/policies',
 }


@@ -77,11 +78,13 @@ def workspace_client_mock(
     pipeline_ids: list[str] | None = None,
     job_ids: list[str] | None = None,
     jobruns_ids: list[str] | None = None,
+    policy_ids: list[str] | None = None,
     warehouse_config="single-config.json",
     secret_exists=True,
 ):
     ws = create_autospec(WorkspaceClient)
     ws.clusters.list.return_value = _id_list(ClusterDetails, cluster_ids)
+    ws.cluster_policies.list.return_value = _id_list(Policy, policy_ids)
     ws.cluster_policies.get = _cluster_policy
     ws.pipelines.list_pipelines.return_value = _id_list(PipelineStateInfo, pipeline_ids)
     ws.pipelines.get = _pipeline
```
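With this wiring, a unit test can exercise `PoliciesCrawler` against the JSON fixtures added below. A hedged sketch (assumes `workspace_client_mock` from this module is in scope, and a hypothetical pytest fixture `mock_backend` providing whatever `SqlBackend` stub the ucx unit tests use):

```python
from databricks.labs.ucx.assessment.clusters import PoliciesCrawler


def test_policy_crawler_flags_spn_credentials(mock_backend):
    # 'single-user-with-spn-policyid' matches the policy_id of a fixture below.
    ws = workspace_client_mock(policy_ids=["single-user-with-spn-policyid"])
    crawler = PoliciesCrawler(ws, mock_backend, "ucx")
    results = list(crawler.snapshot())
    assert results[0].success == 0
    assert "azure service principal" in results[0].failures.lower()
```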
Lines changed: 7 additions & 0 deletions
```diff
@@ -0,0 +1,7 @@
+{
+    "policy_id": "single-user-with-spn-no-sparkversion",
+    "name": "test_policy",
+    "definition": "{\"azure_attributes.availability\": {\"type\": \"fixed\", \"value\": \"ON_DEMAND_AZURE\", \"hidden\": true}}",
+    "policy_family_definition_overrides": {
+    }
+}
```
Lines changed: 24 additions & 0 deletions
```diff
@@ -0,0 +1,24 @@
+{
+    "policy_id": "single-user-with-spn-policyid",
+    "definition": "{\"spark_version\": {\"type\": \"unlimited\",\"defaultValue\": \"auto:latest-ml\"},\"spark_conf.fs.azure.account.auth.type\": {\"type\": \"fixed\", \"value\": \"OAuth\", \"hidden\": true}}",
+    "policy_family_definition_overrides": {
+        "spark_conf.fs.azure.account.auth.type": {
+            "type": "fixed",
+            "value": "OAuth",
+            "hidden": true
+        },
+        "not.a.type": {
+            "type": "fixed",
+            "value": "not.a.matching.type",
+            "hidden": true
+        },
+        "not.a.matching.type": {
+            "type": "fixed",
+            "value": "https://login.microsoftonline.com/1234ededed/oauth2/token",
+            "hidden": true
+        }
+    },
+    "name": "test_policy",
+    "description": "test",
+    "creator_user_name": "test_creator"
+}
```
