diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index c0e0343..0f702de 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -7,7 +7,7 @@ ) from dagster_sqlmesh.translator import SQLMeshDagsterTranslator from dagster_sqlmesh.types import ( - ConvertibleToAssetDep, + ConvertibleToAssetKey, ConvertibleToAssetOut, SQLMeshModelDep, SQLMeshMultiAssetOptions, @@ -24,11 +24,14 @@ def to_asset_outs( environment: str, translator: SQLMeshDagsterTranslator, ) -> SQLMeshMultiAssetOptions: - """Loads all the asset outs of the current sqlmesh environment. If a - cache is provided, it will be tried first to load the asset outs.""" + """Loads all the asset outs of the current sqlmesh environment. + + If a cache is provided, it will be tried first to load the asset outs. + External dependencies use IntermediateAssetDep objects that convert to AssetKey. + """ internal_asset_deps_map: dict[str, set[str]] = {} - deps_map: dict[str, ConvertibleToAssetDep] = {} + deps_map: dict[str, ConvertibleToAssetKey] = {} asset_outs: dict[str, ConvertibleToAssetOut] = {} with self.instance(environment, "to_asset_outs") as instance: @@ -52,14 +55,15 @@ def to_asset_outs( internal_asset_deps.add(dep_asset_key_str) else: + # External dependency - create IntermediateAssetDep table = translator.get_asset_key_str(dep.fqn) - key = translator.get_asset_key( - context, dep.fqn - ).to_user_string() - internal_asset_deps.add(key) + key = translator.get_asset_key(context, dep.fqn) + internal_asset_deps.add(key.to_user_string()) - # create an external dep - deps_map[table] = translator.create_asset_dep(key=key) + # Create lazy intermediate representation for caching + deps_map[table] = translator.create_asset_dep( + key=key.to_user_string() + ) model_key = translator.get_asset_key_str(model.fqn) asset_outs[model_key] = translator.create_asset_out( diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 50e1cc0..decfd6c 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -25,9 +25,7 @@ ContextFactory, ) from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController - -if t.TYPE_CHECKING: - from dagster_sqlmesh.translator import SQLMeshDagsterTranslator +from dagster_sqlmesh.translator import SQLMeshDagsterTranslator logger = logging.getLogger(__name__) @@ -101,9 +99,9 @@ def from_dagster_metadata( # convert metadata values converted: dict[str, dg.MetadataValue] = {} for key, value in metadata.items(): - assert isinstance( - value, dg.MetadataValue - ), f"Expected MetadataValue for {key}, got {type(value)}" + assert isinstance(value, dg.MetadataValue), ( + f"Expected MetadataValue for {key}, got {type(value)}" + ) converted[key] = value return cls.model_validate( @@ -224,7 +222,7 @@ def stop_promotion(self) -> None: def plan(self, batches: dict[Snapshot, int]) -> None: self._batches = batches - self._count: dict[Snapshot, int] = {} + self._count = {} for snapshot, _ in self._batches.items(): self._count[snapshot] = 0 @@ -331,7 +329,7 @@ def __init__( models_map: dict[str, Model], dag: DAG[t.Any], prefix: str, - translator: "SQLMeshDagsterTranslator", + translator: SQLMeshDagsterTranslator, is_testing: bool = False, materializations_enabled: bool = True, ) -> None: @@ -423,9 +421,9 @@ def create_materialize_result( ) last_materialization_status = None else: - assert ( - last_materialization.asset_materialization is not None - ), "Expected asset materialization to be present." + assert last_materialization.asset_materialization is not None, ( + "Expected asset materialization to be present." + ) try: last_materialization_status = ( ModelMaterializationStatus.from_dagster_metadata( @@ -496,7 +494,9 @@ def report_event(self, event: console.ConsoleEvent) -> None: log_context.info( "Snapshot progress complete", { - "asset_key": self._translator.get_asset_key_str(snapshot.model.name), + "asset_key": self._translator.get_asset_key_str( + snapshot.model.name + ), }, ) self._tracker.update_run(snapshot) @@ -504,7 +504,9 @@ def report_event(self, event: console.ConsoleEvent) -> None: log_context.info( "Snapshot progress update", { - "asset_key": self._translator.get_asset_key_str(snapshot.model.name), + "asset_key": self._translator.get_asset_key_str( + snapshot.model.name + ), "progress": f"{done}/{expected}", "duration_ms": duration_ms, }, @@ -580,6 +582,15 @@ def errors(self) -> list[Exception]: class SQLMeshResource(dg.ConfigurableResource): + """Dagster resource for executing SQLMesh plan and run operations. + + The translator is obtained from `config.get_translator()` to ensure + consistency between asset definition loading and runtime execution. + + Attributes: + is_testing: Whether the resource is being used in a testing context. + """ + is_testing: bool = False def run( @@ -599,16 +610,14 @@ def run( run_options: RunOptions | None = None, materializations_enabled: bool = True, ) -> t.Iterable[dg.MaterializeResult[t.Any]]: - """Execute SQLMesh based on the configuration given""" + """Execute SQLMesh plan and run, yielding MaterializeResult for each model.""" plan_options = plan_options or {} run_options = run_options or {} logger = context.log controller = self.get_controller( - config=config, - context_factory=context_factory, - log_override=logger + config=config, context_factory=context_factory, log_override=logger ) with controller.instance(environment) as mesh: @@ -620,7 +629,9 @@ def run( [model.fqn for model, _ in mesh.non_external_models_dag()] ) selected_models_set, models_map, select_models = ( - self._get_selected_models_from_context(context=context, config=config, models=models) + self._get_selected_models_from_context( + context=context, config=config, models=models + ) ) if all_available_models == selected_models_set or select_models is None: @@ -696,6 +707,7 @@ def create_event_handler( is_testing: bool, materializations_enabled: bool, ) -> DagsterSQLMeshEventHandler: + """Create an event handler for processing SQLMesh console events.""" translator = config.get_translator() return DagsterSQLMeshEventHandler( context=context, @@ -708,14 +720,17 @@ def create_event_handler( ) def _get_selected_models_from_context( - self, - context: dg.AssetExecutionContext, + self, + context: dg.AssetExecutionContext, config: SQLMeshContextConfig, - models: MappingProxyType[str, Model] + models: MappingProxyType[str, Model], ) -> tuple[set[str], dict[str, Model], list[str] | None]: + """Get the selected models from the execution context.""" models_map = models.copy() try: - selected_output_names = set(context.op_execution_context.selected_output_names) + selected_output_names = set( + context.op_execution_context.selected_output_names + ) except (DagsterInvalidPropertyError, AttributeError) as e: # Special case for direct execution context when testing. This is related to: # https://github.com/dagster-io/dagster/issues/23633 @@ -744,6 +759,7 @@ def get_controller( context_factory: ContextFactory[ContextCls], log_override: logging.Logger | None = None, ) -> DagsterSQLMeshController[ContextCls]: + """Get a SQLMesh controller for executing operations.""" return DagsterSQLMeshController.setup_with_config( config=config, context_factory=context_factory, diff --git a/dagster_sqlmesh/translator.py b/dagster_sqlmesh/translator.py index 7b3d879..e05b332 100644 --- a/dagster_sqlmesh/translator.py +++ b/dagster_sqlmesh/translator.py @@ -1,81 +1,95 @@ import typing as t from collections.abc import Sequence +from dataclasses import dataclass, field from inspect import signature -from dagster import AssetDep, AssetKey, AssetOut -from pydantic import BaseModel, Field +from dagster import AssetKey, AssetOut, ConfigurableResource from sqlglot import exp from sqlmesh.core.context import Context from sqlmesh.core.model import Model -from .types import ConvertibleToAssetDep, ConvertibleToAssetOut +from .types import ConvertibleToAssetKey, ConvertibleToAssetOut -class IntermediateAssetOut(BaseModel): +@dataclass +class IntermediateAssetOut: + """Intermediate representation of an AssetOut for lazy evaluation. + + Stores information to create an AssetOut but defers creation until + `to_asset_out()` is called. Useful for caching during asset loading. + """ + model_key: str asset_key: str tags: t.Mapping[str, str] | None = None is_required: bool = True group_name: str | None = None kinds: set[str] | None = None - kwargs: dict[str, t.Any] = Field(default_factory=dict) + kwargs: dict[str, t.Any] = field(default_factory=dict) def to_asset_out(self) -> AssetOut: + """Convert to a Dagster AssetOut.""" asset_key = AssetKey.from_user_string(self.asset_key) + kinds = self.kinds if "kinds" not in signature(AssetOut).parameters: - self.kinds = None + kinds = None return AssetOut( key=asset_key, tags=self.tags, is_required=self.is_required, group_name=self.group_name, - kinds=self.kinds, + kinds=kinds, **self.kwargs, ) -class IntermediateAssetDep(BaseModel): +@dataclass +class IntermediateAssetDep: + """Intermediate representation of an external dependency for lazy evaluation. + + Converts to AssetKey for use in containerized code locations. + """ + key: str - kwargs: dict[str, t.Any] = Field(default_factory=dict) - def to_asset_dep(self) -> AssetDep: - return AssetDep(AssetKey.from_user_string(self.key)) + def to_asset_key(self) -> AssetKey: + """Convert to a Dagster AssetKey.""" + return AssetKey.from_user_string(self.key) -class SQLMeshDagsterTranslator: +class SQLMeshDagsterTranslator(ConfigurableResource): """Translates SQLMesh objects for Dagster. - - This class provides methods to convert SQLMesh models and metadata into - Dagster-compatible formats. It can be subclassed to customize the translation - behavior, such as changing asset key generation or grouping logic. - - The translator is used throughout the dagster-sqlmesh integration, including - in the DagsterSQLMeshEventHandler and asset generation process. + + Converts SQLMesh models and metadata into Dagster-compatible formats. + Can be subclassed to customize translation behavior such as asset key + generation or grouping logic. + + Custom attributes must be declared as Pydantic fields (not set in __init__). """ def get_asset_key(self, context: Context, fqn: str) -> AssetKey: """Get the Dagster AssetKey for a SQLMesh model. - + Args: - context: The SQLMesh context (unused in default implementation) + context: The SQLMesh context fqn: Fully qualified name of the SQLMesh model - + Returns: - AssetKey: The Dagster asset key for this model + The Dagster asset key for this model """ path = self.get_asset_key_name(fqn) return AssetKey(path) def get_asset_key_name(self, fqn: str) -> Sequence[str]: """Parse a fully qualified name into asset key components. - + Args: - fqn: Fully qualified name of the SQLMesh model (e.g., "catalog.schema.table") - + fqn: Fully qualified name (e.g., "catalog.schema.table") + Returns: - Sequence[str]: Asset key components [catalog, schema, table] + Asset key components [catalog, schema, table] """ table = exp.to_table(fqn) asset_key_name = [table.catalog, table.db, table.name] @@ -84,58 +98,51 @@ def get_asset_key_name(self, fqn: str) -> Sequence[str]: def get_group_name(self, context: Context, model: Model) -> str: """Get the Dagster asset group name for a SQLMesh model. - + Args: - context: The SQLMesh context (unused in default implementation) + context: The SQLMesh context model: The SQLMesh model - + Returns: - str: The asset group name (defaults to the schema/database name) + The asset group name (defaults to the schema/database name) """ path = self.get_asset_key_name(model.fqn) return path[-2] def get_context_dialect(self, context: Context) -> str: """Get the SQL dialect used by the SQLMesh context. - + Args: context: The SQLMesh context - + Returns: - str: The SQL dialect name (e.g., "duckdb", "postgres", etc.) + The SQL dialect name (e.g., "duckdb", "postgres") """ return context.engine_adapter.dialect - def create_asset_dep(self, *, key: str, **kwargs: t.Any) -> ConvertibleToAssetDep: - """Create an object that resolves to an AssetDep. + def create_asset_dep(self, *, key: str) -> ConvertibleToAssetKey: + """Create an IntermediateAssetDep for an external dependency. - This creates an intermediate representation that can be converted to a - Dagster AssetDep. Most users will not need to use this method directly. - Args: key: The asset key string for the dependency - **kwargs: Additional arguments to pass to the AssetDep - + Returns: - ConvertibleToAssetDep: An object that can be converted to an AssetDep + An object that can be converted to an AssetKey """ - return IntermediateAssetDep(key=key, kwargs=kwargs) + return IntermediateAssetDep(key=key) def create_asset_out( self, *, model_key: str, asset_key: str, **kwargs: t.Any ) -> ConvertibleToAssetOut: - """Create an object that resolves to an AssetOut. + """Create an IntermediateAssetOut for a model. - This creates an intermediate representation that can be converted to a - Dagster AssetOut. Most users will not need to use this method directly. - Args: model_key: Internal key for the SQLMesh model asset_key: The asset key string for the output - **kwargs: Additional arguments including tags, group_name, kinds, etc. - + **kwargs: Additional arguments (tags, group_name, kinds, etc.) + Returns: - ConvertibleToAssetOut: An object that can be converted to an AssetOut + An object that can be converted to an AssetOut """ return IntermediateAssetOut( model_key=model_key, @@ -149,39 +156,37 @@ def create_asset_out( def get_asset_key_str(self, fqn: str) -> str: """Get asset key string with sqlmesh prefix for internal mapping. - - This creates an internal identifier used to map outputs and dependencies - within the dagster-sqlmesh integration. It will not affect the actual - AssetKeys that users see. The result contains only alphanumeric characters - and underscores, making it safe for internal usage. - + + Creates an internal identifier used to map outputs and dependencies + within the dagster-sqlmesh integration. Does not affect the actual + AssetKeys that users see. The result contains only alphanumeric + characters and underscores, making it safe for internal usage. + Args: fqn: Fully qualified name of the SQLMesh model - + Returns: - str: Internal asset key string with "sqlmesh__" prefix + Internal asset key string with "sqlmesh__" prefix """ table = exp.to_table(fqn) asset_key_name = [table.catalog, table.db, table.name] - + return "sqlmesh__" + "_".join(asset_key_name) def get_tags(self, context: Context, model: Model) -> dict[str, str]: """Get Dagster asset tags for a SQLMesh model. - + Args: - context: The SQLMesh context (unused in default implementation) + context: The SQLMesh context model: The SQLMesh model - + Returns: - dict[str, str]: Dictionary of tags to apply to the Dagster asset. - Default implementation converts SQLMesh model tags to - empty string values, which causes the Dagster UI to - render them as labels rather than key-value pairs. - - Note: - Tags must contain only strings as keys and values. The Dagster UI - will render tags with empty string values as "labels" rather than + Dictionary of tags to apply to the Dagster asset. Default + converts SQLMesh model tags to empty string values, which + causes the Dagster UI to render them as labels rather than key-value pairs. + + Note: + Tags must contain only strings as keys and values. """ return {k: "" for k in model.tags} diff --git a/dagster_sqlmesh/types.py b/dagster_sqlmesh/types.py index 50187b5..4211e18 100644 --- a/dagster_sqlmesh/types.py +++ b/dagster_sqlmesh/types.py @@ -1,11 +1,12 @@ import typing as t from dataclasses import dataclass, field -from dagster import AssetCheckResult, AssetDep, AssetKey, AssetMaterialization, AssetOut +from dagster import ( + AssetKey, + AssetOut, +) from sqlmesh.core.model import Model -MultiAssetResponse = t.Iterable[AssetCheckResult | AssetMaterialization] - @dataclass(kw_only=True) class SQLMeshParsedFQN: @@ -30,43 +31,43 @@ class SQLMeshModelDep: def parse_fqn(self) -> SQLMeshParsedFQN: return SQLMeshParsedFQN.parse(self.fqn) + class ConvertibleToAssetOut(t.Protocol): def to_asset_out(self) -> AssetOut: """Convert to an AssetOut object.""" ... -class ConvertibleToAssetDep(t.Protocol): - def to_asset_dep(self) -> AssetDep: - """Convert to an AssetDep object.""" - ... class ConvertibleToAssetKey(t.Protocol): + """Protocol for objects that can be lazily converted to AssetKey.""" + def to_asset_key(self) -> AssetKey: + """Convert to an AssetKey object.""" ... + @dataclass(kw_only=True) class SQLMeshMultiAssetOptions: - """Generic class for returning dagster multi asset options from SQLMesh, the - types used are intentionally generic so to allow for potentially using an - intermediate representation of the dagster asset objects. This is most - useful in caching purposes and is done to allow for users of this library to - manipulate the dagster asset creation process as they see fit.""" + """Intermediate representation of Dagster multi-asset options from SQLMesh. + + Uses generic types to allow caching and lazy evaluation during asset loading. + """ outs: t.Mapping[str, ConvertibleToAssetOut] = field(default_factory=lambda: {}) - deps: t.Iterable[ConvertibleToAssetDep] = field(default_factory=lambda: []) + deps: t.Iterable[ConvertibleToAssetKey] = field(default_factory=lambda: []) internal_asset_deps: t.Mapping[str, set[str]] = field(default_factory=lambda: {}) def to_asset_outs(self) -> t.Mapping[str, AssetOut]: """Convert to an iterable of AssetOut objects.""" return {key: out.to_asset_out() for key, out in self.outs.items()} - def to_asset_deps(self) -> t.Iterable[AssetDep]: - """Convert to an iterable of AssetDep objects.""" - return [dep.to_asset_dep() for dep in self.deps] + def to_asset_deps(self) -> t.Iterable[AssetKey]: + """Convert dependencies to AssetKey objects.""" + return [dep.to_asset_key() for dep in self.deps] def to_internal_asset_deps(self) -> dict[str, set[AssetKey]]: """Convert to a dictionary of internal asset dependencies.""" return { key: {AssetKey.from_user_string(dep) for dep in deps} for key, deps in self.internal_asset_deps.items() - } \ No newline at end of file + } diff --git a/sample/dagster_project/definitions.py b/sample/dagster_project/definitions.py index 669d4a7..d084517 100644 --- a/sample/dagster_project/definitions.py +++ b/sample/dagster_project/definitions.py @@ -32,7 +32,7 @@ class CustomSQLMeshContextConfig(SQLMeshContextConfig): custom_key: str def get_translator(self): - return RewrittenSQLMeshTranslator(self.custom_key) + return RewrittenSQLMeshTranslator(custom_key=self.custom_key) sqlmesh_config = CustomSQLMeshContextConfig( path=SQLMESH_PROJECT_PATH, @@ -45,10 +45,13 @@ class RewrittenSQLMeshTranslator(SQLMeshDagsterTranslator): sqlmesh project and only uses the table db and name We include this as a test of the translator functionality. + + Note: Since SQLMeshDagsterTranslator extends ConfigurableResource, custom + attributes must be declared as Pydantic fields (not set in __init__). """ - def __init__(self, custom_key: str): - self.custom_key = custom_key + # Declare custom_key as a Pydantic field (ConfigurableResource is frozen) + custom_key: str def get_asset_key(self, context: Context, fqn: str) -> AssetKey: table = exp.to_table(fqn) # Ensure fqn is a valid table expression