Skip to content

Commit ed11df6

Browse files
committed
refactor(translator): use ConfigurableResource and AssetKey for external deps
1 parent cd56c58 commit ed11df6

File tree

5 files changed

+135
-143
lines changed

5 files changed

+135
-143
lines changed

dagster_sqlmesh/controller/dagster.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
)
88
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
99
from dagster_sqlmesh.types import (
10-
ConvertibleToAssetDep,
10+
ConvertibleToAssetKey,
1111
ConvertibleToAssetOut,
1212
SQLMeshModelDep,
1313
SQLMeshMultiAssetOptions,
@@ -24,11 +24,14 @@ def to_asset_outs(
2424
environment: str,
2525
translator: SQLMeshDagsterTranslator,
2626
) -> SQLMeshMultiAssetOptions:
27-
"""Loads all the asset outs of the current sqlmesh environment. If a
28-
cache is provided, it will be tried first to load the asset outs."""
27+
"""Loads all the asset outs of the current sqlmesh environment.
28+
29+
If a cache is provided, it will be tried first to load the asset outs.
30+
External dependencies use IntermediateAssetDep objects that convert to AssetKey.
31+
"""
2932

3033
internal_asset_deps_map: dict[str, set[str]] = {}
31-
deps_map: dict[str, ConvertibleToAssetDep] = {}
34+
deps_map: dict[str, ConvertibleToAssetKey] = {}
3235
asset_outs: dict[str, ConvertibleToAssetOut] = {}
3336

3437
with self.instance(environment, "to_asset_outs") as instance:
@@ -52,14 +55,15 @@ def to_asset_outs(
5255

5356
internal_asset_deps.add(dep_asset_key_str)
5457
else:
58+
# External dependency - create IntermediateAssetDep
5559
table = translator.get_asset_key_str(dep.fqn)
56-
key = translator.get_asset_key(
57-
context, dep.fqn
58-
).to_user_string()
59-
internal_asset_deps.add(key)
60+
key = translator.get_asset_key(context, dep.fqn)
61+
internal_asset_deps.add(key.to_user_string())
6062

61-
# create an external dep
62-
deps_map[table] = translator.create_asset_dep(key=key)
63+
# Create lazy intermediate representation for caching
64+
deps_map[table] = translator.create_asset_dep(
65+
key=key.to_user_string()
66+
)
6367

6468
model_key = translator.get_asset_key_str(model.fqn)
6569
asset_outs[model_key] = translator.create_asset_out(

dagster_sqlmesh/resource.py

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
ContextFactory,
2626
)
2727
from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController
28-
29-
if t.TYPE_CHECKING:
30-
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
28+
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator
3129

3230
logger = logging.getLogger(__name__)
3331

@@ -101,9 +99,9 @@ def from_dagster_metadata(
10199
# convert metadata values
102100
converted: dict[str, dg.MetadataValue] = {}
103101
for key, value in metadata.items():
104-
assert isinstance(
105-
value, dg.MetadataValue
106-
), f"Expected MetadataValue for {key}, got {type(value)}"
102+
assert isinstance(value, dg.MetadataValue), (
103+
f"Expected MetadataValue for {key}, got {type(value)}"
104+
)
107105
converted[key] = value
108106

109107
return cls.model_validate(
@@ -224,7 +222,7 @@ def stop_promotion(self) -> None:
224222

225223
def plan(self, batches: dict[Snapshot, int]) -> None:
226224
self._batches = batches
227-
self._count: dict[Snapshot, int] = {}
225+
self._count = {}
228226

229227
for snapshot, _ in self._batches.items():
230228
self._count[snapshot] = 0
@@ -331,7 +329,7 @@ def __init__(
331329
models_map: dict[str, Model],
332330
dag: DAG[t.Any],
333331
prefix: str,
334-
translator: "SQLMeshDagsterTranslator",
332+
translator: SQLMeshDagsterTranslator,
335333
is_testing: bool = False,
336334
materializations_enabled: bool = True,
337335
) -> None:
@@ -423,9 +421,9 @@ def create_materialize_result(
423421
)
424422
last_materialization_status = None
425423
else:
426-
assert (
427-
last_materialization.asset_materialization is not None
428-
), "Expected asset materialization to be present."
424+
assert last_materialization.asset_materialization is not None, (
425+
"Expected asset materialization to be present."
426+
)
429427
try:
430428
last_materialization_status = (
431429
ModelMaterializationStatus.from_dagster_metadata(
@@ -496,15 +494,19 @@ def report_event(self, event: console.ConsoleEvent) -> None:
496494
log_context.info(
497495
"Snapshot progress complete",
498496
{
499-
"asset_key": self._translator.get_asset_key_str(snapshot.model.name),
497+
"asset_key": self._translator.get_asset_key_str(
498+
snapshot.model.name
499+
),
500500
},
501501
)
502502
self._tracker.update_run(snapshot)
503503
else:
504504
log_context.info(
505505
"Snapshot progress update",
506506
{
507-
"asset_key": self._translator.get_asset_key_str(snapshot.model.name),
507+
"asset_key": self._translator.get_asset_key_str(
508+
snapshot.model.name
509+
),
508510
"progress": f"{done}/{expected}",
509511
"duration_ms": duration_ms,
510512
},
@@ -580,6 +582,15 @@ def errors(self) -> list[Exception]:
580582

581583

582584
class SQLMeshResource(dg.ConfigurableResource):
585+
"""Dagster resource for executing SQLMesh plan and run operations.
586+
587+
The translator is obtained from `config.get_translator()` to ensure
588+
consistency between asset definition loading and runtime execution.
589+
590+
Attributes:
591+
is_testing: Whether the resource is being used in a testing context.
592+
"""
593+
583594
is_testing: bool = False
584595

585596
def run(
@@ -599,16 +610,14 @@ def run(
599610
run_options: RunOptions | None = None,
600611
materializations_enabled: bool = True,
601612
) -> t.Iterable[dg.MaterializeResult[t.Any]]:
602-
"""Execute SQLMesh based on the configuration given"""
613+
"""Execute SQLMesh plan and run, yielding MaterializeResult for each model."""
603614
plan_options = plan_options or {}
604615
run_options = run_options or {}
605616

606617
logger = context.log
607618

608619
controller = self.get_controller(
609-
config=config,
610-
context_factory=context_factory,
611-
log_override=logger
620+
config=config, context_factory=context_factory, log_override=logger
612621
)
613622

614623
with controller.instance(environment) as mesh:
@@ -620,7 +629,9 @@ def run(
620629
[model.fqn for model, _ in mesh.non_external_models_dag()]
621630
)
622631
selected_models_set, models_map, select_models = (
623-
self._get_selected_models_from_context(context=context, config=config, models=models)
632+
self._get_selected_models_from_context(
633+
context=context, config=config, models=models
634+
)
624635
)
625636

626637
if all_available_models == selected_models_set or select_models is None:
@@ -696,6 +707,7 @@ def create_event_handler(
696707
is_testing: bool,
697708
materializations_enabled: bool,
698709
) -> DagsterSQLMeshEventHandler:
710+
"""Create an event handler for processing SQLMesh console events."""
699711
translator = config.get_translator()
700712
return DagsterSQLMeshEventHandler(
701713
context=context,
@@ -708,14 +720,17 @@ def create_event_handler(
708720
)
709721

710722
def _get_selected_models_from_context(
711-
self,
712-
context: dg.AssetExecutionContext,
723+
self,
724+
context: dg.AssetExecutionContext,
713725
config: SQLMeshContextConfig,
714-
models: MappingProxyType[str, Model]
726+
models: MappingProxyType[str, Model],
715727
) -> tuple[set[str], dict[str, Model], list[str] | None]:
728+
"""Get the selected models from the execution context."""
716729
models_map = models.copy()
717730
try:
718-
selected_output_names = set(context.op_execution_context.selected_output_names)
731+
selected_output_names = set(
732+
context.op_execution_context.selected_output_names
733+
)
719734
except (DagsterInvalidPropertyError, AttributeError) as e:
720735
# Special case for direct execution context when testing. This is related to:
721736
# https://github.com/dagster-io/dagster/issues/23633
@@ -744,6 +759,7 @@ def get_controller(
744759
context_factory: ContextFactory[ContextCls],
745760
log_override: logging.Logger | None = None,
746761
) -> DagsterSQLMeshController[ContextCls]:
762+
"""Get a SQLMesh controller for executing operations."""
747763
return DagsterSQLMeshController.setup_with_config(
748764
config=config,
749765
context_factory=context_factory,

0 commit comments

Comments
 (0)