@@ -146,7 +146,9 @@ def __init__(
146146 self ._prefix = prefix
147147 self ._context = context
148148 self ._logger = context .log
149- self ._tracker = MaterializationTracker (sorted_dag = dag .sorted [:], logger = self ._logger )
149+ self ._tracker = MaterializationTracker (
150+ sorted_dag = dag .sorted [:], logger = self ._logger
151+ )
150152 self ._stage = "plan"
151153 self ._errors : list [Exception ] = []
152154 self ._is_testing = is_testing
@@ -328,8 +330,7 @@ def run(
328330 logger = context .log
329331
330332 controller = self .get_controller (
331- context_factory = context_factory ,
332- log_override = logger
333+ context_factory = context_factory , log_override = logger
333334 )
334335
335336 with controller .instance (environment ) as mesh :
@@ -341,10 +342,7 @@ def run(
341342 [model .fqn for model , _ in mesh .non_external_models_dag ()]
342343 )
343344 selected_models_set , models_map , select_models = (
344- self ._get_selected_models_from_context (
345- context = context ,
346- models = models
347- )
345+ self ._get_selected_models_from_context (context = context , models = models )
348346 )
349347
350348 if all_available_models == selected_models_set or select_models is None :
@@ -356,11 +354,32 @@ def run(
356354 else :
357355 logger .info (f"selected models: { select_models } " )
358356
359- event_handler = DagsterSQLMeshEventHandler (
360- context = context , models_map = models_map , dag = dag ,
361- prefix = "sqlmesh: " , is_testing = self .is_testing
357+ event_handler = self .create_event_handler (
358+ context = context ,
359+ models_map = models_map ,
360+ dag = dag ,
361+ prefix = "sqlmesh: " ,
362+ is_testing = self .is_testing ,
362363 )
363364
365+ def raise_for_sqlmesh_errors (
366+ event_handler : DagsterSQLMeshEventHandler ,
367+ additional_errors : list [Exception ] | None = None ,
368+ ) -> None :
369+ additional_errors = additional_errors or []
370+ errors = event_handler .errors
371+ if len (errors ) + len (additional_errors ) == 0 :
372+ return
373+ for error in errors :
374+ logger .error (
375+ f"sqlmesh encountered the following error during sqlmesh { event_handler .stage } : { error } "
376+ )
377+ raise PlanOrRunFailedError (
378+ event_handler .stage ,
379+ f"sqlmesh failed during { event_handler .stage } with { len (event_handler .errors ) + 1 } errors" ,
380+ [* errors , * additional_errors ],
381+ )
382+
364383 try :
365384 for event in mesh .plan_and_run (
366385 start = start ,
@@ -376,16 +395,30 @@ def run(
376395 event_handler .process_events (event )
377396 except SQLMeshError as e :
378397 logger .error (f"sqlmesh error: { e } " )
379- errors = event_handler .errors
380- for error in errors :
381- logger .error (f"sqlmesh encountered the following error during sqlmesh { event_handler .stage } : { error } " )
382- raise PlanOrRunFailedError (
383- event_handler .stage ,
384- f"sqlmesh failed during { event_handler .stage } with { len (event_handler .errors ) + 1 } errors" ,
385- [e , * event_handler .errors ],
386- )
398+ raise_for_sqlmesh_errors (event_handler , [GenericSQLMeshError (str (e ))])
399+ # Some errors do not raise exceptions immediately, so we need to check
400+ # the event handler for any errors that may have been collected.
401+ raise_for_sqlmesh_errors (event_handler )
402+
387403 yield from event_handler .notify_success (mesh .context )
388404
405+ def create_event_handler (
406+ self ,
407+ * ,
408+ context : AssetExecutionContext ,
409+ dag : DAG [str ],
410+ models_map : dict [str , Model ],
411+ prefix : str ,
412+ is_testing : bool ,
413+ ) -> DagsterSQLMeshEventHandler :
414+ return DagsterSQLMeshEventHandler (
415+ context = context ,
416+ dag = dag ,
417+ models_map = models_map ,
418+ prefix = prefix ,
419+ is_testing = is_testing ,
420+ )
421+
389422 def _get_selected_models_from_context (
390423 self , context : AssetExecutionContext , models : MappingProxyType [str , Model ]
391424 ) -> tuple [set [str ], dict [str , Model ], list [str ] | None ]:
@@ -421,5 +454,5 @@ def get_controller(
421454 return DagsterSQLMeshController .setup_with_config (
422455 config = self .config ,
423456 context_factory = context_factory ,
424- log_override = log_override
457+ log_override = log_override ,
425458 )
0 commit comments