-import io
 import logging
 import shutil
 from collections.abc import Callable


 @retried(on=[NotFound], timeout=timedelta(minutes=5))
-def test_running_real_workflow_linter_job(installation_ctx, make_notebook, make_directory, make_job):
+def test_running_real_workflow_linter_job(installation_ctx, make_job) -> None:
     # Deprecated file system path in call to: /mnt/things/e/f/g
-    lint_problem = b"display(spark.read.csv('/mnt/things/e/f/g'))"
-    notebook = make_notebook(path=f"{make_directory()}/notebook.ipynb", content=lint_problem)
-    job = make_job(notebook_path=notebook)
+    job = make_job(content="spark.read.table('a_table').write.csv('/mnt/things/e/f/g')\n")
     ctx = installation_ctx.replace(config_transform=lambda wc: replace(wc, include_job_ids=[job.job_id]))
     ctx.workspace_installation.run()
     ctx.deployed_workflows.run_workflow("experimental-workflow-linter")
     ctx.deployed_workflows.validate_step("experimental-workflow-linter")
+
+    # This test merely checks that the workflow produces records of the expected types; record content is not checked.
     cursor = ctx.sql_backend.fetch(f"SELECT COUNT(*) AS count FROM {ctx.inventory_database}.workflow_problems")
     result = next(cursor)
     if result['count'] == 0:
         installation_ctx.deployed_workflows.relay_logs("experimental-workflow-linter")
         assert False, "No workflow problems found"
+    dfsa_records = installation_ctx.directfs_access_crawler_for_paths.snapshot()
+    used_table_records = installation_ctx.used_tables_crawler_for_paths.snapshot()
+    assert dfsa_records and used_table_records


 @retried(on=[NotFound], timeout=timedelta(minutes=2))
-def test_linter_from_context(simple_ctx, make_job, make_notebook):
-    # This code is essentially the same as in test_running_real_workflow_linter_job,
-    # but it's executed on the caller side and is easier to debug.
-    # ensure we have at least 1 job that fails
-    notebook_path = make_notebook(content=io.BytesIO(b"import xyz"))
-    job = make_job(notebook_path=notebook_path)
+def test_linter_from_context(simple_ctx, make_job) -> None:
+    # This code is similar to test_running_real_workflow_linter_job, but it's executed on the caller side and is easier
+    # to debug.
+    # Ensure we have at least 1 job that fails
+    job = make_job(content="import xyz")
     simple_ctx.config.include_job_ids = [job.job_id]
     simple_ctx.workflow_linter.refresh_report(simple_ctx.sql_backend, simple_ctx.inventory_database)

@@ -75,30 +76,10 @@ def test_job_linter_no_problems(simple_ctx, make_job) -> None:
     assert len(problems) == 0


-def test_job_task_linter_library_not_installed_cluster(
-    simple_ctx,
-    make_job,
-    make_random,
-    make_cluster,
-    make_notebook,
-    make_directory,
-) -> None:
-    created_cluster = make_cluster(single_node=True)
-    entrypoint = make_directory()
-
-    notebook = make_notebook(path=f"{entrypoint}/notebook.ipynb", content=b"import library_not_found")
+def test_job_task_linter_library_not_installed_cluster(simple_ctx, make_job) -> None:
+    job = make_job(content="import library_not_found\n")

