Skip to content

Commit a759712

Browse files
authored
Fix: Catalog creation / deletion for motherduck (#5223)
1 parent 87f724a commit a759712

File tree

4 files changed

+75
-29
lines changed

4 files changed

+75
-29
lines changed

sqlmesh/core/config/connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,10 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
537537
connection_str += f"{'&' if self.database else '?'}motherduck_token={self.token}"
538538
return {"database": connection_str, "config": custom_user_agent_config}
539539

540+
@property
541+
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
542+
return {"is_motherduck": True}
543+
540544

541545
class DuckDBConnectionConfig(BaseDuckDBConnectionConfig):
542546
"""Configuration for the DuckDB connection."""

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,30 @@ def set_current_catalog(self, catalog: str) -> None:
4747
self.execute(exp.Use(this=exp.to_identifier(catalog)))
4848

4949
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
50-
db_filename = f"{catalog_name.output_name}.db"
51-
self.execute(
52-
exp.Attach(this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True)
53-
)
50+
if not self._is_motherduck:
51+
db_filename = f"{catalog_name.output_name}.db"
52+
self.execute(
53+
exp.Attach(
54+
this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True
55+
)
56+
)
57+
else:
58+
self.execute(
59+
exp.Create(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True)
60+
)
5461

5562
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
56-
db_file_path = Path(f"{catalog_name.output_name}.db")
57-
self.execute(exp.Detach(this=catalog_name, exists=True))
58-
if db_file_path.exists():
59-
db_file_path.unlink()
63+
if not self._is_motherduck:
64+
db_file_path = Path(f"{catalog_name.output_name}.db")
65+
self.execute(exp.Detach(this=catalog_name, exists=True))
66+
if db_file_path.exists():
67+
db_file_path.unlink()
68+
else:
69+
self.execute(
70+
exp.Drop(
71+
this=exp.Table(this=catalog_name), kind="DATABASE", cascade=True, exists=True
72+
)
73+
)
6074

6175
def _df_to_source_queries(
6276
self,
@@ -198,3 +212,7 @@ def _create_table(
198212
expr.sql(dialect=self.dialect) for expr in partitioned_by_exps
199213
)
200214
self.execute(f"ALTER TABLE {table_name_str} SET PARTITIONED BY ({partitioned_by_str});")
215+
216+
@property
217+
def _is_motherduck(self) -> bool:
218+
return self._extra_config.get("is_motherduck", False)

sqlmesh/core/state_sync/common.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import abc
88

99
from dataclasses import dataclass
10+
from sqlglot import exp
1011

1112
from sqlmesh.core.console import Console
1213
from sqlmesh.core.dialect import schema_
@@ -29,7 +30,7 @@ def cleanup_expired_views(
2930
warn_on_delete_failure: bool = False,
3031
console: t.Optional[Console] = None,
3132
) -> None:
32-
expired_schema_environments = [
33+
expired_schema_or_catalog_environments = [
3334
environment
3435
for environment in environments
3536
if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
@@ -45,8 +46,9 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
4546
return default_adapter
4647

4748
catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
49+
schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
4850

49-
# Drop the schemas for the expired environments
51+
# Collect schemas and catalogs to drop
5052
for engine_adapter, expired_catalog, expired_schema, suffix_target in {
5153
(
5254
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
@@ -58,29 +60,16 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
5860
),
5961
environment.suffix_target,
6062
)
61-
for environment in expired_schema_environments
63+
for environment in expired_schema_or_catalog_environments
6264
for snapshot in environment.snapshots
6365
if snapshot.is_model and not snapshot.is_symbolic
6466
}:
65-
schema = schema_(expired_schema, expired_catalog)
66-
try:
67-
engine_adapter.drop_schema(
68-
schema,
69-
ignore_if_not_exists=True,
70-
cascade=True,
71-
)
72-
73-
if suffix_target.is_catalog and expired_catalog:
67+
if suffix_target.is_catalog:
68+
if expired_catalog:
7469
catalogs_to_drop.add((engine_adapter, expired_catalog))
75-
76-
if console:
77-
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
78-
except Exception as e:
79-
message = f"Failed to drop the expired environment schema '{schema}': {e}"
80-
if warn_on_delete_failure:
81-
logger.warning(message)
82-
else:
83-
raise SQLMeshError(message) from e
70+
else:
71+
schema = schema_(expired_schema, expired_catalog)
72+
schemas_to_drop.add((engine_adapter, schema))
8473

8574
# Drop the views for the expired environments
8675
for engine_adapter, expired_view in {
@@ -105,6 +94,23 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
10594
else:
10695
raise SQLMeshError(message) from e
10796

97+
# Drop the schemas for the expired environments
98+
for engine_adapter, schema in schemas_to_drop:
99+
try:
100+
engine_adapter.drop_schema(
101+
schema,
102+
ignore_if_not_exists=True,
103+
cascade=True,
104+
)
105+
if console:
106+
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
107+
except Exception as e:
108+
message = f"Failed to drop the expired environment schema '{schema}': {e}"
109+
if warn_on_delete_failure:
110+
logger.warning(message)
111+
else:
112+
raise SQLMeshError(message) from e
113+
108114
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
109115
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
110116
for engine_adapter, catalog in catalogs_to_drop:

tests/core/engine_adapter/test_duckdb.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,31 @@ def test_create_catalog(make_mocked_engine_adapter: t.Callable) -> None:
101101
assert to_sql_calls(adapter) == ["ATTACH IF NOT EXISTS 'foo.db' AS \"foo\""]
102102

103103

104+
def test_create_catalog_motherduck(make_mocked_engine_adapter: t.Callable) -> None:
105+
adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(
106+
DuckDBEngineAdapter, is_motherduck=True
107+
)
108+
adapter.create_catalog(exp.to_identifier("foo"))
109+
110+
assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "foo"']
111+
112+
104113
def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None:
105114
adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter)
106115
adapter.drop_catalog(exp.to_identifier("foo"))
107116

108117
assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"']
109118

110119

120+
def test_drop_catalog_motherduck(make_mocked_engine_adapter: t.Callable) -> None:
121+
adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(
122+
DuckDBEngineAdapter, is_motherduck=True
123+
)
124+
adapter.drop_catalog(exp.to_identifier("foo"))
125+
126+
assert to_sql_calls(adapter) == ['DROP DATABASE IF EXISTS "foo" CASCADE']
127+
128+
111129
def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path):
112130
catalog = "a_ducklake_db"
113131

0 commit comments

Comments
 (0)