
Commit 06854a4

Added workflow linter for spark python tasks (#1810)
1 parent 5e7c9d8 commit 06854a4
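
This change replaces the `not-yet-implemented` stub in `_register_spark_python_task` with real dependency registration: when a task carries a `spark_python_task`, its `python_file` entrypoint is resolved to a `WorkspacePath` and registered on the dependency graph so the workflow linter can analyze it. Two integration tests cover the linter's happy and unhappy paths for such tasks.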

File tree: 2 files changed, +48 −3 lines

src/databricks/labs/ucx/source_code/jobs.py

Lines changed: 5 additions & 3 deletions
@@ -133,9 +133,11 @@ def _register_notebook(self, graph: DependencyGraph) -> Iterable[DependencyProblem]:
 
     def _register_spark_python_task(self, graph: DependencyGraph):  # pylint: disable=unused-argument
         if not self._task.spark_python_task:
-            return
-        # TODO: https://github.com/databrickslabs/ucx/issues/1639
-        yield DependencyProblem('not-yet-implemented', 'Spark Python task is not yet implemented')
+            return []
+        notebook_path = self._task.spark_python_task.python_file
+        logger.info(f'Discovering {self._task.task_key} entrypoint: {notebook_path}')
+        path = WorkspacePath(self._ws, notebook_path)
+        return graph.register_notebook(path)
 
     def _register_python_wheel_task(self, graph: DependencyGraph):  # pylint: disable=unused-argument
         if not self._task.python_wheel_task:
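
Note the early-exit change in this hunk: the old body contained a `yield`, which made `_register_spark_python_task` a generator function, so a bare `return` still handed callers an empty iterator. The new body has no `yield`, so the guard must return an explicit empty list to keep the result iterable. A minimal standalone sketch of that difference (the `problems_*` functions are hypothetical illustrations, not ucx code):

    def problems_as_generator():
        # `yield` anywhere in the body makes this a generator function:
        # the bare `return` merely stops iteration, so callers still
        # receive an (empty) iterable.
        return
        yield  # never reached; only marks this function as a generator

    def problems_as_list():
        # Without `yield`, a bare `return` would produce None and break
        # `for problem in ...` at the call site; an explicit empty list
        # preserves the iterable contract.
        return []

    assert list(problems_as_generator()) == []
    assert problems_as_list() == []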

tests/integration/source_code/test_jobs.py

Lines changed: 43 additions & 0 deletions
@@ -298,3 +298,46 @@ def test_workflow_linter_lints_job_with_wheel_dependency(
     problems = simple_ctx.workflow_linter.lint_job(job_with_ucx_library.job_id)
 
     assert len([problem for problem in problems if problem.message == expected_problem_message]) == 0
+
+
+def test_job_spark_python_task_linter_happy_path(
+    simple_ctx, ws, make_job, make_random, make_cluster, make_notebook, make_directory
+):
+    entrypoint = make_directory()
+
+    make_notebook(path=f"{entrypoint}/notebook.py", content=b"import greenlet")
+
+    new_cluster = make_cluster(single_node=True)
+    task = jobs.Task(
+        task_key=make_random(4),
+        spark_python_task=jobs.SparkPythonTask(
+            python_file=f"{entrypoint}/notebook.py",
+        ),
+        existing_cluster_id=new_cluster.cluster_id,
+        libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="greenlet"))],
+    )
+    j = make_job(tasks=[task])
+
+    problems = simple_ctx.workflow_linter.lint_job(j.job_id)
+    assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 0
+
+
+def test_job_spark_python_task_linter_unhappy_path(
+    simple_ctx, ws, make_job, make_random, make_cluster, make_notebook, make_directory
+):
+    entrypoint = make_directory()
+
+    make_notebook(path=f"{entrypoint}/notebook.py", content=b"import greenlet")
+
+    new_cluster = make_cluster(single_node=True)
+    task = jobs.Task(
+        task_key=make_random(4),
+        spark_python_task=jobs.SparkPythonTask(
+            python_file=f"{entrypoint}/notebook.py",
+        ),
+        existing_cluster_id=new_cluster.cluster_id,
+    )
+    j = make_job(tasks=[task])
+
+    problems = simple_ctx.workflow_linter.lint_job(j.job_id)
+    assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 1
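
The two tests differ only in the `libraries` argument: the happy path attaches greenlet as a PyPI library on the cluster, so the linter can resolve the notebook's `import greenlet` and reports no problems, while the unhappy path omits the library and expects `lint_job` to report exactly one "Could not locate import: greenlet" problem.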
