1+ import json
12import logging
23import os
34import shutil
5+ import subprocess
46import sys
57import tempfile
68import typing as t
@@ -35,7 +37,7 @@ def setup_debug_logging_for_tests() -> None:
3537
3638
3739@pytest .fixture
38- def sample_sqlmesh_project () -> t .Generator [str , None , None ]:
40+ def sample_sqlmesh_project () -> t .Iterator [str ]:
3941 """Creates a temporary sqlmesh project by copying the sample project"""
4042 with tempfile .TemporaryDirectory () as tmp_dir :
4143 project_dir = shutil .copytree (
@@ -359,7 +361,7 @@ def plan_and_run(
359361@pytest .fixture
360362def sample_sqlmesh_test_context (
361363 sample_sqlmesh_project : str ,
362- ) -> t .Generator [SQLMeshTestContext , None , None ]:
364+ ) -> t .Iterator [SQLMeshTestContext ]:
363365 db_path = os .path .join (sample_sqlmesh_project , "db.db" )
364366 config = SQLMeshConfig (
365367 gateways = {
@@ -417,7 +419,7 @@ def permanent_sqlmesh_project() -> str:
417419@pytest .fixture
418420def model_change_test_context (
419421 permanent_sqlmesh_project : str ,
420- ) -> t .Generator [SQLMeshTestContext , None , None ]:
422+ ) -> t .Iterator [SQLMeshTestContext ]:
421423 """FOR DEBUGGING ONLY: Creates a SQLMesh test context specifically for testing model code changes.
422424
423425 This fixture provides a context that allows modifying SQL model files and ensures
@@ -455,5 +457,109 @@ def model_change_test_context(
455457 # test_context.cleanup_modified_files()
456458
457459
460+ @dataclass
461+ class DagsterTestContext :
462+ """A test context for running Dagster"""
463+
464+ project_path : str
465+
466+ def asset_materialisation (
467+ self ,
468+ assets : list [str ],
469+ plan_options : PlanOptions | None = None ,
470+ run_options : RunOptions | None = None ,
471+ ) -> None :
472+ """Materialises the given Dagster assets using CLI command.
473+
474+ Args:
475+ assets: String of comma-separated asset names to materialize
476+ plan_options: Optional SQLMesh plan options to pass to the config
477+ run_options: Optional SQLMesh run options to pass to the config
478+ """
479+
480+ # Construct the base config
481+ config : dict [str , t .Any ] = {
482+ "resources" : {
483+ "sqlmesh" : {
484+ "config" : {
485+ "config" : {"gateway" : "local" , "path" : self .project_path }
486+ }
487+ }
488+ }
489+ }
490+
491+ # Add plan options if provided
492+ if plan_options :
493+ config ["resources" ]["sqlmesh" ]["config" ]["plan_options_override" ] = {
494+ k : v for k , v in plan_options .items () if v is not None
495+ }
496+
497+ # Add run options if provided
498+ if run_options :
499+ config ["resources" ]["sqlmesh" ]["config" ]["run_options_override" ] = {
500+ k : v for k , v in run_options .items () if v is not None
501+ }
502+
503+ # Convert config to JSON string, escaping backslashes for Windows paths
504+ config_json = json .dumps (config ).replace ("\\ " , "\\ \\ " )
505+
506+ # Construct the command
507+ cmd = [
508+ sys .executable ,
509+ "-m" ,
510+ "dagster" ,
511+ "asset" ,
512+ "materialize" ,
513+ "-f" ,
514+ os .path .join (self .project_path , "definitions.py" ),
515+ "--select" ,
516+ "," .join (assets ),
517+ "--config-json" ,
518+ config_json ,
519+ ]
520+
521+ # Run the command
522+ subprocess .run (cmd , check = True )
523+
524+ def reset_assets (self ) -> None :
525+ """Resets the assets to the original state"""
526+ self .asset_materialisation (assets = ["reset_asset" ])
527+
528+ def init_test_source (self ) -> None :
529+ """Initialises the test source"""
530+ self .asset_materialisation (assets = ["test_source" ])
531+
532+
533+
534+ @pytest .fixture
535+ def sample_dagster_project () -> t .Iterator [str ]:
536+ """Creates a temporary dagster project by copying the sample project"""
537+ with tempfile .TemporaryDirectory () as tmp_dir :
538+ project_dir = shutil .copytree (
539+ "sample" ,
540+ tmp_dir ,
541+ )
542+ dagster_project_dir = os .path .join (project_dir , "dagster_project" )
543+ sqlmesh_project_dir = os .path .join (project_dir , "sqlmesh_project" )
544+
545+ db_path = os .path .join (sqlmesh_project_dir , "db.db" )
546+ if os .path .exists (db_path ):
547+ os .remove (os .path .join (sqlmesh_project_dir , "db.db" ))
548+
549+ # Initialize the "source" data
550+ yield str (dagster_project_dir )
551+
552+
553+
554+ @pytest .fixture
555+ def sample_dagster_test_context (
556+ sample_dagster_project : str ,
557+ ) -> t .Iterator [DagsterTestContext ]:
558+ test_context = DagsterTestContext (
559+ project_path = os .path .join (sample_dagster_project ),
560+ )
561+ yield test_context
562+
563+
458564if __name__ == "__main__" :
459- pytest .main ([__file__ ])
565+ pytest .main ([__file__ ])
0 commit comments