-    task = jobs.Task(
-        task_key=make_random(4),
-        description=make_random(4),
-        existing_cluster_id=created_cluster.cluster_id,
-        notebook_task=jobs.NotebookTask(
-            notebook_path=str(notebook),
-        ),
-    )
-    j = make_job(tasks=[task])
-
-    problems, *_ = simple_ctx.workflow_linter.lint_job(j.job_id)
+    problems, *_ = simple_ctx.workflow_linter.lint_job(job.job_id)

     assert (
         len([problem for problem in problems if problem.message == "Could not locate import: library_not_found"]) == 1
@@ -112,28 +93,24 @@ def test_job_task_linter_library_installed_cluster(
     make_random,
     make_cluster,
     make_notebook,
-    make_directory,
 ) -> None:
     created_cluster = make_cluster(single_node=True)
-    libraries_api = ws.libraries
-    libraries_api.install(created_cluster.cluster_id, [Library(pypi=PythonPyPiLibrary("greenlet"))])
-    entrypoint = make_directory()
+    ws.libraries.install(created_cluster.cluster_id, [Library(pypi=PythonPyPiLibrary("dbt-core==1.8.7"))])

-    notebook = make_notebook(path=f"{entrypoint}/notebook.ipynb", content=b"import doesnotexist;import greenlet")
+    notebook = make_notebook(content=b"import doesnotexist;import dbt")

     task = jobs.Task(
         task_key=make_random(4),
         description=make_random(4),
         existing_cluster_id=created_cluster.cluster_id,
-        notebook_task=jobs.NotebookTask(
-            notebook_path=str(notebook),
-        ),
+        notebook_task=jobs.NotebookTask(notebook_path=str(notebook)),
     )
     j = make_job(tasks=[task])

     problems, *_ = simple_ctx.workflow_linter.lint_job(j.job_id)

-    assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 0
+    assert next(problem for problem in problems if problem.message == "Could not locate import: doesnotexist")
+    assert not next((problem for problem in problems if problem.message == "Could not locate import: dbt"), None)


 def test_job_linter_some_notebook_graph_with_problems(
@@ -202,7 +179,7 @@ def test_workflow_linter_lints_job_with_import_pypi_library(simple_ctx, make_job

     assert len([problem for problem in problems if problem.message == problem_message]) == 1

-    library = compute.Library(pypi=compute.PythonPyPiLibrary(package="dbt-core"))
+    library = compute.Library(pypi=compute.PythonPyPiLibrary(package="dbt-core==1.8.7"))
     job_with_library = make_job(content=content, libraries=[library])

     problems, *_ = simple_ctx.workflow_linter.lint_job(job_with_library.job_id)
@@ -438,41 +415,22 @@ def test_workflow_linter_lints_job_with_wheel_dependency(simple_ctx, make_job, m
     assert len([problem for problem in problems if problem.message == expected_problem_message]) == 0


-def test_job_spark_python_task_linter_happy_path(
-    simple_ctx,
-    make_job,
-    make_random,
-    make_cluster,
-    make_notebook,
-    make_directory,
-) -> None:
-    entrypoint = make_directory()
-
-    make_notebook(path=f"{entrypoint}/notebook.py", content=b"import greenlet")
+def test_job_spark_python_task_linter_happy_path(simple_ctx, make_job) -> None:
+    extra_library_for_module = compute.Library(pypi=compute.PythonPyPiLibrary(package="dbt-core==1.8.7"))
+    job = make_job(content="import dbt\n", task_type=jobs.SparkPythonTask, libraries=[extra_library_for_module])

-    new_cluster = make_cluster(single_node=True)
-    task = jobs.Task(
-        task_key=make_random(4),
-        spark_python_task=jobs.SparkPythonTask(
-            python_file=f"{entrypoint}/notebook.py",
-        ),
-        existing_cluster_id=new_cluster.cluster_id,
-        libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="greenlet"))],
-    )
-    j = make_job(tasks=[task])
-
-    problems, *_ = simple_ctx.workflow_linter.lint_job(j.job_id)
+    problems, *_ = simple_ctx.workflow_linter.lint_job(job.job_id)

-    assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 0
+    assert len([problem for problem in problems if problem.message == "Could not locate import: dbt"]) == 0


 def test_job_spark_python_task_linter_unhappy_path(simple_ctx, make_job) -> None:
     """The imported dependency is not defined."""
-    job = make_job(content="import foobar", task_type=jobs.SparkPythonTask)
+    job = make_job(content="import dbt", task_type=jobs.SparkPythonTask)

     problems, *_ = simple_ctx.workflow_linter.lint_job(job.job_id)

-    assert len([problem for problem in problems if problem.message == "Could not locate import: foobar"]) == 1
+    assert len([problem for problem in problems if problem.message == "Could not locate import: dbt"]) == 1


 def test_workflow_linter_lints_python_wheel_task(simple_ctx, ws, make_job, make_random) -> None:
@@ -510,28 +468,13 @@ def test_workflow_linter_lints_python_wheel_task(simple_ctx, ws, make_job, make_
     allow_list.distribution_compatibility.assert_called_once()


-def test_job_spark_python_task_workspace_linter_happy_path(
-    simple_ctx,
-    make_job,
-    make_random,
-    make_cluster,
-    make_directory,
-) -> None:
-    pyspark_job_path = make_directory() / "spark_job.py"
-    pyspark_job_path.write_text("import greenlet\n")
-
-    new_cluster = make_cluster(single_node=True)
-    task = jobs.Task(
-        task_key=make_random(4),
-        spark_python_task=jobs.SparkPythonTask(python_file=pyspark_job_path.as_posix()),
-        existing_cluster_id=new_cluster.cluster_id,
-        libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="greenlet"))],
-    )
-    j = make_job(tasks=[task])
+def test_job_spark_python_task_workspace_linter_happy_path(simple_ctx, make_job) -> None:
+    extra_library_for_module = compute.Library(pypi=compute.PythonPyPiLibrary(package="dbt-core==1.8.7"))
+    job = make_job(content="import dbt\n", libraries=[extra_library_for_module])

