+import datetime as dt
 import json
 import logging
 import os
 import sys
 import tempfile
 import typing as t
-from dataclasses import dataclass
+from dataclasses import dataclass, field

 import duckdb
 import polars
@@ -41,8 +42,32 @@ def sample_project_root() -> t.Iterator[str]:
4142 """Creates a temporary project directory containing both SQLMesh and Dagster projects"""
4243 with tempfile .TemporaryDirectory () as tmp_dir :
4344 project_dir = shutil .copytree ("sample" , tmp_dir , dirs_exist_ok = True )
45+
4446 yield project_dir
4547
48+ # Create debug directory with timestamp AFTER test run
49+ debug_dir = os .path .join (
50+ os .path .dirname (os .path .dirname (__file__ )), "debug_runs"
51+ )
52+ os .makedirs (debug_dir , exist_ok = True )
53+ timestamp = dt .datetime .now ().strftime ("%Y%m%d_%H%M%S" )
54+ run_debug_dir = os .path .join (debug_dir , f"run_{ timestamp } " )
55+
56+ # Copy contents to debug directory
57+ try :
58+ shutil .copytree (tmp_dir , run_debug_dir , dirs_exist_ok = True )
59+ logger .info (
60+ f"Copied final test project contents to { run_debug_dir } for debugging"
61+ )
62+ except FileNotFoundError :
63+ logger .warning (
64+ f"Temporary directory { tmp_dir } not found during cleanup copy."
65+ )
66+ except Exception as e :
67+ logger .error (
68+ f"Error copying temporary directory { tmp_dir } to { run_debug_dir } : { e } "
69+ )
70+
4671
4772@pytest .fixture
4873def sample_sqlmesh_project (sample_project_root : str ) -> t .Iterator [str ]:
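
The debug copy in sample_project_root above runs at fixture teardown because it sits after the yield, while the temporary directory still exists inside the with-block. A minimal standalone illustration of that ordering (fixture name and archive path are hypothetical):

import shutil
import tempfile
import typing as t

import pytest


@pytest.fixture
def archived_tmp_dir() -> t.Iterator[str]:
    with tempfile.TemporaryDirectory() as tmp_dir:
        yield tmp_dir  # the test body runs while the fixture is suspended here
        # Teardown: runs only after the test finishes, before tmp_dir is deleted.
        shutil.copytree(tmp_dir, "/tmp/archive_example", dirs_exist_ok=True)
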
@@ -75,6 +100,9 @@ class SQLMeshTestContext:
     context_config: SQLMeshContextConfig
     project_path: str

+    # Internal state for backup/restore
+    _backed_up_files: set[str] = field(default_factory=set, init=False)
+
     def create_controller(
         self, enable_debug_console: bool = False
     ) -> DagsterSQLMeshController:
@@ -160,6 +188,39 @@ def cleanup_modified_files(self) -> None:
             self.restore_model_file(model_name)
         self._backed_up_files.clear()

+    def save_sqlmesh_debug_state(self, name_suffix: str = "manual_save") -> str:
+        """Saves the current state of the SQLMesh project to the debug directory.
+
+        Copies the contents of the SQLMesh project directory (self.project_path)
+        to a timestamped sub-directory within the 'debug_runs' folder.
+
+        Args:
+            name_suffix: An optional suffix to append to the debug directory name
+                to distinguish this save point (e.g., 'before_change',
+                'after_plan'). Defaults to 'manual_save'.
+
+        Returns:
+            The path to the created debug state directory.
+        """
+        debug_dir_base = os.path.join(
+            os.path.dirname(self.project_path), "..", "debug_runs"
+        )
+        os.makedirs(debug_dir_base, exist_ok=True)
+        timestamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+        run_debug_dir = os.path.join(
+            debug_dir_base, f"sqlmesh_state_{timestamp}_{name_suffix}"
+        )
+
+        try:
+            shutil.copytree(self.project_path, run_debug_dir, dirs_exist_ok=True)
+            logger.info(f"Saved SQLMesh project debug state to {run_debug_dir}")
+            return run_debug_dir
+        except Exception as e:
+            logger.error(
+                f"Error saving SQLMesh project debug state to {run_debug_dir}: {e}"
+            )
+            raise
+
     def query(self, *args: t.Any, return_df: bool = False, **kwargs: t.Any) -> t.Any:
         """Execute a query against the test database.

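
A brief usage sketch of the new helper, assuming a test receives a SQLMeshTestContext via a fixture (the fixture name, test name, and intermediate steps below are hypothetical):

def test_plan_with_debug_snapshots(sample_sqlmesh_test_context: SQLMeshTestContext) -> None:
    # Snapshot the pristine project before touching anything.
    before_dir = sample_sqlmesh_test_context.save_sqlmesh_debug_state("before_change")

    # ... modify a model file, run a plan, etc. ...

    # Snapshot again; the suffix keeps the two state directories distinguishable.
    after_dir = sample_sqlmesh_test_context.save_sqlmesh_debug_state("after_change")
    assert before_dir != after_dir
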
@@ -477,7 +538,8 @@ def model_change_test_context(
 class DagsterTestContext:
     """A test context for running Dagster"""

-    project_path: str
+    dagster_project_path: str
+    sqlmesh_project_path: str

     def _run_command(self, cmd: list[str]) -> None:
         """Execute a command and stream its output in real-time.
@@ -488,76 +550,93 @@ def _run_command(self, cmd: list[str]) -> None:
         Raises:
             subprocess.CalledProcessError: If the command returns non-zero exit code
         """
+        import io
         import queue
         import threading
         import typing as t

         def stream_output(
-            pipe: t.IO[str], output_queue: queue.Queue[str | None]
+            pipe: t.IO[str], output_queue: queue.Queue[tuple[str, str | None]]
         ) -> None:
-            """Stream output from a pipe to a queue."""
+            """Stream output from a pipe to a queue.
+
+            Args:
+                pipe: The pipe to read from (stdout or stderr)
+                output_queue: Queue to write output to, as (stream_type, line) tuples
+            """
+            # Use a StringIO buffer to accumulate characters into lines
+            buffer = io.StringIO()
+            stream_type = "stdout" if pipe is process.stdout else "stderr"
+
             try:
                 while True:
                     char = pipe.read(1)
                     if not char:
+                        # Flush any remaining content in buffer
+                        remaining = buffer.getvalue()
+                        if remaining:
+                            output_queue.put((stream_type, remaining))
                         break
-                    output_queue.put(char)
+
+                    buffer.write(char)
+
+                    # If we hit a newline, flush the buffer
+                    if char == "\n":
+                        output_queue.put((stream_type, buffer.getvalue()))
+                        buffer = io.StringIO()
             finally:
-                output_queue.put(None)  # Signal EOF
+                buffer.close()
+                output_queue.put((stream_type, None))  # Signal EOF

         print(f"Running command: {' '.join(cmd)}")
+        print(f"Current working directory: {os.getcwd()}")
+        print(f"Changing to directory: {self.dagster_project_path}")
+
+        # Change to the dagster project directory before running the command
+        os.chdir(self.dagster_project_path)
+
         process = subprocess.Popen(
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             text=True,
             universal_newlines=True,
+            encoding="utf-8",
+            errors="replace",
         )

         if not process.stdout or not process.stderr:
             raise RuntimeError("Failed to open subprocess pipes")

-        # Create queues for stdout and stderr
-        stdout_queue: queue.Queue[str | None] = queue.Queue()
-        stderr_queue: queue.Queue[str | None] = queue.Queue()
+        # Create a single queue for all output
+        output_queue: queue.Queue[tuple[str, str | None]] = queue.Queue()

         # Start threads to read from pipes
         stdout_thread = threading.Thread(
-            target=stream_output, args=(process.stdout, stdout_queue)
+            target=stream_output, args=(process.stdout, output_queue)
         )
         stderr_thread = threading.Thread(
-            target=stream_output, args=(process.stderr, stderr_queue)
+            target=stream_output, args=(process.stderr, output_queue)
         )

         stdout_thread.daemon = True
         stderr_thread.daemon = True
         stdout_thread.start()
         stderr_thread.start()

-        # Read from queues and print output
-        stdout_done = False
-        stderr_done = False
+        # Track which streams are still active
+        active_streams = {"stdout", "stderr"}

-        while not (stdout_done and stderr_done):
-            # Handle stdout
+        # Read from queue and print output
+        while active_streams:
             try:
-                char = stdout_queue.get_nowait()
-                if char is None:
-                    stdout_done = True
+                stream_type, content = output_queue.get(timeout=0.1)
+                if content is None:
+                    active_streams.remove(stream_type)
                 else:
-                    print(char, end="", flush=True)
+                    print(content, end="", flush=True)
             except queue.Empty:
-                pass
-
-            # Handle stderr
-            try:
-                char = stderr_queue.get_nowait()
-                if char is None:
-                    stderr_done = True
-                else:
-                    print(char, end="", flush=True)
-            except queue.Empty:
-                pass
+                continue

         stdout_thread.join()
         stderr_thread.join()
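
For context, the single-queue pattern above can be reduced to a standalone sketch: each reader thread tags its lines with a stream name and signals EOF with None, so one consumer loop can interleave stdout and stderr without polling two queues. This simplified version reads whole lines with readline() rather than character-by-character, and the subprocess command is only an example.

import queue
import subprocess
import threading


def _pump(pipe, tag, out_q):
    # Forward complete lines to the shared queue, tagged with their stream name.
    try:
        for line in iter(pipe.readline, ""):
            out_q.put((tag, line))
    finally:
        out_q.put((tag, None))  # per-stream EOF sentinel


proc = subprocess.Popen(
    ["python", "-c", "import sys; print('out'); print('err', file=sys.stderr)"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
out_q: queue.Queue = queue.Queue()
threads = [
    threading.Thread(target=_pump, args=(proc.stdout, "stdout", out_q), daemon=True),
    threading.Thread(target=_pump, args=(proc.stderr, "stderr", out_q), daemon=True),
]
for th in threads:
    th.start()

active = {"stdout", "stderr"}
while active:
    tag, line = out_q.get()  # blocks until either stream yields a line or closes
    if line is None:
        active.remove(tag)
    else:
        print(f"[{tag}] {line}", end="", flush=True)

for th in threads:
    th.join()
proc.wait()
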
@@ -583,7 +662,10 @@ def asset_materialisation(
583662 "resources" : {
584663 "sqlmesh" : {
585664 "config" : {
586- "config" : {"gateway" : "local" , "path" : self .project_path }
665+ "config" : {
666+ "gateway" : "local" ,
667+ "path" : self .sqlmesh_project_path ,
668+ }
587669 }
588670 }
589671 }
@@ -610,7 +692,7 @@ def asset_materialisation(
610692 "asset" ,
611693 "materialize" ,
612694 "-f" ,
613- os .path .join (self .project_path , "definitions.py" ),
695+ os .path .join (self .dagster_project_path , "definitions.py" ),
614696 "--select" ,
615697 "," .join (assets ),
616698 "--config-json" ,
@@ -633,7 +715,10 @@ def sample_dagster_test_context(
     sample_dagster_project: str,
 ) -> t.Iterator[DagsterTestContext]:
     test_context = DagsterTestContext(
-        project_path=os.path.join(sample_dagster_project),
+        dagster_project_path=os.path.join(sample_dagster_project),
+        sqlmesh_project_path=os.path.join(
+            sample_dagster_project.replace("dagster_project", "sqlmesh_project")
+        ),
     )
     yield test_context

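
The sqlmesh_project_path derivation assumes the sample layout keeps the two projects as siblings, so a simple string substitution on the Dagster path is enough (paths below are hypothetical):

dagster_path = "/tmp/pytest-of-user/sample/dagster_project"  # e.g. what the fixture yields
sqlmesh_path = dagster_path.replace("dagster_project", "sqlmesh_project")
assert sqlmesh_path == "/tmp/pytest-of-user/sample/sqlmesh_project"
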