1616from .config import SQLMeshContextConfig
1717from .controller import PlanOptions , RunOptions
1818from .controller .dagster import DagsterSQLMeshController
19- from .utils import sqlmesh_model_name_to_key
19+ from .translator import SQLMeshDagsterTranslator
2020
2121
2222class MaterializationTracker :
2323 """Tracks sqlmesh materializations and notifies dagster in the correct
2424 order. This is necessary because sqlmesh may skip some materializations that
2525 have no changes and those will be reported as completed out of order."""
2626
27- def __init__ (self , sorted_dag : list [str ], logger : logging .Logger ) -> None :
27+ def __init__ (self , sorted_dag : list [str ], logger : logging .Logger , translator : SQLMeshDagsterTranslator ) -> None :
2828 self .logger = logger
29+ self .translator = translator
2930 self ._batches : dict [Snapshot , int ] = {}
3031 self ._count : dict [Snapshot , int ] = {}
3132 self ._complete_update_status : dict [str , bool ] = {}
@@ -114,12 +115,14 @@ def __init__(
114115 models_map : dict [str , Model ],
115116 dag : DAG [t .Any ],
116117 prefix : str ,
118+ translator : SQLMeshDagsterTranslator
117119 ) -> None :
118120 self ._models_map = models_map
119121 self ._prefix = prefix
120122 self ._context = context
121123 self ._logger = context .log
122- self ._tracker = MaterializationTracker (dag .sorted [:], self ._logger )
124+ self .translator = translator
125+ self ._tracker = MaterializationTracker (sorted_dag = dag .sorted [:], logger = self ._logger , translator = self .translator )
123126 self ._stage = "plan"
124127
125128 def process_events (self , event : console .ConsoleEvent ) -> None :
@@ -143,7 +146,8 @@ def notify_success(
143146 # We allow selecting models. That value is mapped to models_map.
144147 # If the model is not in models_map, we can skip any notification
145148 if model :
146- output_key = sqlmesh_model_name_to_key (model .name )
149+ # Passing model.fqn to translator
150+ output_key = self .translator .get_asset_key_str (model .fqn )
147151 asset_key = self ._context .asset_key_for_output (output_key )
148152 yield MaterializeResult (
149153 asset_key = asset_key ,
@@ -192,7 +196,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
192196 log_context .info (
193197 "Snapshot progress update" ,
194198 {
195- "asset_key" : sqlmesh_model_name_to_key (snapshot .model .name ),
199+ "asset_key" : self . translator . get_asset_key_str (snapshot .model .name ),
196200 "progress" : f"{ done } /{ expected } " ,
197201 "duration_ms" : duration_ms ,
198202 },
@@ -263,6 +267,7 @@ def run(
263267 self ,
264268 context : AssetExecutionContext ,
265269 * ,
270+ translator : SQLMeshDagsterTranslator | None = None ,
266271 environment : str = "dev" ,
267272 start : TimeLike | None = None ,
268273 end : TimeLike | None = None ,
@@ -277,9 +282,12 @@ def run(
277282 plan_options = plan_options or {}
278283 run_options = run_options or {}
279284
285+ if translator is None :
286+ translator = SQLMeshDagsterTranslator ()
287+
280288 logger = context .log
281289
282- controller = self .get_controller (logger )
290+ controller = self .get_controller (logger , translator )
283291
284292 with controller .instance (environment ) as mesh :
285293 dag = mesh .models_dag ()
@@ -295,7 +303,7 @@ def run(
295303 models_map = {}
296304 for key , model in models .items ():
297305 if (
298- sqlmesh_model_name_to_key (model .name )
306+ translator . get_asset_key_str (model .fqn )
299307 in context .selected_output_names
300308 ):
301309 models_map [key ] = model
@@ -312,7 +320,8 @@ def run(
312320 logger .info (f"selected models: { select_models } " )
313321
314322 event_handler = DagsterSQLMeshEventHandler (
315- context , models_map , dag , "sqlmesh: "
323+ context = context , models_map = models_map , dag = dag ,
324+ prefix = "sqlmesh: " , translator = translator
316325 )
317326
318327 for event in mesh .plan_and_run (
@@ -330,8 +339,8 @@ def run(
330339 yield from event_handler .notify_success (mesh .context )
331340
332341 def get_controller (
333- self , log_override : logging .Logger | None = None
342+ self , log_override : logging .Logger | None = None , translator : SQLMeshDagsterTranslator | None = None
334343 ) -> DagsterSQLMeshController :
335344 return DagsterSQLMeshController .setup_with_config (
336- self .config , log_override = log_override
345+ self .config , log_override = log_override , translator_override = translator
337346 )
0 commit comments