Skip to content

Commit 594d1ea

Browse files
authored
feat: add to_lazy_polars to Dataset (#116)
* feat: add to_lazy_polars to Dataset
* remove not-required variables
* add test data
* add unit test for exception
* allow passing arbitrary transaction_rid
* add doc entry
* fix docstring
* fix: get_last_transaction() now excludes open transactions in to_lazy_polars()
* test with explicit transaction passing
1 parent 6a3f536 commit 594d1ea

File tree

9 files changed

+242
-3
lines changed

9 files changed

+242
-3
lines changed

docs/examples/dataset.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,28 @@ print(df)
282282
```
283283
````
284284

285+
### Polars LazyFrame with direct S3-compatible API access

Access dataset files directly via the S3-compatible API as a Polars LazyFrame for efficient lazy evaluation. This method bypasses FoundrySqlServer and works with both regular and hive-partitioned parquet datasets.

````{tab} v2
```python
from foundry_dev_tools import FoundryContext
import polars as pl

ctx = FoundryContext()
ds = ctx.get_dataset_by_path("/path/to/test_dataset")
lazy_df = ds.to_lazy_polars()

# Perform lazy operations (not executed yet)
result = lazy_df.filter(pl.col("age") > 25).select(["name", "age"])

# Execute and collect results
df = result.collect()
print(df)
```
````
306+
285307
### DuckDB Table from Spark SQL dialect
286308

287309
Queries the Foundry SQL server with Spark SQL dialect and loads the arrow stream using [duckdb](https://duckdb.org/).

libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from foundry_dev_tools.errors.dataset import (
1515
BranchNotFoundError,
1616
DatasetHasNoOpenTransactionError,
17+
DatasetHasNoTransactionsError,
1718
DatasetNotFoundError,
1819
TransactionTypeMismatchError,
1920
)
@@ -261,11 +262,16 @@ def get_transactions(
261262
).json()["values"]
262263
]
263264

264-
def get_last_transaction(self) -> api_types.Transaction | None:
265-
"""Returns the last transaction or None if there are no transactions."""
265+
def get_last_transaction(self, include_open_exclusive_transaction: bool = True) -> api_types.Transaction | None:
266+
"""Returns the last transaction or None if there are no transactions.
267+
268+
Args:
269+
include_open_exclusive_transaction: If True, includes open transactions
270+
in the results. If False, only returns committed transactions.
271+
"""
266272
v = self.get_transactions(
267273
page_size=1,
268-
include_open_exclusive_transaction=True,
274+
include_open_exclusive_transaction=include_open_exclusive_transaction,
269275
)
270276
if v is not None and len(v) > 0:
271277
return v[0]
@@ -799,6 +805,48 @@ def to_polars(self) -> pl.DataFrame:
799805
"""
800806
return self.query_foundry_sql("SELECT *", return_type="polars")
801807

808+
def to_lazy_polars(self, transaction_rid: str | None = None) -> pl.LazyFrame:
    """Get dataset as a :py:class:`polars.LazyFrame`.

    Returns a lazy polars DataFrame that can be queried efficiently via
    polars' lazy evaluation API. Files are read directly through the
    S3-compatible API; FoundrySqlServer is not involved.

    Args:
        transaction_rid: The transaction RID to read from. If None, the
            last *committed* transaction is used. Useful for reading
            specific historical versions of the dataset.

    Returns:
        pl.LazyFrame: A lazy polars DataFrame

    Raises:
        DatasetHasNoTransactionsError: If ``transaction_rid`` is None and the
            dataset has no committed transaction to read from.

    Example:
        >>> ds = ctx.get_dataset_by_path("/path/to/dataset")
        >>> lf = ds.to_lazy_polars()
        >>> result = lf.filter(pl.col("age") > 25).select(["name", "age"])
        >>> # Execute and collect results
        >>> df = result.collect()

    Note:
        This method uses the S3-compatible API to directly access dataset files.
        For hive-partitioned datasets, polars will automatically read
        the partition structure.
    """
    # polars is an optional dependency; import lazily so the rest of the
    # Dataset API works without it installed.
    from foundry_dev_tools._optional.polars import pl

    if transaction_rid is None:
        # Open transactions are excluded: their files are not committed yet.
        last_committed = self.get_last_transaction(include_open_exclusive_transaction=False)
        if last_committed is None:
            msg = f"Dataset has no committed transactions: {self.rid=}"
            raise DatasetHasNoTransactionsError(info=msg)
        transaction_rid = last_committed["rid"]

    # The S3-compatible API exposes each transaction under "<rid>.<transaction_rid>".
    scan_uri = f"s3://{self.rid}.{transaction_rid}/**/*.parquet"
    return pl.scan_parquet(
        scan_uri,
        storage_options=self._context.s3.get_polars_storage_options(),
        hive_partitioning=True,
    )
849+
802850
@contextmanager
803851
def transaction_context(
804852
self,

tests/integration/resources/test_dataset.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,36 @@ def test_crud_dataset(spark_session, tmp_path): # noqa: PLR0915
130130
# # check that deletion was successful
131131
with pytest.raises(DatasetNotFoundError):
132132
ds.sync()
133+
134+
135+
def test_to_lazy_polars_parquet_dataset():
    # Reading a plain parquet dataset lazily must yield a LazyFrame with the
    # full iris table once collected.
    dataset = TEST_SINGLETON.iris_parquet
    lf = dataset.to_lazy_polars()

    assert isinstance(lf, pl.LazyFrame)

    collected = lf.collect()
    assert collected.shape == (150, 5)
    assert collected.columns == ["sepal_width", "sepal_length", "petal_width", "petal_length", "is_setosa"]
144+
145+
146+
def test_to_lazy_polars_parquet_dataset_explicit_transaction():
    # Exercise the explicit transaction_rid path of to_lazy_polars().
    ds = TEST_SINGLETON.iris_parquet
    # Resolve the RID from committed transactions only: the default
    # get_last_transaction() includes open transactions, whose files are not
    # committed yet and would make the scan read an empty/partial dataset.
    lazy_df = ds.to_lazy_polars(ds.get_last_transaction(include_open_exclusive_transaction=False)["rid"])

    assert isinstance(lazy_df, pl.LazyFrame)

    df = lazy_df.collect()
    assert df.shape == (150, 5)
    assert df.columns == ["sepal_width", "sepal_length", "petal_width", "petal_length", "is_setosa"]
155+
156+
157+
def test_to_lazy_polars_hive_partitioned():
    # Hive-partitioned layout: polars should reassemble the partition column
    # so the collected frame matches the plain-parquet shape and columns.
    dataset = TEST_SINGLETON.iris_hive_partitioned
    lf = dataset.to_lazy_polars()

    assert isinstance(lf, pl.LazyFrame)

    collected = lf.collect()
    assert collected.shape == (150, 5)
    assert collected.columns == ["sepal_width", "sepal_length", "petal_width", "petal_length", "is_setosa"]

tests/integration/utils.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,79 @@
144144
},
145145
}
146146

147+
# Foundry schema for the iris test datasets (parquet / hive-partitioned).
# The five fields share an identical structure, so they are generated from
# (name, type) pairs instead of five hand-written dicts.
IRIS_SCHEMA_HIVE = {
    "fieldSchemaList": [
        {
            "type": field_type,
            "name": field_name,
            "nullable": None,
            "userDefinedTypeClass": None,
            "customMetadata": {},
            "arraySubtype": None,
            "precision": None,
            "scale": None,
            "mapKeyType": None,
            "mapValueType": None,
            "subSchemas": None,
        }
        for field_name, field_type in [
            ("sepal_width", "DOUBLE"),
            ("sepal_length", "DOUBLE"),
            ("petal_width", "DOUBLE"),
            ("petal_length", "DOUBLE"),
            ("is_setosa", "STRING"),
        ]
    ],
    "primaryKey": None,
    "dataFrameReaderClass": "com.palantir.foundry.spark.input.ParquetDataFrameReader",
    "customMetadata": {"format": "parquet"},
}
219+
147220
FOUNDRY_SCHEMA_COMPLEX_DATASET = {
148221
"fieldSchemaList": [
149222
{
@@ -515,6 +588,30 @@ def iris_no_schema(self) -> Dataset:
515588
)
516589
return _iris_no_schema
517590

591+
@cached_property
def iris_hive_partitioned(self) -> Dataset:
    """Hive-partitioned iris dataset; created, uploaded and schema'd on first access."""
    dataset = self.ctx.get_dataset_by_path(
        INTEGRATION_TEST_COMPASS_ROOT_PATH + "/iris_hive_partitioned",
        create_if_not_exist=True,
    )
    if dataset.__created__:
        # Freshly created: upload the partitioned files, then attach the schema
        # to the resulting transaction.
        _ = dataset.upload_folder(TEST_FOLDER.joinpath("test_data", "iris", "iris_hive_partitioned"))
        dataset.upload_schema(dataset.get_last_transaction()["rid"], schema=IRIS_SCHEMA_HIVE)
    return dataset
603+
604+
@cached_property
def iris_parquet(self) -> Dataset:
    """Plain parquet iris dataset; created, uploaded and schema'd on first access."""
    dataset = self.ctx.get_dataset_by_path(
        INTEGRATION_TEST_COMPASS_ROOT_PATH + "/iris_parquet",
        create_if_not_exist=True,
    )
    if dataset.__created__:
        # Freshly created: upload the parquet files, then attach the schema
        # to the resulting transaction.
        _ = dataset.upload_folder(TEST_FOLDER.joinpath("test_data", "iris", "iris_parquet"))
        dataset.upload_schema(dataset.get_last_transaction()["rid"], schema=IRIS_SCHEMA_HIVE)
    return dataset
614+
518615
@cached_property
519616
def empty_dataset(self) -> Dataset:
520617
return self.ctx.get_dataset_by_path(
Binary file not shown.
Binary file not shown.
Binary file not shown.
3.1 KB
Binary file not shown.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from unittest import mock
2+
3+
import pytest
4+
5+
from foundry_dev_tools.errors.dataset import DatasetHasNoTransactionsError
6+
from foundry_dev_tools.resources.dataset import Dataset
7+
8+
9+
def test_to_lazy_polars_no_transaction():
    # A dataset without any committed transaction must raise
    # DatasetHasNoTransactionsError, and the message must identify the dataset.
    with mock.patch.object(Dataset, "__created__", True):
        # Bypass __init__ (it would hit the API) and set only what the method reads.
        ds = Dataset.__new__(Dataset)
        ds.rid = "ri.foundry.main.dataset.test-dataset"
        ds.path = "/test/dataset/path"

        with (
            mock.patch.object(ds, "get_last_transaction", return_value=None),
            pytest.raises(DatasetHasNoTransactionsError) as excinfo,
        ):
            ds.to_lazy_polars()

        message = str(excinfo.value)
        assert "Dataset has no transactions" in message
        assert ds.rid in message
22+
23+
24+
def test_to_lazy_polars_transaction_rid_logic():
    # With an explicit transaction_rid, scan_parquet must be called with the
    # s3 glob for that transaction, the context's storage options, and
    # hive_partitioning enabled.
    with mock.patch.object(Dataset, "__created__", True):
        # Bypass __init__ and stub the context so storage options are canned.
        dataset = Dataset.__new__(Dataset)
        dataset.rid = "ri.foundry.main.dataset.abc123"
        dataset._context = mock.MagicMock()
        dataset._context.s3.get_polars_storage_options.return_value = {"aws_access_key_id": "test"}

        with mock.patch("foundry_dev_tools._optional.polars.pl.scan_parquet") as scan_parquet_mock:
            scan_parquet_mock.return_value = mock.MagicMock()
            dataset.to_lazy_polars(transaction_rid="test")

        scan_parquet_mock.assert_called_once()
        args, kwargs = scan_parquet_mock.call_args
        assert args[0] == f"s3://{dataset.rid}.test/**/*.parquet"
        assert kwargs["storage_options"] == dataset._context.s3.get_polars_storage_options()
        assert kwargs["hive_partitioning"] is True

0 commit comments

Comments
 (0)