2525 ContextFactory ,
2626)
2727from dagster_sqlmesh .controller .dagster import DagsterSQLMeshController
28-
29- if t .TYPE_CHECKING :
30- from dagster_sqlmesh .translator import SQLMeshDagsterTranslator
28+ from dagster_sqlmesh .translator import SQLMeshDagsterTranslator
3129
3230logger = logging .getLogger (__name__ )
3331
@@ -101,9 +99,9 @@ def from_dagster_metadata(
10199 # convert metadata values
102100 converted : dict [str , dg .MetadataValue ] = {}
103101 for key , value in metadata .items ():
104- assert isinstance (
105- value , dg . MetadataValue
106- ), f"Expected MetadataValue for { key } , got { type ( value ) } "
102+ assert isinstance (value , dg . MetadataValue ), (
103+ f"Expected MetadataValue for { key } , got { type ( value ) } "
104+ )
107105 converted [key ] = value
108106
109107 return cls .model_validate (
@@ -224,7 +222,7 @@ def stop_promotion(self) -> None:
224222
225223 def plan (self , batches : dict [Snapshot , int ]) -> None :
226224 self ._batches = batches
227- self ._count : dict [ Snapshot , int ] = {}
225+ self ._count = {}
228226
229227 for snapshot , _ in self ._batches .items ():
230228 self ._count [snapshot ] = 0
@@ -331,7 +329,7 @@ def __init__(
331329 models_map : dict [str , Model ],
332330 dag : DAG [t .Any ],
333331 prefix : str ,
334- translator : " SQLMeshDagsterTranslator" ,
332+ translator : SQLMeshDagsterTranslator ,
335333 is_testing : bool = False ,
336334 materializations_enabled : bool = True ,
337335 ) -> None :
@@ -423,9 +421,9 @@ def create_materialize_result(
423421 )
424422 last_materialization_status = None
425423 else :
426- assert (
427- last_materialization . asset_materialization is not None
428- ), "Expected asset materialization to be present."
424+ assert last_materialization . asset_materialization is not None , (
425+ "Expected asset materialization to be present."
426+ )
429427 try :
430428 last_materialization_status = (
431429 ModelMaterializationStatus .from_dagster_metadata (
@@ -496,15 +494,19 @@ def report_event(self, event: console.ConsoleEvent) -> None:
496494 log_context .info (
497495 "Snapshot progress complete" ,
498496 {
499- "asset_key" : self ._translator .get_asset_key_str (snapshot .model .name ),
497+ "asset_key" : self ._translator .get_asset_key_str (
498+ snapshot .model .name
499+ ),
500500 },
501501 )
502502 self ._tracker .update_run (snapshot )
503503 else :
504504 log_context .info (
505505 "Snapshot progress update" ,
506506 {
507- "asset_key" : self ._translator .get_asset_key_str (snapshot .model .name ),
507+ "asset_key" : self ._translator .get_asset_key_str (
508+ snapshot .model .name
509+ ),
508510 "progress" : f"{ done } /{ expected } " ,
509511 "duration_ms" : duration_ms ,
510512 },
@@ -580,6 +582,15 @@ def errors(self) -> list[Exception]:
580582
581583
582584class SQLMeshResource (dg .ConfigurableResource ):
585+ """Dagster resource for executing SQLMesh plan and run operations.
586+
587+ The translator is obtained from `config.get_translator()` to ensure
588+ consistency between asset definition loading and runtime execution.
589+
590+ Attributes:
591+ is_testing: Whether the resource is being used in a testing context.
592+ """
593+
583594 is_testing : bool = False
584595
585596 def run (
@@ -599,16 +610,14 @@ def run(
599610 run_options : RunOptions | None = None ,
600611 materializations_enabled : bool = True ,
601612 ) -> t .Iterable [dg .MaterializeResult [t .Any ]]:
602- """Execute SQLMesh based on the configuration given """
613+ """Execute SQLMesh plan and run, yielding MaterializeResult for each model. """
603614 plan_options = plan_options or {}
604615 run_options = run_options or {}
605616
606617 logger = context .log
607618
608619 controller = self .get_controller (
609- config = config ,
610- context_factory = context_factory ,
611- log_override = logger
620+ config = config , context_factory = context_factory , log_override = logger
612621 )
613622
614623 with controller .instance (environment ) as mesh :
@@ -620,7 +629,9 @@ def run(
620629 [model .fqn for model , _ in mesh .non_external_models_dag ()]
621630 )
622631 selected_models_set , models_map , select_models = (
623- self ._get_selected_models_from_context (context = context , config = config , models = models )
632+ self ._get_selected_models_from_context (
633+ context = context , config = config , models = models
634+ )
624635 )
625636
626637 if all_available_models == selected_models_set or select_models is None :
@@ -696,6 +707,7 @@ def create_event_handler(
696707 is_testing : bool ,
697708 materializations_enabled : bool ,
698709 ) -> DagsterSQLMeshEventHandler :
710+ """Create an event handler for processing SQLMesh console events."""
699711 translator = config .get_translator ()
700712 return DagsterSQLMeshEventHandler (
701713 context = context ,
@@ -708,14 +720,17 @@ def create_event_handler(
708720 )
709721
710722 def _get_selected_models_from_context (
711- self ,
712- context : dg .AssetExecutionContext ,
723+ self ,
724+ context : dg .AssetExecutionContext ,
713725 config : SQLMeshContextConfig ,
714- models : MappingProxyType [str , Model ]
726+ models : MappingProxyType [str , Model ],
715727 ) -> tuple [set [str ], dict [str , Model ], list [str ] | None ]:
728+ """Get the selected models from the execution context."""
716729 models_map = models .copy ()
717730 try :
718- selected_output_names = set (context .op_execution_context .selected_output_names )
731+ selected_output_names = set (
732+ context .op_execution_context .selected_output_names
733+ )
719734 except (DagsterInvalidPropertyError , AttributeError ) as e :
720735 # Special case for direct execution context when testing. This is related to:
721736 # https://github.com/dagster-io/dagster/issues/23633
@@ -744,6 +759,7 @@ def get_controller(
744759 context_factory : ContextFactory [ContextCls ],
745760 log_override : logging .Logger | None = None ,
746761 ) -> DagsterSQLMeshController [ContextCls ]:
762+ """Get a SQLMesh controller for executing operations."""
747763 return DagsterSQLMeshController .setup_with_config (
748764 config = config ,
749765 context_factory = context_factory ,
0 commit comments