@@ -37,18 +37,34 @@ def setup_debug_logging_for_tests() -> None:
3737
3838
3939@pytest .fixture
40- def sample_sqlmesh_project () -> t .Iterator [str ]:
41- """Creates a temporary sqlmesh project by copying the sample project """
40+ def sample_project_root () -> t .Iterator [str ]:
41+ """Creates a temporary project directory containing both SQLMesh and Dagster projects """
4242 with tempfile .TemporaryDirectory () as tmp_dir :
43- project_dir = shutil .copytree (
44- "sample/sqlmesh_project" , os .path .join (tmp_dir , "project" )
45- )
46- db_path = os .path .join (project_dir , "db.db" )
47- if os .path .exists (db_path ):
48- os .remove (os .path .join (project_dir , "db.db" ))
43+ project_dir = shutil .copytree ("sample" , tmp_dir , dirs_exist_ok = True )
44+ yield project_dir
45+
46+
47+ @pytest .fixture
48+ def sample_sqlmesh_project (sample_project_root : str ) -> t .Iterator [str ]:
49+ """Returns path to the SQLMesh project within the sample project"""
50+ sqlmesh_project_dir = os .path .join (sample_project_root , "sqlmesh_project" )
51+ db_path = os .path .join (sqlmesh_project_dir , "db.db" )
52+ if os .path .exists (db_path ):
53+ os .remove (db_path )
54+ yield sqlmesh_project_dir
55+
56+
57+ @pytest .fixture
58+ def sample_dagster_project (sample_project_root : str ) -> t .Iterator [str ]:
59+ """Returns path to the Dagster project within the sample project"""
60+ dagster_project_dir = os .path .join (sample_project_root , "dagster_project" )
61+ sqlmesh_project_dir = os .path .join (sample_project_root , "sqlmesh_project" )
62+
63+ db_path = os .path .join (sqlmesh_project_dir , "db.db" )
64+ if os .path .exists (db_path ):
65+ os .remove (db_path )
4966
50- # Initialize the "source" data
51- yield str (project_dir )
67+ yield dagster_project_dir
5268
5369
5470@dataclass
@@ -463,6 +479,43 @@ class DagsterTestContext:
463479
464480 project_path : str
465481
482+ def _run_command (self , cmd : list [str ]) -> None :
483+ """Execute a command and stream its output in real-time.
484+
485+ Args:
486+ cmd: List of command parts to execute
487+
488+ Raises:
489+ subprocess.CalledProcessError: If the command returns non-zero exit code
490+ """
491+ print (f"Running command: { ' ' .join (cmd )} " )
492+ process = subprocess .Popen (
493+ cmd ,
494+ stdout = subprocess .PIPE ,
495+ stderr = subprocess .PIPE ,
496+ text = True ,
497+ bufsize = 1 ,
498+ universal_newlines = True ,
499+ )
500+
501+ # Stream output in real-time
502+ while True :
503+ stdout_line = process .stdout .readline () if process .stdout else ""
504+ stderr_line = process .stderr .readline () if process .stderr else ""
505+
506+ if stdout_line :
507+ print (stdout_line .rstrip ())
508+ if stderr_line :
509+ print (stderr_line .rstrip ())
510+
511+ process_finished = not stdout_line and not stderr_line and process .poll () is not None
512+ if process_finished :
513+ break
514+
515+ process_failed = process .returncode != 0
516+ if process_failed :
517+ raise subprocess .CalledProcessError (process .returncode , cmd )
518+
466519 def asset_materialisation (
467520 self ,
468521 assets : list [str ],
@@ -476,8 +529,6 @@ def asset_materialisation(
476529 plan_options: Optional SQLMesh plan options to pass to the config
477530 run_options: Optional SQLMesh run options to pass to the config
478531 """
479-
480- # Construct the base config
481532 config : dict [str , t .Any ] = {
482533 "resources" : {
483534 "sqlmesh" : {
@@ -494,7 +545,6 @@ def asset_materialisation(
494545 k : v for k , v in plan_options .items () if v is not None
495546 }
496547
497- # Add run options if provided
498548 if run_options :
499549 config ["resources" ]["sqlmesh" ]["config" ]["run_options_override" ] = {
500550 k : v for k , v in run_options .items () if v is not None
@@ -503,7 +553,6 @@ def asset_materialisation(
503553 # Convert config to JSON string, escaping backslashes for Windows paths
504554 config_json = json .dumps (config ).replace ("\\ " , "\\ \\ " )
505555
506- # Construct the command
507556 cmd = [
508557 sys .executable ,
509558 "-m" ,
@@ -518,8 +567,7 @@ def asset_materialisation(
518567 config_json ,
519568 ]
520569
521- # Run the command
522- subprocess .run (cmd , check = True )
570+ self ._run_command (cmd )
523571
524572 def reset_assets (self ) -> None :
525573 """Resets the assets to the original state"""
@@ -530,27 +578,6 @@ def init_test_source(self) -> None:
530578 self .asset_materialisation (assets = ["test_source" ])
531579
532580
533-
534- @pytest .fixture
535- def sample_dagster_project () -> t .Iterator [str ]:
536- """Creates a temporary dagster project by copying the sample project"""
537- with tempfile .TemporaryDirectory () as tmp_dir :
538- project_dir = shutil .copytree (
539- "sample" ,
540- tmp_dir ,
541- )
542- dagster_project_dir = os .path .join (project_dir , "dagster_project" )
543- sqlmesh_project_dir = os .path .join (project_dir , "sqlmesh_project" )
544-
545- db_path = os .path .join (sqlmesh_project_dir , "db.db" )
546- if os .path .exists (db_path ):
547- os .remove (os .path .join (sqlmesh_project_dir , "db.db" ))
548-
549- # Initialize the "source" data
550- yield str (dagster_project_dir )
551-
552-
553-
554581@pytest .fixture
555582def sample_dagster_test_context (
556583 sample_dagster_project : str ,
0 commit comments