
Commit fd8772b

Refactor workspace_client_mock to have combined fixtures stored in separate JSON files (#955)
This PR reduces code duplication and removes private API use from JobsCrawler.
1 parent 05851e6 commit fd8772b


46 files changed: +528 -1282 lines

src/databricks/labs/ucx/assessment/jobs.py

Lines changed: 19 additions & 15 deletions
@@ -57,6 +57,24 @@ def _crawl(self) -> Iterable[JobInfo]:
         return self._assess_jobs(all_jobs, all_clusters)
 
     def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[JobInfo]:
+        job_assessment, job_details = self._prepare(all_jobs)
+        for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
+            job_id = job.job_id
+            if not job_id:
+                continue
+            cluster_details = ClusterDetails.from_dict(cluster_config.as_dict())
+            cluster_failures = self.check_cluster_failures(cluster_details, "Job cluster")
+            job_assessment[job_id].update(cluster_failures)
+
+        # TODO: next person looking at this - rewrite, as this code makes no sense
+        for job_key in job_details.keys():  # pylint: disable=consider-using-dict-items,consider-iterating-dictionary
+            job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
+            if len(job_assessment[job_key]) > 0:
+                job_details[job_key].success = 0
+        return list(job_details.values())
+
+    @staticmethod
+    def _prepare(all_jobs):
         job_assessment: dict[int, set[str]] = {}
         job_details: dict[int, JobInfo] = {}
         for job in all_jobs:
@@ -82,21 +100,7 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[JobInfo]:
                 success=1,
                 failures="[]",
             )
-
-        for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
-            job_id = job.job_id
-            if not job_id:
-                continue
-            cluster_details = ClusterDetails.from_dict(cluster_config.as_dict())
-            cluster_failures = self.check_cluster_failures(cluster_details, "Job cluster")
-            job_assessment[job_id].update(cluster_failures)
-
-        # TODO: next person looking at this - rewrite, as this code makes no sense
-        for job_key in job_details.keys():  # pylint: disable=consider-using-dict-items,consider-iterating-dictionary
-            job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
-            if len(job_assessment[job_key]) > 0:
-                job_details[job_key].success = 0
-        return list(job_details.values())
+        return job_assessment, job_details
 
     def snapshot(self) -> Iterable[JobInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
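For readers skimming just this hunk, here is a minimal runnable sketch of the new control flow. The simplified JobInfo dataclass and the cluster_failures_by_job argument are stand-ins for the Databricks SDK types and the check_cluster_failures call, which this diff does not show; only the _prepare/_assess_jobs split is taken from the change above.

```python
import json
from dataclasses import dataclass


@dataclass
class JobInfo:
    job_id: str
    success: int = 1
    failures: str = "[]"


def _prepare(all_jobs):
    # Seed one empty failure set and one JobInfo row per job,
    # mirroring the extracted staticmethod (simplified: all_jobs is a list of ids).
    job_assessment = {job_id: set() for job_id in all_jobs}
    job_details = {job_id: JobInfo(job_id=str(job_id)) for job_id in all_jobs}
    return job_assessment, job_details


def _assess_jobs(all_jobs, cluster_failures_by_job):
    # cluster_failures_by_job stands in for the per-job-cluster checks.
    job_assessment, job_details = _prepare(all_jobs)
    for job_id, failures in cluster_failures_by_job.items():
        job_assessment[job_id].update(failures)
    for job_id, info in job_details.items():
        info.failures = json.dumps(sorted(job_assessment[job_id]))
        if job_assessment[job_id]:
            info.success = 0
    return list(job_details.values())


# Example: job 1 has a failing job cluster, job 2 is clean.
print(_assess_jobs([1, 2], {1: {"unsupported config"}, 2: set()}))
```

The behaviour is unchanged; only the dictionary seeding moved into _prepare so _assess_jobs reads top-down.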

tests/unit/assessment/__init__.py

Lines changed: 33 additions & 12 deletions
@@ -15,8 +15,11 @@
 
 
 def _base64(filename: str):
-    with (__dir / filename).open("rb") as f:
-        return base64.b64encode(f.read())
+    try:
+        with (__dir / filename).open("rb") as f:
+            return base64.b64encode(f.read())
+    except FileNotFoundError as err:
+        raise NotFound(filename) from err
 
 
 def _workspace_export(filename: str):
@@ -25,12 +28,30 @@ def _workspace_export(filename: str):
 
 
 def _load_fixture(filename: str):
-    with (__dir / filename).open("r") as f:
-        return json.load(f)
+    try:
+        with (__dir / filename).open("r") as f:
+            return json.load(f)
+    except FileNotFoundError as err:
+        raise NotFound(filename) from err
 
 
-def _load_list(cls: type, filename: str):
-    return [cls.from_dict(_) for _ in _load_fixture(filename)]  # type: ignore[attr-defined]
+_FOLDERS = {
+    BaseJob: '../assessment/jobs',
+    ClusterDetails: '../assessment/clusters',
+    PipelineStateInfo: '../assessment/pipelines',
+}
+
+
+def _load_list(cls: type, filename: str, ids=None):
+    if not ids:  # TODO: remove
+        return [cls.from_dict(_) for _ in _load_fixture(filename)]  # type: ignore[attr-defined]
+    return _id_list(cls, ids)
+
+
+def _id_list(cls: type, ids=None):
+    if not ids:
+        return []
+    return [cls.from_dict(_load_fixture(f'{_FOLDERS[cls]}/{_}.json')) for _ in ids]  # type: ignore[attr-defined]
 
 
 def _cluster_policy(policy_id: str):
@@ -51,18 +72,18 @@ def _secret_not_found(secret_scope, _):
 
 
 def workspace_client_mock(
-    clusters="no-spark-conf.json",
-    pipelines="single-pipeline.json",
-    jobs="single-job.json",
+    cluster_ids: list[str] | None = None,
+    pipeline_ids: list[str] | None = None,
+    job_ids: list[str] | None = None,
     warehouse_config="single-config.json",
     secret_exists=True,
 ):
     ws = create_autospec(WorkspaceClient)
-    ws.clusters.list.return_value = _load_list(ClusterDetails, f"../assessment/clusters/{clusters}")
+    ws.clusters.list.return_value = _id_list(ClusterDetails, cluster_ids)
     ws.cluster_policies.get = _cluster_policy
-    ws.pipelines.list_pipelines.return_value = _load_list(PipelineStateInfo, f"../assessment/pipelines/{pipelines}")
+    ws.pipelines.list_pipelines.return_value = _id_list(PipelineStateInfo, pipeline_ids)
     ws.pipelines.get = _pipeline
-    ws.jobs.list.return_value = _load_list(BaseJob, f"../assessment/jobs/{jobs}")
+    ws.jobs.list.return_value = _id_list(BaseJob, job_ids)
     ws.warehouses.get_workspace_warehouse_config().data_access_config = _load_list(
         EndpointConfPair, f"../assessment/warehouses/{warehouse_config}"
     )
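With the new signature, tests select fixtures by ID instead of passing a combined JSON file name. A usage sketch follows; the test name, relative import, and fixture ID are illustrative rather than taken from this commit.

```python
# Inside a test module under tests/unit/assessment/ (illustrative):
from . import workspace_client_mock  # the helper shown above


def test_cluster_fixture_is_loaded_by_id():
    # _id_list() resolves each ID to f"{_FOLDERS[ClusterDetails]}/{id}.json";
    # "azure-spn-secret" matches the cluster fixture added in this commit,
    # assuming the new file is named after its cluster_id.
    ws = workspace_client_mock(cluster_ids=["azure-spn-secret"])
    clusters = ws.clusters.list.return_value
    assert clusters[0].cluster_id == "azure-spn-secret"
```

Omitting an ID list leaves the corresponding mock empty, since _id_list returns [] for a falsy argument.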

tests/unit/assessment/clusters/assortment-conf.json

Lines changed: 0 additions & 64 deletions
This file was deleted.

tests/unit/assessment/clusters/assortment-spn.json

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 16 additions & 0 deletions
{
  "autoscale": {
    "min_workers": 1,
    "max_workers": 6
  },
  "cluster_id": "azure-spn-secret",
  "cluster_name": "Azure SPN Secret",
  "cluster_source": "UI",
  "spark_conf": {
    "spark.hadoop.fs.azure.account.oauth2.client.id.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_app_client_id}}",
    "spark.hadoop.fs.azure.account.oauth2.client.endpoint.abcde.dfs.core.windows.net": "https://login.microsoftonline.com/dedededede/oauth2/token",
    "spark.hadoop.fs.azure.account.oauth2.client.secret.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_secret}}"
  },
  "spark_context_id": 5134472582179565315,
  "spark_version": "13.3.x-cpu-ml-scala2.12"
}
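The new file's name is not visible in this view; assuming it is saved as clusters/azure-spn-secret.json (after its cluster_id), the helpers above would resolve it as sketched below.

```python
from databricks.sdk.service.compute import ClusterDetails

from . import _id_list  # helper defined in tests/unit/assessment/__init__.py above

# Assumes the fixture above is stored as clusters/azure-spn-secret.json;
# _FOLDERS[ClusterDetails] is '../assessment/clusters', so _id_list() reads
# '../assessment/clusters/azure-spn-secret.json' via _load_fixture() and
# parses it into a ClusterDetails object.
cluster = _id_list(ClusterDetails, ["azure-spn-secret"])[0]
assert cluster.spark_version == "13.3.x-cpu-ml-scala2.12"
```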

tests/unit/assessment/clusters/dbfs-init-scripts.json

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 32 additions & 0 deletions
{
  "autoscale": {
    "max_workers": 6,
    "min_workers": 1
  },
  "cluster_id": "01234-11223344-1122334455",
  "cluster_name": "UCX Cluster",
  "policy_id": "single-user-with-spn",
  "spark_version": "13.3.x-cpu-ml-scala2.12",
  "init_scripts": [
    {
      "dbfs": {
        "destination": ":/users/[email protected]/init_scripts/test.sh"
      }
    },
    {
      "dbfs": {
        "destination": "dbfs"
      }
    },
    {
      "dbfs": {
        "destination": "dbfs:"
      }
    },
    {
      "workspace": {
        "destination": "init.sh"
      }
    }
  ]
}
Lines changed: 16 additions & 0 deletions
{
  "autoscale": {
    "max_workers": 6,
    "min_workers": 1
  },
  "cluster_source": "JOB",
  "creator_user_name": "[email protected]",
  "cluster_id": "0123-190044-1122334422",
  "cluster_name": "Single User Cluster Name",
  "policy_id": "single-user-with-spn",
  "spark_version": "9.3.x-cpu-ml-scala2.12",
  "spark_conf": {
    "spark.databricks.delta.preview.enabled": "true"
  },
  "spark_context_id": "5134472582179565315"
}

tests/unit/assessment/clusters/job-source-cluster.json

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 5 additions & 0 deletions
{
  "cluster_name": "Passthrough cluster",
  "spark_version": "12.3.x-cpu-ml-scala2.12",
  "data_security_mode": "LEGACY_PASSTHROUGH"
}
