Skip to content

Commit a5feef4

Browse files
authored
Support linting python wheel tasks (#1821)
1 parent 02d03d5 commit a5feef4

File tree

5 files changed

+164
-18
lines changed

5 files changed

+164
-18
lines changed

src/databricks/labs/ucx/source_code/jobs.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import tempfile
44
from collections.abc import Iterable
55
from dataclasses import dataclass
6+
from importlib import metadata
67
from pathlib import Path
78

89
from databricks.labs.blueprint.parallel import Threads
@@ -139,11 +140,37 @@ def _register_spark_python_task(self, graph: DependencyGraph):
139140
path = WorkspacePath(self._ws, notebook_path)
140141
return graph.register_notebook(path)
141142

142-
def _register_python_wheel_task(self, graph: DependencyGraph): # pylint: disable=unused-argument
143+
@staticmethod
144+
def _find_first_matching_distribution(path_lookup: PathLookup, name: str) -> metadata.Distribution | None:
145+
# Prepared exists in importlib.metadata.__init__pyi, but is not defined in importlib.metadata.__init__.py
146+
normalize_name = metadata.Prepared.normalize # type: ignore
147+
normalized_name = normalize_name(name)
148+
for library_root in path_lookup.library_roots:
149+
for path in library_root.glob("*.dist-info"):
150+
distribution = metadata.Distribution.at(path)
151+
if normalize_name(distribution.name) == normalized_name:
152+
return distribution
153+
return None
154+
155+
def _register_python_wheel_task(self, graph: DependencyGraph) -> Iterable[DependencyProblem]:
143156
if not self._task.python_wheel_task:
144-
return
145-
# TODO: https://github.com/databrickslabs/ucx/issues/1640
146-
yield DependencyProblem('not-yet-implemented', 'Python wheel task is not yet implemented')
157+
return []
158+
159+
distribution_name = self._task.python_wheel_task.package_name
160+
distribution = self._find_first_matching_distribution(graph.path_lookup, distribution_name)
161+
if distribution is None:
162+
return [DependencyProblem("distribution-not-found", f"Could not find distribution for {distribution_name}")]
163+
entry_point_name = self._task.python_wheel_task.entry_point
164+
try:
165+
entry_point = distribution.entry_points[entry_point_name]
166+
except KeyError:
167+
return [
168+
DependencyProblem(
169+
"distribution-entry-point-not-found",
170+
f"Could not find distribution entry point for {distribution_name}.{entry_point_name}",
171+
)
172+
]
173+
return graph.register_import(entry_point.module)
147174

148175
def _register_spark_jar_task(self, graph: DependencyGraph): # pylint: disable=unused-argument
149176
if not self._task.spark_jar_task:

src/databricks/labs/ucx/source_code/path_lookup.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,15 @@ def remove_path(self, index: int):
9696
@property
9797
def library_roots(self) -> list[Path]:
9898
# we may return a combination of WorkspacePath and PosixPath here
99-
return [self._cwd] + self._sys_paths
99+
library_roots = []
100+
for library_root in [self._cwd] + self._sys_paths:
101+
try:
102+
is_existing_directory = library_root.exists() and library_root.is_dir()
103+
except PermissionError:
104+
continue
105+
if is_existing_directory:
106+
library_roots.append(library_root)
107+
return library_roots
100108

101109
@property
102110
def cwd(self):

tests/integration/source_code/test_jobs.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,3 +341,38 @@ def test_job_spark_python_task_linter_unhappy_path(
341341

342342
problems = simple_ctx.workflow_linter.lint_job(j.job_id)
343343
assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 1
344+
345+
346+
def test_workflow_linter_lints_python_wheel_task(simple_ctx, ws, make_job, make_random):
347+
whitelist = create_autospec(Whitelist) # databricks is in default list
348+
whitelist.module_compatibility.return_value = UNKNOWN
349+
whitelist.distribution_compatibility.return_value = UNKNOWN
350+
351+
simple_ctx = simple_ctx.replace(
352+
whitelist=whitelist,
353+
path_lookup=PathLookup(Path("/non/existing/path"), []), # Avoid finding current project
354+
)
355+
356+
simple_ctx.workspace_installation.run() # Creates ucx wheel
357+
wheels = [file for file in simple_ctx.installation.files() if file.path.endswith(".whl")]
358+
library = compute.Library(whl=wheels[0].path)
359+
360+
python_wheel_task = jobs.PythonWheelTask("databricks_labs_ucx", "runtime")
361+
task = jobs.Task(
362+
task_key=make_random(4),
363+
python_wheel_task=python_wheel_task,
364+
new_cluster=compute.ClusterSpec(
365+
num_workers=1,
366+
node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
367+
spark_version=ws.clusters.select_spark_version(latest=True),
368+
),
369+
timeout_seconds=0,
370+
libraries=[library],
371+
)
372+
job_with_ucx_library = make_job(tasks=[task])
373+
374+
problems = simple_ctx.workflow_linter.lint_job(job_with_ucx_library.job_id)
375+
376+
assert len([problem for problem in problems if problem.code == "library-dist-info-not-found"]) == 0
377+
assert len([problem for problem in problems if problem.code == "library-entrypoint-not-found"]) == 0
378+
whitelist.distribution_compatibility.assert_called_once_with(Path(wheels[0].path).name)

tests/unit/source_code/test_jobs.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,58 @@ def test_workflow_task_container_builds_dependency_graph_with_known_egg_library(
257257
assert len(problems) == 0
258258
assert graph.path_lookup.resolve(Path("thingy")) is not None
259259
ws.workspace.download.assert_called_once_with(egg_file.as_posix(), format=ExportFormat.AUTO)
260+
261+
262+
def test_workflow_task_container_builds_dependency_graph_with_missing_distribution_in_python_wheel_task(
263+
mock_path_lookup,
264+
graph,
265+
):
266+
ws = create_autospec(WorkspaceClient)
267+
python_wheel_task = jobs.PythonWheelTask(package_name="databricks_labs_ucx", entry_point="runtime")
268+
task = jobs.Task(task_key="test", python_wheel_task=python_wheel_task)
269+
270+
workflow_task_container = WorkflowTaskContainer(ws, task)
271+
problems = workflow_task_container.build_dependency_graph(graph)
272+
273+
assert len(problems) == 1
274+
assert problems[0].code == "distribution-not-found"
275+
assert problems[0].message == "Could not find distribution for databricks_labs_ucx"
276+
ws.assert_not_called()
277+
278+
279+
def test_workflow_task_container_builds_dependency_graph_with_missing_entrypoint_in_python_wheel_task(graph):
280+
ws = create_autospec(WorkspaceClient)
281+
282+
whl_file = Path(__file__).parent / "samples/distribution/dist/thingy-0.0.1-py2.py3-none-any.whl"
283+
with whl_file.open("rb") as f:
284+
ws.workspace.download.return_value = io.BytesIO(f.read())
285+
286+
python_wheel_task = jobs.PythonWheelTask(package_name="thingy", entry_point="non_existing_entrypoint")
287+
libraries = [compute.Library(whl=whl_file.as_posix())]
288+
task = jobs.Task(task_key="test", libraries=libraries, python_wheel_task=python_wheel_task)
289+
290+
workflow_task_container = WorkflowTaskContainer(ws, task)
291+
problems = workflow_task_container.build_dependency_graph(graph)
292+
293+
assert len(problems) == 1
294+
assert problems[0].code == "distribution-entry-point-not-found"
295+
assert problems[0].message == "Could not find distribution entry point for thingy.non_existing_entrypoint"
296+
ws.workspace.download.assert_called_once_with(whl_file.as_posix(), format=ExportFormat.AUTO)
297+
298+
299+
def test_workflow_task_container_builds_dependency_graph_for_python_wheel_task(graph):
300+
ws = create_autospec(WorkspaceClient)
301+
302+
whl_file = Path(__file__).parent / "samples/distribution/dist/thingy-0.0.1-py2.py3-none-any.whl"
303+
with whl_file.open("rb") as f:
304+
ws.workspace.download.return_value = io.BytesIO(f.read())
305+
306+
python_wheel_task = jobs.PythonWheelTask(package_name="thingy", entry_point="runtime")
307+
libraries = [compute.Library(whl=whl_file.as_posix())]
308+
task = jobs.Task(task_key="test", libraries=libraries, python_wheel_task=python_wheel_task)
309+
310+
workflow_task_container = WorkflowTaskContainer(ws, task)
311+
problems = workflow_task_container.build_dependency_graph(graph)
312+
313+
assert len(problems) == 0
314+
ws.workspace.download.assert_called_once_with(whl_file.as_posix(), format=ExportFormat.AUTO)

tests/unit/source_code/test_path_lookup.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,43 @@ def test_lookup_is_initialized_with_syspath():
1111
assert len(filtered) > 0
1212

1313

14-
def test_lookup_is_initialized_with_handmade_string():
15-
provider = PathLookup.from_pathlike_string(Path.cwd(), "what:on:earth")
14+
def test_lookup_is_initialized_with_handmade_string(tmp_path):
15+
directories, sys_paths = ("what", "on", "earth"), []
16+
for directory in directories:
17+
path = tmp_path / directory
18+
path.mkdir()
19+
sys_paths.append(path)
20+
21+
provider = PathLookup.from_pathlike_string(Path.cwd(), ":".join([p.as_posix() for p in sys_paths]))
22+
1623
assert provider is not None
17-
paths = provider.library_roots[1:]
18-
assert paths == [Path("what"), Path("on"), Path("earth")]
24+
assert provider.library_roots[1:] == sys_paths
25+
26+
27+
def test_lookup_inserts_path(tmp_path):
28+
directories, sys_paths = ("what", "on", "earth"), []
29+
for directory in directories:
30+
path = tmp_path / directory
31+
path.mkdir()
32+
sys_paths.append(path)
33+
34+
provider = PathLookup.from_pathlike_string(Path.cwd(), ":".join([p.as_posix() for p in sys_paths]))
35+
36+
new_sys_path = tmp_path / Path("is")
37+
new_sys_path.mkdir()
38+
provider.insert_path(1, new_sys_path)
1939

40+
assert provider.library_roots[1:] == [sys_paths[0]] + [new_sys_path] + sys_paths[1:]
2041

21-
def test_lookup_inserts_path():
22-
provider = PathLookup.from_pathlike_string(Path.cwd(), "what:on:earth")
23-
provider.insert_path(1, Path("is"))
24-
paths = provider.library_roots[1:]
25-
assert paths == [Path("what"), Path("is"), Path("on"), Path("earth")]
2642

43+
def test_lookup_removes_path(tmp_path):
44+
directories, sys_paths = ("what", "is", "on", "earth"), []
45+
for directory in directories:
46+
path = tmp_path / directory
47+
path.mkdir()
48+
sys_paths.append(path)
2749

28-
def test_lookup_removes_path():
29-
provider = PathLookup.from_pathlike_string(Path.cwd(), "what:is:on:earth")
50+
provider = PathLookup.from_pathlike_string(Path.cwd(), ":".join([p.as_posix() for p in sys_paths]))
3051
provider.remove_path(1)
31-
paths = provider.library_roots[1:]
32-
assert paths == [Path("what"), Path("on"), Path("earth")]
52+
sys_paths.pop(1)
53+
assert provider.library_roots[1:] == sys_paths

0 commit comments

Comments
 (0)