
Commit 170cc68

feat(datasets): make table write mode configurable (#1093)
Signed-off-by: gitgud5000 <[email protected]>
Signed-off-by: Deepyaman Datta <[email protected]>
Co-authored-by: Deepyaman Datta <[email protected]>
1 parent c297ad0 commit 170cc68

File tree

4 files changed: +218 -11 lines changed

- kedro-datasets/RELEASE.md
- kedro-datasets/kedro_datasets/ibis/table_dataset.py
- kedro-datasets/pyproject.toml
- kedro-datasets/tests/ibis/test_table_dataset.py

kedro-datasets/RELEASE.md

Lines changed: 4 additions & 2 deletions
@@ -1,4 +1,5 @@
 # Upcoming Release
+
 ## Major features and improvements
 
 - Group datasets documentation according to the dependencies to clean up the nav bar.
@@ -29,14 +30,16 @@
 | ------------------------------ | ------------------------------------------------------------- | ------------------------------------ |
 | `polars.PolarsDatabaseDataset` | A dataset to load and save data to a SQL backend using Polars | `kedro_datasets_experimental.polars` |
 
+- Added `mode` save argument to `ibis.TableDataset`, supporting "append", "overwrite", "error"/"errorifexists", and "ignore" save modes. The deprecated `overwrite` save argument is mapped to `mode` for backward compatibility and will be removed in a future release. Specifying both `mode` and `overwrite` results in an error.
+
 ## Bug fixes and other changes
 
 - Added primary key constraint to BaseTable.
 - Added save/load with `use_pyarrow=True` save_args for LazyPolarsDataset partitioned parquet files.
 - Updated the json schema for Kedro 1.0.0.
 
-## Breaking Changes
 ## Community contributions
+
 - [Minura Punchihewa](https://github.com/MinuraPunchihewa)
 - [gitgud5000](https://github.com/gitgud5000)
 
@@ -70,7 +73,6 @@ Many thanks to the following Kedroids for contributing PRs to this release:
 - [Seohyun Park](https://github.com/soyamimi)
 - [Daniel Russell-Brain](https://github.com/killerfridge)
 
-
 # Release 7.0.0
 
 ## Major features and improvements
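For orientation, the release note above boils down to a new `mode` key in `save_args`. A minimal sketch of the equivalent Python API usage (the DuckDB connection details are illustrative; any backend supported by `TableDataset` works the same way):

```python
from kedro_datasets.ibis import TableDataset

# Illustrative dataset definition: the new `mode` save argument replaces the
# deprecated boolean `overwrite` flag.
cars = TableDataset(
    table_name="cars",
    connection={"backend": "duckdb", "database": "company.db"},  # illustrative connection
    save_args={"materialized": "table", "mode": "append"},
)
```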

kedro-datasets/kedro_datasets/ibis/table_dataset.py

Lines changed: 70 additions & 5 deletions
@@ -1,18 +1,37 @@
 """Provide data loading and saving functionality for Ibis's backends."""
 from __future__ import annotations
 
+import sys
 from copy import deepcopy
+from enum import auto
 from typing import TYPE_CHECKING, Any, ClassVar
+from warnings import warn
+
+if sys.version_info >= (3, 11):
+    from enum import StrEnum  # pragma: no cover
+else:
+    from backports.strenum import StrEnum  # pragma: no cover
 
 import ibis.expr.types as ir
-from kedro.io import AbstractDataset
+from kedro.io import AbstractDataset, DatasetError
 
+from kedro_datasets import KedroDeprecationWarning
 from kedro_datasets._utils import ConnectionMixin
 
 if TYPE_CHECKING:
     from ibis import BaseBackend
 
 
+class SaveMode(StrEnum):
+    """`SaveMode` is used to specify the expected behavior of saving a table."""
+
+    APPEND = auto()
+    OVERWRITE = auto()
+    ERROR = auto()
+    ERRORIFEXISTS = auto()
+    IGNORE = auto()
+
+
 class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]):
     """`TableDataset` loads/saves data from/to Ibis table expressions.
 
@@ -28,14 +47,18 @@ class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]):
         database: company.db
       save_args:
         materialized: table
+        mode: append
 
     motorbikes:
      type: ibis.TableDataset
      table_name: motorbikes
      connection:
        backend: duckdb
        database: company.db
-    ```
+     save_args:
+       materialized: view
+       mode: overwrite
+    ```
 
     Using the [Python API](https://docs.kedro.org/en/stable/catalog-data/advanced_data_catalog_usage/):
 
@@ -62,7 +85,7 @@
     DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
     DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {
         "materialized": "view",
-        "overwrite": True,
+        "mode": "overwrite",
     }
 
     _CONNECTION_GROUP: ClassVar[str] = "ibis"
@@ -109,7 +132,12 @@ def __init__(  # noqa: PLR0913
                 `create_{materialized}` method. By default, ``ir.Table``
                 objects are materialized as views. To save a table using
                 a different materialization strategy, supply a value for
-                `materialized` in `save_args`.
+                `materialized` in `save_args`. The `mode` parameter controls
+                the behavior when saving data:
+                - _"overwrite"_: Overwrite existing data in the table.
+                - _"append"_: Append contents of the new data to the existing table (does not overwrite).
+                - _"error"_ or _"errorifexists"_: Throw an exception if the table already exists.
+                - _"ignore"_: Silently ignore the operation if the table already exists.
             metadata: Any arbitrary metadata. This is ignored by Kedro,
                 but may be consumed by users or external plugins.
         """
@@ -134,6 +162,28 @@ def __init__(  # noqa: PLR0913
 
         self._materialized = self._save_args.pop("materialized")
 
+        # Handle mode/overwrite conflict.
+        if save_args and "mode" in save_args and "overwrite" in self._save_args:
+            raise ValueError("Cannot specify both 'mode' and deprecated 'overwrite'.")
+
+        # Map legacy overwrite if present.
+        if "overwrite" in self._save_args:
+            warn(
+                "'overwrite' is deprecated and will be removed in a future release. "
+                "Please use 'mode' instead.",
+                KedroDeprecationWarning,
+                stacklevel=2,
+            )
+            legacy = self._save_args.pop("overwrite")
+            # Remove any lingering 'mode' key from defaults to avoid
+            # leaking into writer kwargs.
+            del self._save_args["mode"]
+            mode = "overwrite" if legacy else "error"
+        else:
+            mode = self._save_args.pop("mode")
+
+        self._mode = SaveMode(mode)
+
     def _connect(self) -> BaseBackend:
         import ibis  # noqa: PLC0415
 
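The backward-compatibility branch above can be hard to follow in diff form. The following standalone sketch (not part of the commit; `resolve_mode` is a hypothetical helper) mirrors the mapping that `__init__` applies to `save_args`:

```python
def resolve_mode(save_args: dict) -> str:
    """Illustrative helper mirroring TableDataset.__init__'s handling of save_args."""
    if "mode" in save_args and "overwrite" in save_args:
        raise ValueError("Cannot specify both 'mode' and deprecated 'overwrite'.")
    if "overwrite" in save_args:
        # Legacy flag: True meant "replace the table", False meant "fail if it exists".
        return "overwrite" if save_args["overwrite"] else "error"
    return save_args.get("mode", "overwrite")  # "overwrite" is the new default


assert resolve_mode({"overwrite": True}) == "overwrite"
assert resolve_mode({"overwrite": False}) == "error"
assert resolve_mode({"mode": "append"}) == "append"
assert resolve_mode({}) == "overwrite"
```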
@@ -151,7 +201,21 @@ def load(self) -> ir.Table:
 
     def save(self, data: ir.Table) -> None:
         writer = getattr(self.connection, f"create_{self._materialized}")
-        writer(self._table_name, data, **self._save_args)
+        if self._mode == "append":
+            if not self._exists():
+                writer(self._table_name, data, overwrite=False, **self._save_args)
+            elif hasattr(self.connection, "insert"):
+                self.connection.insert(self._table_name, data, **self._save_args)
+            else:
+                raise DatasetError(
+                    f"The {self.connection.name} backend for Ibis does not support inserts."
+                )
+        elif self._mode == "overwrite":
+            writer(self._table_name, data, overwrite=True, **self._save_args)
+        elif self._mode in {"error", "errorifexists"}:
+            writer(self._table_name, data, overwrite=False, **self._save_args)
+        elif self._mode == "ignore" and not self._exists():
+            writer(self._table_name, data, overwrite=False, **self._save_args)
 
     def _describe(self) -> dict[str, Any]:
         load_args = deepcopy(self._load_args)
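Taken together with the `SaveMode` enum, `save()` now dispatches on the configured mode. A rough usage sketch (assuming the DuckDB Ibis backend is installed; file, table, and dataset names are illustrative):

```python
import ibis

from kedro_datasets.ibis import TableDataset

data = ibis.memtable({"col1": [1, 2], "col2": ["a", "b"]})
connection = {"backend": "duckdb", "database": "demo.db"}  # illustrative connection

# mode="append": the first save creates the table, later saves insert into it
# (backends without an `insert` method raise a DatasetError instead).
appender = TableDataset(
    table_name="events",
    connection=connection,
    save_args={"materialized": "table", "mode": "append"},
)
appender.save(data)  # creates "events" with two rows
appender.save(data)  # inserts two more rows

# mode="error" (or "errorifexists"): only the first save may succeed.
strict = TableDataset(
    table_name="snapshot",
    connection=connection,
    save_args={"materialized": "table", "mode": "error"},
)
strict.save(data)
# strict.save(data)  # would raise because "snapshot" already exists
```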
@@ -165,6 +229,7 @@ def _describe(self) -> dict[str, Any]:
             "load_args": load_args,
             "save_args": save_args,
             "materialized": self._materialized,
+            "mode": self._mode,
         }
 
     def _exists(self) -> bool:

kedro-datasets/pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ description = "Kedro-Datasets is where you can find all of Kedro's data connecto
 requires-python = ">=3.10"
 license = {text = "Apache Software License (Apache 2.0)"}
 dependencies = [
+    "backports.strenum; python_version < '3.11'",
     "kedro>=1.0.0rc1, <2.0.0",
     "lazy_loader",
 ]
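This new dependency backs the conditional import at the top of `table_dataset.py`: `enum.StrEnum` only exists in the standard library from Python 3.11, while the package still supports Python 3.10. Because `StrEnum` members are plain strings, `save()` can compare `self._mode` directly against literals such as `"append"`. A minimal sketch of that behaviour, mirroring the `SaveMode` enum from this commit:

```python
import sys
from enum import auto

if sys.version_info >= (3, 11):
    from enum import StrEnum
else:  # needs the backports.strenum package on Python 3.10
    from backports.strenum import StrEnum


class SaveMode(StrEnum):
    APPEND = auto()
    OVERWRITE = auto()


# auto() lower-cases the member name, and StrEnum members are real strings,
# so equality checks against plain string literals just work.
assert SaveMode.APPEND == "append"
assert isinstance(SaveMode.OVERWRITE, str)
assert SaveMode("overwrite") is SaveMode.OVERWRITE

# Unknown values fail loudly, which test_unsupported_save_mode_raises exercises.
try:
    SaveMode("unsupported_mode")
except ValueError as exc:
    print(exc)  # 'unsupported_mode' is not a valid SaveMode
```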

kedro-datasets/tests/ibis/test_table_dataset.py

Lines changed: 143 additions & 4 deletions
@@ -1,9 +1,12 @@
 import duckdb
 import ibis
+import pandas as pd
 import pytest
+from kedro.io import DatasetError
 from packaging.version import Version
 from pandas.testing import assert_frame_equal
 
+from kedro_datasets import KedroDeprecationWarning
 from kedro_datasets.ibis import FileDataset, TableDataset
 
 _SENTINEL = object()
@@ -37,13 +40,15 @@ def connection_config(request, database):
 
 @pytest.fixture
 def table_dataset(database_name, connection_config, load_args, save_args):
-    return TableDataset(
+    ds = TableDataset(
         table_name="test",
         database=database_name,
         connection=connection_config,
         load_args=load_args,
         save_args=save_args,
     )
+    yield ds
+    getattr(ds._connection, f"drop_{ds._materialized}")("test", force=True)
 
 
 @pytest.fixture
@@ -77,9 +82,10 @@ def test_save_and_load(self, table_dataset, dummy_table, database):
         assert "test" in con.sql("SELECT * FROM duckdb_views").fetchnumpy()["view_name"]
 
     @pytest.mark.parametrize(
-        "connection_config", [{"backend": "polars"}], indirect=True
+        ("connection_config", "save_args"),
+        [({"backend": "polars"}, {"materialized": "table"})],
+        indirect=True,
     )
-    @pytest.mark.parametrize("save_args", [{"materialized": "table"}], indirect=True)
     def test_save_and_load_polars(
         self, table_dataset, connection_config, save_args, dummy_table
     ):
@@ -102,14 +108,147 @@ def test_exists(self, table_dataset, dummy_table):
         table_dataset.save(dummy_table)
         assert table_dataset.exists()
 
+    @pytest.mark.parametrize(
+        "save_args", [{"materialized": "table", "mode": "append"}], indirect=True
+    )
+    def test_save_mode_append(self, table_dataset, dummy_table):
+        """Saving with mode=append should add rows to an existing table."""
+        df1 = dummy_table
+        df2 = dummy_table
+
+        table_dataset.save(df1)
+        table_dataset.save(df2)
+
+        df1 = df1.execute()
+        df2 = df2.execute()
+        reloaded = table_dataset.load().execute()
+        assert len(reloaded) == len(df1) + len(df2)
+
+    @pytest.mark.parametrize(
+        "save_args",
+        [
+            {"materialized": "table", "mode": "error"},
+            {"materialized": "table", "mode": "errorifexists"},
+        ],
+        indirect=True,
+    )
+    def test_save_mode_error_variants(self, table_dataset, dummy_table):
+        """Saving with error/errorifexists should raise when table exists."""
+        table_dataset.save(dummy_table)
+        with pytest.raises(DatasetError, match='Table with name "test" already exists'):
+            table_dataset.save(dummy_table)
+
+    @pytest.mark.parametrize(
+        "save_args", [{"materialized": "table", "mode": "ignore"}], indirect=True
+    )
+    def test_save_mode_ignore(self, table_dataset, dummy_table):
+        """Saving with ignore should not change existing table."""
+        df1 = dummy_table
+        df2 = dummy_table
+
+        table_dataset.save(df1)
+        table_dataset.save(df2)
+        df1 = df1.execute()
+
+        reloaded = table_dataset.load().execute()
+        # Should remain as first save only
+        assert_frame_equal(reloaded.reset_index(drop=True), df1.reset_index(drop=True))
+
+    def test_unsupported_save_mode_raises(self, database_name, connection_config):
+        """Providing an unsupported save mode should raise a DatasetError."""
+        with pytest.raises(
+            ValueError, match="'unsupported_mode' is not a valid SaveMode"
+        ):
+            TableDataset(
+                table_name="unsupported_mode",
+                database=database_name,
+                connection=connection_config,
+                save_args={"materialized": "table", "mode": "unsupported_mode"},
+            )
+
+    def test_legacy_overwrite_conflict_raises(self, database_name, connection_config):
+        """Providing both mode and overwrite should raise a ValueError."""
+        with pytest.raises(ValueError):
+            TableDataset(
+                table_name="conflict",
+                database=database_name,
+                connection=connection_config,
+                save_args={
+                    "materialized": "table",
+                    "mode": "append",
+                    "overwrite": True,
+                },
+            )
+
+    def test_legacy_overwrite_deprecation_warning(
+        self, database_name, connection_config
+    ):
+        """Using legacy overwrite should raise a deprecation warning."""
+        with pytest.warns(KedroDeprecationWarning, match="'overwrite' is deprecated"):
+            TableDataset(
+                table_name="deprecated_overwrite",
+                database=database_name,
+                connection=connection_config,
+                save_args={"overwrite": True},
+            )
+
+    @pytest.mark.parametrize(
+        ("connection_config", "save_args"),
+        [({"backend": "polars"}, {"materialized": "table", "mode": "append"})],
+        indirect=True,
+    )
+    def test_append_mode_no_insert_raises(self, table_dataset, dummy_table):
+        """Test that saving with mode=append on a backend without 'insert' raises DatasetError (polars backend)."""
+        # Save once to create the table
+        table_dataset.save(dummy_table)
+        # Try to append again, should raise DatasetError
+        with pytest.raises(DatasetError, match="does not support inserts"):
+            table_dataset.save(dummy_table)
+
+    @pytest.mark.parametrize(
+        "save_args",
+        [
+            {"materialized": "table", "overwrite": True},
+            {"materialized": "table", "overwrite": False},
+        ],
+        indirect=True,
+    )
+    def test_legacy_overwrite_behavior(self, table_dataset, save_args, dummy_table):
+        """Legacy overwrite should map to overwrite or error behavior."""
+        legacy_overwrite = save_args["overwrite"]
+        df2 = ibis.memtable(pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]}))
+
+        table_dataset.save(dummy_table)  # First save should always work
+        if legacy_overwrite:
+            # Should overwrite existing table with new contents
+            table_dataset.save(df2)
+            df2 = df2.execute()
+            out = table_dataset.load().execute().reset_index(drop=True)
+            assert_frame_equal(out, df2.reset_index(drop=True))
+        else:
+            # Should raise on second save when table exists
+            with pytest.raises(DatasetError):
+                table_dataset.save(df2)
+
+    def test_describe_includes_backend_mode_and_materialized(self, table_dataset):
+        """_describe should expose backend, mode and materialized; nested args exclude database."""
+
+        desc = table_dataset._describe()
+
+        assert {"backend", "mode", "materialized"} <= desc.keys()
+        assert "database" in desc
+        # database key should not be duplicated inside nested args
+        assert "database" not in desc["load_args"]
+        assert "database" not in desc["save_args"]
+
     @pytest.mark.parametrize("load_args", [{"database": "test"}], indirect=True)
     def test_load_extra_params(self, table_dataset, load_args):
         """Test overriding the default load arguments."""
         for key, value in load_args.items():
             assert table_dataset._load_args[key] == value
 
     @pytest.mark.parametrize("save_args", [{"materialized": "table"}], indirect=True)
-    def test_save_extra_params(self, table_dataset, save_args, dummy_table, database):
+    def test_save_extra_params(self, table_dataset, dummy_table, database):
        """Test overriding the default save arguments."""
        table_dataset.save(dummy_table)
 