Skip to content

Commit e371e61

Browse files
committed
fix: more fixes
1 parent 63060e5 commit e371e61

File tree

1 file changed

+67
-25
lines changed

1 file changed

+67
-25
lines changed

dagster_sqlmesh/resource.py

Lines changed: 67 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@
2929

3030
logger = logging.getLogger(__name__)
3131

32+
3233
def _START_OF_UNIX_TIME():
33-
dt = datetime.strptime(
34-
"1970-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"
35-
)
34+
dt = datetime.strptime("1970-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ")
3635
return dt.astimezone(UTC)
3736

3837

@@ -72,7 +71,9 @@ def as_dagster_metadata(
7271
last_promoted = dg.MetadataValue.timestamp(
7372
max(previous.last_promoted, self.last_promoted)
7473
)
75-
last_backfill = dg.MetadataValue.timestamp(max(previous.last_backfill, self.last_backfill))
74+
last_backfill = dg.MetadataValue.timestamp(
75+
max(previous.last_backfill, self.last_backfill)
76+
)
7677
created_at = dg.MetadataValue.timestamp(previous.created_at)
7778
else:
7879
# If there is no previous materialization status all dates can use
@@ -98,21 +99,25 @@ def from_dagster_metadata(
9899
# convert metadata values
99100
converted: dict[str, dg.MetadataValue] = {}
100101
for key, value in metadata.items():
101-
assert isinstance(value, dg.MetadataValue), f"Expected MetadataValue for {key}, got {type(value)}"
102+
assert isinstance(
103+
value, dg.MetadataValue
104+
), f"Expected MetadataValue for {key}, got {type(value)}"
102105
converted[key] = value
103106

104-
return cls.model_validate(dict(
105-
model_fqn=converted["model_fqn"].value,
106-
snapshot_id=converted["snapshot_id"].value,
107-
created_at=converted["created_at"].value,
108-
last_updated_or_restated=converted["last_updated_or_restated"].value,
109-
last_promoted=converted["last_promoted"].value,
110-
last_backfill=converted["last_backfill"].value,
111-
))
112-
107+
return cls.model_validate(
108+
dict(
109+
model_fqn=converted["model_fqn"].value,
110+
snapshot_id=converted["snapshot_id"].value,
111+
created_at=converted["created_at"].value,
112+
last_updated_or_restated=converted["last_updated_or_restated"].value,
113+
last_promoted=converted["last_promoted"].value,
114+
last_backfill=converted["last_backfill"].value,
115+
)
116+
)
117+
113118
def as_glot_table(self) -> exp.Table:
114119
return sqlglot.to_table(self.model_fqn)
115-
120+
116121
def is_match(self, input: str, ignore_catalog: bool = False) -> bool:
117122
"""Tests if the passed in string matches this model's table
118123
@@ -133,7 +138,7 @@ def is_match(self, input: str, ignore_catalog: bool = False) -> bool:
133138
return False
134139
if input_as_table.db != table.db:
135140
return False
136-
141+
137142
if not ignore_catalog:
138143
if input_as_table.catalog != table.catalog:
139144
return False
@@ -251,12 +256,17 @@ def notify_queue_next(self) -> tuple[str, ModelMaterializationStatus] | None:
251256

252257
if model_name_for_notification in self._non_model_names:
253258
self._current_index += 1
254-
self.logger.debug(f"skipping non-model snapshot {model_name_for_notification}")
259+
self.logger.debug(
260+
f"skipping non-model snapshot {model_name_for_notification}"
261+
)
255262
continue
256263

257264
if model_name_for_notification in self._model_metadata:
258265
self._current_index += 1
259-
return (model_name_for_notification, self._model_metadata[model_name_for_notification])
266+
return (
267+
model_name_for_notification,
268+
self._model_metadata[model_name_for_notification],
269+
)
260270
return None
261271

262272

@@ -320,7 +330,23 @@ def __init__(
320330
dag: DAG[t.Any],
321331
prefix: str,
322332
is_testing: bool = False,
333+
materializations_enabled: bool = True,
323334
) -> None:
335+
"""Dagster event handler for SQLMesh models.
336+
337+
The handler is responsible for reporting events from sqlmesh to dagster.
338+
339+
Args:
340+
context: The Dagster asset execution context.
341+
models_map: A mapping of model names to their SQLMesh model instances.
342+
dag: The directed acyclic graph representing the SQLMesh models.
343+
prefix: A prefix to use for all asset keys generated by this handler.
344+
is_testing: Whether the handler is being used in a testing context.
345+
materializations_enabled: Whether the handler is to generate
346+
materializations, this should be disabled if you with to run a
347+
sqlmesh plan or run in an environment different from the normal
348+
target environment.
349+
"""
324350
self._models_map = models_map
325351
self._prefix = prefix
326352
self._context = context
@@ -331,6 +357,7 @@ def __init__(
331357
self._stage = "plan"
332358
self._errors: list[Exception] = []
333359
self._is_testing = is_testing
360+
self._materializations_enabled = materializations_enabled
334361

335362
def process_events(self, event: console.ConsoleEvent) -> None:
336363
self.report_event(event)
@@ -363,7 +390,14 @@ def notify_success(
363390
)
364391
else:
365392
asset_key = self._context.asset_key_for_output(output_key)
366-
yield self.create_materialize_result(self._context, asset_key, materialization_status)
393+
if self._materializations_enabled:
394+
yield self.create_materialize_result(
395+
self._context, asset_key, materialization_status
396+
)
397+
else:
398+
self._logger.debug(
399+
f"Materializations disabled. Would have materialized for {asset_key.to_user_string()}"
400+
)
367401
notify = self._tracker.notify_queue_next()
368402
else:
369403
self._logger.debug("No more materializations to process")
@@ -384,12 +418,14 @@ def create_materialize_result(
384418
)
385419
last_materialization_status = None
386420
else:
387-
assert last_materialization.asset_materialization is not None, (
388-
"Expected asset materialization to be present."
389-
)
421+
assert (
422+
last_materialization.asset_materialization is not None
423+
), "Expected asset materialization to be present."
390424
try:
391-
last_materialization_status = ModelMaterializationStatus.from_dagster_metadata(
392-
dict(last_materialization.asset_materialization.metadata)
425+
last_materialization_status = (
426+
ModelMaterializationStatus.from_dagster_metadata(
427+
dict(last_materialization.asset_materialization.metadata)
428+
)
393429
)
394430
except Exception as e:
395431
self._logger.warning(
@@ -399,7 +435,9 @@ def create_materialize_result(
399435

400436
return dg.MaterializeResult(
401437
asset_key=asset_key,
402-
metadata=current_materialization_status.as_dagster_metadata(last_materialization_status),
438+
metadata=current_materialization_status.as_dagster_metadata(
439+
last_materialization_status
440+
),
403441
)
404442

405443
def report_event(self, event: console.ConsoleEvent) -> None:
@@ -554,6 +592,7 @@ def run(
554592
skip_run: bool = False,
555593
plan_options: PlanOptions | None = None,
556594
run_options: RunOptions | None = None,
595+
materializations_enabled: bool = True,
557596
) -> t.Iterable[dg.MaterializeResult]:
558597
"""Execute SQLMesh based on the configuration given"""
559598
plan_options = plan_options or {}
@@ -592,6 +631,7 @@ def run(
592631
dag=dag,
593632
prefix="sqlmesh: ",
594633
is_testing=self.is_testing,
634+
materializations_enabled=materializations_enabled,
595635
)
596636

597637
def raise_for_sqlmesh_errors(
@@ -645,13 +685,15 @@ def create_event_handler(
645685
models_map: dict[str, Model],
646686
prefix: str,
647687
is_testing: bool,
688+
materializations_enabled: bool,
648689
) -> DagsterSQLMeshEventHandler:
649690
return DagsterSQLMeshEventHandler(
650691
context=context,
651692
dag=dag,
652693
models_map=models_map,
653694
prefix=prefix,
654695
is_testing=is_testing,
696+
materializations_enabled=materializations_enabled,
655697
)
656698

657699
def _get_selected_models_from_context(

0 commit comments

Comments
 (0)