Skip to content

Commit 93c97e2

Browse files
committed
add first simple version of using unbound ibis tables in transformations
1 parent 71bedc6 commit 93c97e2

File tree

6 files changed

+69
-3
lines changed

6 files changed

+69
-3
lines changed

.github/workflows/test_common.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ jobs:
110110
pytest tests/pipeline/test_pipeline_extra.py -k arrow ${{ matrix.pytest_args }}
111111
112112
- name: Install pipeline and sources dependencies
113-
run: uv sync ${{ matrix.uv_sync_args }} --extra duckdb --extra cli --extra parquet --extra deltalake --extra sql_database --group sentry-sdk --group pipeline --group sources
113+
run: uv sync ${{ matrix.uv_sync_args }} --extra duckdb --extra cli --extra parquet --extra deltalake --extra sql_database --group sentry-sdk --group pipeline --group sources --group ibis
114114

115115
- name: Run extract and pipeline tests
116116
run: |

dlt/common/destination/dataset.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
if TYPE_CHECKING:
2323
from dlt.common.libs.pandas import DataFrame
2424
from dlt.common.libs.pyarrow import Table as ArrowTable
25-
from dlt.helpers.ibis import BaseBackend as IbisBackend
25+
from dlt.helpers.ibis import BaseBackend as IbisBackend, Table as IbisTable
2626
else:
2727
DataFrame = Any
2828
ArrowTable = Any
2929
IbisBackend = Any
30+
IbisTable = Any
3031

3132

3233
class SupportsReadableRelation:
@@ -193,6 +194,14 @@ def select(self, *columns: str) -> Self:
193194
"""
194195
raise NotImplementedError("`select()` method is not supported for this relation")
195196

197+
def ibis(self) -> IbisTable:
198+
"""Returns an undbound ibis table representing the relation.
199+
200+
Returns:
201+
IbisTable: The ibis table for the relation
202+
"""
203+
raise NotImplementedError("`ibis()` method is not supported for this relation")
204+
196205
@overload
197206
def __getitem__(self, column: str) -> Self: ...
198207

dlt/destinations/dataset/ibis_relation.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,7 @@ def __repr__(self) -> str:
138138

139139
def __str__(self) -> str:
140140
return self._ibis_object.__str__() # type: ignore[no-any-return]
141+
142+
def ibis(self) -> IbisTable:
143+
"""Returns an undbound ibis table representing the relation."""
144+
return self._ibis_object

dlt/destinations/dataset/relation.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
from dlt.common.schema.typing import TTableSchemaColumns
1515
from dlt.common.typing import Self
16-
from dlt.common.exceptions import TypeErrorWithKnownTypes
1716
from dlt.transformations import lineage
1817
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
1918
from dlt.destinations.dataset.exceptions import (
@@ -23,8 +22,10 @@
2322

2423
if TYPE_CHECKING:
2524
from dlt.destinations.dataset.dataset import ReadableDBAPIDataset
25+
from dlt.helpers.ibis import Table as IbisTable
2626
else:
2727
ReadableDBAPIDataset = Any
28+
IbisTable = Any
2829

2930

3031
class BaseReadableDBAPIRelation(SupportsReadableRelation, WithSqlClient):
@@ -326,3 +327,13 @@ def __getitem__(self, columns: Sequence[str]) -> Self:
326327

327328
def head(self, limit: int = 5) -> Self:
328329
return self.limit(limit)
330+
331+
def ibis(self) -> IbisTable:
332+
"""Returns an undbound ibis table representing the relation."""
333+
from dlt.helpers.ibis import create_unbound_ibis_table
334+
335+
return create_unbound_ibis_table(
336+
self._dataset.schema,
337+
self._dataset.dataset_name,
338+
self._table_name,
339+
)

dlt/transformations/transformation.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
TransformationException,
1919
IncompatibleDatasetsException,
2020
)
21+
22+
from dlt.common.exceptions import MissingDependencyException
2123
from dlt.pipeline.exceptions import PipelineConfigMissing
2224
from dlt.destinations.dataset import ReadableDBAPIDataset
2325
from dlt.common.schema.typing import (
@@ -33,6 +35,12 @@
3335
from dlt.extract.exceptions import CurrentSourceNotAvailable
3436
from dlt.extract.pipe_iterator import DataItemWithMeta
3537

38+
try:
39+
from dlt.helpers.ibis import Expr as IbisExpr
40+
from dlt.helpers.ibis import compile_ibis_to_sqlglot
41+
except (ImportError, MissingDependencyException):
42+
IbisExpr = None
43+
3644

3745
class MaterializableSqlModel(SqlModel, WithComputableHints):
3846
# NOTE: we could forward all data access methods to this class
@@ -125,6 +133,10 @@ def transformation_function(*args: Any, **kwargs: Any) -> Iterator[TDataItems]:
125133
relation = datasets[0](unwrapped_item)
126134
except sqlglot.errors.ParseError:
127135
pass
136+
# TODO: after merge of sqlglot based readble relation, we want to nativly support ibis expressions in the constructor of the relation
137+
elif IbisExpr and isinstance(unwrapped_item, IbisExpr):
138+
sql_query = compile_ibis_to_sqlglot(unwrapped_item, datasets[0].sqlglot_dialect)
139+
relation = datasets[0](sql_query.sql(datasets[0].sqlglot_dialect))
128140

129141
# we have something else, so fall back to regular resource behavior
130142
if not relation:

tests/transformations/test_transformations.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,33 @@ def materializable_sql_model(dataset: SupportsReadableDataset[Any]) -> Any:
175175
"id": {"name": "id", "data_type": "bigint", "nullable": False},
176176
"name": {"name": "name", "x-annotation-pii": True, "data_type": "text", "nullable": True},
177177
}
178+
179+
180+
@pytest.mark.parametrize(
181+
"destination_config",
182+
transformation_configs(only_duckdb=True),
183+
ids=lambda x: x.name,
184+
)
185+
def test_ibis_unbound_table_transformation(
186+
destination_config: DestinationTestConfiguration,
187+
) -> None:
188+
fruit_p, dest_p = setup_transformation_pipelines(destination_config)
189+
fruit_p.run(fruitshop_source())
190+
191+
@dlt.transformation()
192+
def materializable_sql_model(dataset: SupportsReadableDataset[Any]) -> Any:
193+
purchases = dataset.purchases.ibis()
194+
customers = dataset.customers.ibis()
195+
yield purchases.join(customers, purchases.customer_id == customers.id)[
196+
["id", "customer_id", "inventory_id", "quantity", "name"]
197+
]
198+
199+
model = list(materializable_sql_model(fruit_p.dataset()))[0]
200+
assert model.relation.arrow().column_names == [
201+
"id",
202+
"customer_id",
203+
"inventory_id",
204+
"quantity",
205+
"name",
206+
]
207+
assert model.relation.arrow().shape == (3, 5)

0 commit comments

Comments
 (0)