
# Commit 369893a: Writing log readme in multiprocess safe way (#794)
## Changes

1. Changed writing of the readme log to be multiprocess safe. Without this, the assessment tasks occasionally fail with the error: `FileExistsError: [Errno 17] File exists: '/Workspace/Users/[email protected]/.ucx/logs/assessment/run-340934288146817/README.md'`
2. Skip the long-running integration test for Redash.
3. Check that job settings and tasks are present while crawling.

### Tests

- [x] manually tested
1 parent: b249e86

5 files changed: 186 additions, 43 deletions

src/databricks/labs/ucx/assessment/azure.py (17 additions, 15 deletions)

```diff
@@ -118,21 +118,23 @@ def _get_azure_spn_list(self, config: dict) -> list:
 
     def _get_cluster_configs_from_all_jobs(self, all_jobs, all_clusters_by_id):
         for j in all_jobs:
-            if j.settings.job_clusters is not None:
-                for jc in j.settings.job_clusters:
-                    if jc.new_cluster is None:
-                        continue
-                    yield j, jc.new_cluster
-
-            for t in j.settings.tasks:
-                if t.existing_cluster_id is not None:
-                    interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
-                    if interactive_cluster is None:
-                        continue
-                    yield j, interactive_cluster
-
-                elif t.new_cluster is not None:
-                    yield j, t.new_cluster
+            if j.settings is not None:
+                if j.settings.job_clusters is not None:
+                    for jc in j.settings.job_clusters:
+                        if jc.new_cluster is None:
+                            continue
+                        yield j, jc.new_cluster
+
+                if j.settings.tasks is not None:
+                    for t in j.settings.tasks:
+                        if t.existing_cluster_id is not None:
+                            interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
+                            if interactive_cluster is None:
+                                continue
+                            yield j, interactive_cluster
+
+                        elif t.new_cluster is not None:
+                            yield j, t.new_cluster
 
     def _get_relevant_service_principals(self) -> list:
         relevant_service_principals = []
```
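The extra `if j.settings is not None:` level matters because the Jobs API can return `BaseJob` entries with no `settings` at all; the old generator dereferenced `j.settings.job_clusters` and `j.settings.tasks` unconditionally and crashed the crawl. A minimal sketch of the failure mode, with hypothetical sample data:

```python
# Hypothetical sample data; only the None-handling behavior is the point.
from databricks.sdk.service.jobs import BaseJob

all_jobs = [BaseJob(job_id=42, settings=None)]

for j in all_jobs:
    # Before this commit: j.settings.job_clusters raised
    # AttributeError: 'NoneType' object has no attribute 'job_clusters'
    if j.settings is None:
        continue  # the new guard skips such jobs instead of crashing
    # ... inspect j.settings.job_clusters and j.settings.tasks ...
```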

src/databricks/labs/ucx/assessment/jobs.py (28 additions, 26 deletions)

```diff
@@ -36,21 +36,23 @@ def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
     @staticmethod
     def _get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
         for j in all_jobs:
-            if j.settings.job_clusters is not None:
-                for jc in j.settings.job_clusters:
-                    if jc.new_cluster is None:
-                        continue
-                    yield j, jc.new_cluster
-
-            for t in j.settings.tasks:
-                if t.existing_cluster_id is not None:
-                    interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
-                    if interactive_cluster is None:
-                        continue
-                    yield j, interactive_cluster
-
-                elif t.new_cluster is not None:
-                    yield j, t.new_cluster
+            if j.settings is not None:
+                if j.settings.job_clusters is not None:
+                    for jc in j.settings.job_clusters:
+                        if jc.new_cluster is None:
+                            continue
+                        yield j, jc.new_cluster
+
+                if j.settings.tasks is not None:
+                    for t in j.settings.tasks:
+                        if t.existing_cluster_id is not None:
+                            interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
+                            if interactive_cluster is None:
+                                continue
+                            yield j, interactive_cluster
+
+                        elif t.new_cluster is not None:
+                            yield j, t.new_cluster
 
     def _crawl(self) -> Iterable[JobInfo]:
         all_jobs = list(self._ws.jobs.list(expand_tasks=True))
@@ -71,17 +73,17 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
             )
 
             job_settings = job.settings
-            assert job_settings is not None
-            job_name = job_settings.name
-            if not job_name:
-                job_name = "Unknown"
-            job_details[job.job_id] = JobInfo(
-                job_id=str(job.job_id),
-                job_name=job_name,
-                creator=job.creator_user_name,
-                success=1,
-                failures="[]",
-            )
+            if job_settings is not None:
+                job_name = job_settings.name
+                if not job_name:
+                    job_name = "Unknown"
+                job_details[job.job_id] = JobInfo(
+                    job_id=str(job.job_id),
+                    job_name=job_name,
+                    creator=job.creator_user_name,
+                    success=1,
+                    failures="[]",
+                )
 
         for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
             support_status = spark_version_compatibility(cluster_config.spark_version)
```
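The `_assess_jobs` hunk also changes failure semantics: previously a single job with `settings=None` tripped the `assert` and aborted the whole assessment, whereas now such jobs are simply left out of `job_details`. A toy illustration of the difference, using plain dicts rather than the SDK types:

```python
# Toy data; illustrates assert-vs-guard semantics only.
jobs = [{"job_id": 1, "settings": {"name": "etl"}}, {"job_id": 2, "settings": None}]

def crawl_strict(jobs):  # old behavior: one bad job aborts the crawl
    details = {}
    for job in jobs:
        assert job["settings"] is not None  # AssertionError on job 2
        details[job["job_id"]] = job["settings"]["name"] or "Unknown"
    return details

def crawl_lenient(jobs):  # new behavior: the bad job is skipped
    details = {}
    for job in jobs:
        if job["settings"] is not None:
            details[job["job_id"]] = job["settings"]["name"] or "Unknown"
    return details

assert crawl_lenient(jobs) == {1: "etl"}  # job 2 is silently omitted
```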

src/databricks/labs/ucx/framework/tasks.py (36 additions, 2 deletions)

```diff
@@ -1,12 +1,16 @@
 import logging
+import os
 from collections.abc import Callable
+from contextlib import contextmanager
 from dataclasses import dataclass
+from datetime import timedelta
 from functools import wraps
 from logging.handlers import TimedRotatingFileHandler
 from pathlib import Path
 
 from databricks.labs.blueprint.logger import install_logger
 from databricks.sdk.core import Config
+from databricks.sdk.retries import retried
 
 from databricks.labs.ucx.__about__ import __version__
 from databricks.labs.ucx.config import WorkspaceConfig
@@ -116,6 +120,36 @@ def wrapper(*args, **kwargs):
     return decorator
 
 
+@retried(on=[FileExistsError], timeout=timedelta(seconds=5))
+def _create_lock(lockfile_name):
+    while True:  # wait until the lock file can be opened
+        f = os.open(lockfile_name, os.O_CREAT | os.O_EXCL)
+        break
+    return f
+
+
+@contextmanager
+def _exclusive_open(filename: str, *args, **kwargs):
+    """Open a file with exclusive access across multiple processes.
+    Requires write access to the directory containing the file.
+
+    Arguments are the same as the built-in open.
+
+    Returns a context manager that closes the file and releases the lock.
+    """
+    lockfile_name = filename + ".lock"
+    lockfile = _create_lock(lockfile_name)
+
+    try:
+        with open(filename, *args, **kwargs) as f:
+            yield f
+    finally:
+        try:
+            os.close(lockfile)
+        finally:
+            os.unlink(lockfile_name)
+
+
 def trigger(*argv):
     args = dict(a[2:].split("=") for a in argv if "--" == a[0:2])
     if "config" not in args:
@@ -168,8 +202,8 @@ def trigger(*argv):
 
     log_readme = log_path.joinpath("README.md")
     if not log_readme.exists():
-        # this may race when run from multiple tasks, but let's accept the risk for now.
-        with log_readme.open(mode="w") as f:
+        # this may race when run from multiple tasks, therefore it must be multiprocess safe
+        with _exclusive_open(str(log_readme), mode="w") as f:
             f.write(f"# Logs for the UCX {current_task.workflow} workflow\n")
             f.write("This folder contains UCX log files.\n\n")
             f.write(f"See the [{current_task.workflow} job](/#job/{job_id}) and ")
```

tests/integration/workspace_access/test_redash.py (2 additions, 0 deletions)

```diff
@@ -1,5 +1,6 @@
 import logging
 from datetime import timedelta
+from unittest import skip
 
 from databricks.sdk.errors import NotFound
 from databricks.sdk.retries import retried
@@ -60,6 +61,7 @@ def test_permissions_for_redash(
 # Redash group permissions are cached for up to 10 mins. If a group is renamed, redash permissions api returns
 # the old name for some time. Therefore, we need to allow at least 10 mins in the timeout for checking the permissions
 # after group rename.
+@skip  # skipping as it takes 5-10 mins to execute
 @retried(on=[NotFound], timeout=timedelta(minutes=13))
 def test_permissions_for_redash_after_group_is_renamed(
     ws,
```

tests/unit/assessment/test_jobs.py (103 additions, 0 deletions)

```diff
@@ -100,6 +100,68 @@ def test_job_assessment():
     assert result_set[1].success == 0
 
 
+def test_job_assessment_no_job_tasks():
+    sample_jobs = [
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name="[email protected]",
+            job_id=536591785949415,
+            settings=JobSettings(
+                compute=None,
+                continuous=None,
+                tasks=None,
+                timeout_seconds=0,
+            ),
+        ),
+    ]
+
+    sample_clusters = [
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_conf={"spark.databricks.delta.preview.enabled": "true"},
+            spark_context_id=5134472582179566666,
+            spark_env_vars=None,
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            cluster_id="0810-229933-chicago99",
+            cluster_source=ClusterSource.JOB,
+        ),
+    ]
+    ws = Mock()
+    result_set = JobsCrawler(ws, MockBackend(), "ucx")._assess_jobs(
+        sample_jobs, {c.cluster_id: c for c in sample_clusters}
+    )
+    assert len(result_set) == 1
+    assert result_set[0].success == 1
+
+
+def test_job_assessment_no_job_settings():
+    sample_jobs = [
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name="[email protected]",
+            job_id=536591785949415,
+            settings=None,
+        ),
+    ]
+
+    sample_clusters = [
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_conf={"spark.databricks.delta.preview.enabled": "true"},
+            spark_context_id=5134472582179566666,
+            spark_env_vars=None,
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            cluster_id="0810-229933-chicago99",
+            cluster_source=ClusterSource.JOB,
+        ),
+    ]
+    ws = Mock()
+    result_set = JobsCrawler(ws, MockBackend(), "ucx")._assess_jobs(
+        sample_jobs, {c.cluster_id: c for c in sample_clusters}
+    )
+    assert len(result_set) == 0
+
+
 def test_job_assessment_for_azure_spark_config():
     sample_jobs = [
         BaseJob(
@@ -243,6 +305,47 @@ def test_job_assessment_for_azure_spark_config():
     assert result_set[2].success == 0
 
 
+def test_jobs_assessment_with_spn_cluster_no_job_tasks(mocker):
+    sample_jobs = [
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name="[email protected]",
+            job_id=536591785949415,
+            settings=JobSettings(
+                compute=None,
+                continuous=None,
+                tasks=None,
+                timeout_seconds=0,
+            ),
+        )
+    ]
+
+    ws = mocker.Mock()
+    ws.clusters.list.return_value = []
+    ws.jobs.list.return_value = sample_jobs
+
+    jobs = AzureServicePrincipalCrawler(ws, MockBackend(), "ucx")._list_all_jobs_with_spn_in_spark_conf()
+    assert len(jobs) == 0
+
+
+def test_jobs_assessment_with_spn_cluster_no_job_settings(mocker):
+    sample_jobs = [
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name="[email protected]",
+            job_id=536591785949415,
+            settings=None,
+        )
+    ]
+
+    ws = mocker.Mock()
+    ws.clusters.list.return_value = []
+    ws.jobs.list.return_value = sample_jobs
+
+    jobs = AzureServicePrincipalCrawler(ws, MockBackend(), "ucx")._list_all_jobs_with_spn_in_spark_conf()
+    assert len(jobs) == 0
+
+
 def test_jobs_assessment_with_spn_cluster_policy_not_found(mocker):
     sample_jobs = [
         BaseJob(
```