-    problems, *_ = simple_ctx.workflow_linter.lint_job(j.job_id)
+    problems, *_ = simple_ctx.workflow_linter.lint_job(job.job_id)

-    assert not [problem for problem in problems if problem.message == "Could not locate import: greenlet"]
+    assert not [problem for problem in problems if problem.message == "Could not locate import: dbt"]


 def test_job_spark_python_task_dbfs_linter_happy_path(
@@ -547,7 +490,7 @@ def test_job_spark_python_task_dbfs_linter_happy_path(
     new_cluster = make_cluster(single_node=True)
     task = jobs.Task(
         task_key=make_random(4),
-        spark_python_task=jobs.SparkPythonTask(python_file=f"dbfs:{pyspark_job_path.as_posix()}"),
+        spark_python_task=jobs.SparkPythonTask(python_file=f"dbfs:{pyspark_job_path}"),
         existing_cluster_id=new_cluster.cluster_id,
         libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="greenlet"))],
     )
@@ -596,10 +539,10 @@ def test_job_dlt_task_linter_unhappy_path(
     make_directory,
     make_pipeline,
 ) -> None:
-    entrypoint = make_directory()
-    make_notebook(path=f"{entrypoint}/notebook.py", content=b"import library_not_found")
+    notebook_path = make_directory() / "notebook.py"
+    make_notebook(path=notebook_path, content=b"import library_not_found")
     dlt_pipeline = make_pipeline(
-        libraries=[pipelines.PipelineLibrary(notebook=NotebookLibrary(path=f"{entrypoint}/notebook.py"))]
+        libraries=[pipelines.PipelineLibrary(notebook=NotebookLibrary(path=str(notebook_path)))]
     )

     task = jobs.Task(
@@ -623,22 +566,22 @@ def test_job_dlt_task_linter_happy_path(
     make_directory,
     make_pipeline,
 ) -> None:
-    entrypoint = make_directory()
-    make_notebook(path=f"{entrypoint}/notebook.py", content=b"import greenlet")
+    notebook_path = make_directory() / "notebook.py"
+    make_notebook(path=notebook_path, content=b"import dbt")
     dlt_pipeline = make_pipeline(
-        libraries=[pipelines.PipelineLibrary(notebook=NotebookLibrary(path=f"{entrypoint}/notebook.py"))]
+        libraries=[pipelines.PipelineLibrary(notebook=NotebookLibrary(path=str(notebook_path)))]
     )

     task = jobs.Task(
         task_key=make_random(4),
         pipeline_task=jobs.PipelineTask(pipeline_id=dlt_pipeline.pipeline_id),
-        libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="greenlet"))],
+        libraries=[compute.Library(pypi=compute.PythonPyPiLibrary(package="dbt-core==1.8.7"))],
     )
     j = make_job(tasks=[task])

     problems, *_ = simple_ctx.workflow_linter.lint_job(j.job_id)

-    assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 0
+    assert len([problem for problem in problems if problem.message == "Could not locate import: dbt"]) == 0


 def test_job_dependency_problem_egg_dbr14plus(make_job, make_directory, simple_ctx, ws) -> None: