Commit 38af2b5

Merge branch 'main' into table-provider-1239

2 parents: 6e46d43 + bf22c1d
File tree: 19 files changed, +1294 / -769 lines

Cargo.lock

Lines changed: 400 additions & 417 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 9 additions & 9 deletions
@@ -34,15 +34,15 @@ protoc = [ "datafusion-substrait/protoc" ]
 substrait = ["dep:datafusion-substrait"]
 
 [dependencies]
-tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
-pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
-pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
+tokio = { version = "1.47", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+pyo3 = { version = "0.25", features = ["extension-module", "abi3", "abi3-py39"] }
+pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"]}
 pyo3-log = "0.12.4"
-arrow = { version = "55.1.0", features = ["pyarrow"] }
-datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
-datafusion-substrait = { version = "49.0.2", optional = true }
-datafusion-proto = { version = "49.0.2" }
-datafusion-ffi = { version = "49.0.2" }
+arrow = { version = "56", features = ["pyarrow"] }
+datafusion = { version = "50", features = ["avro", "unicode_expressions"] }
+datafusion-substrait = { version = "50", optional = true }
+datafusion-proto = { version = "50" }
+datafusion-ffi = { version = "50" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
@@ -54,7 +54,7 @@ log = "0.4.27"
 
 [build-dependencies]
 prost-types = "0.13.1" # keep in line with `datafusion-substrait`
-pyo3-build-config = "0.24"
+pyo3-build-config = "0.25"
 
 [lib]
 name = "datafusion_python"

benchmarks/max_cpu_usage.py

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@
 
 def main(num_rows: int, partitions: int) -> None:
     """Run a simple aggregation after repartitioning.
-
+
     This function demonstrates basic partitioning concepts using synthetic data.
     Real-world performance will depend on your specific data sources, query types,
     and system configuration.

docs/source/user-guide/common-operations/windows.rst

Lines changed: 24 additions & 20 deletions
@@ -31,7 +31,7 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples.
 .. ipython:: python
 
     from datafusion import SessionContext
-    from datafusion import col
+    from datafusion import col, lit
    from datafusion import functions as f
 
     ctx = SessionContext()
@@ -120,16 +120,14 @@ two preceding rows.
 
 .. ipython:: python
 
-    from datafusion.expr import WindowFrame
+    from datafusion.expr import Window, WindowFrame
 
     df.select(
         col('"Name"'),
         col('"Speed"'),
-        f.window("avg",
-            [col('"Speed"')],
-            order_by=[col('"Speed"')],
-            window_frame=WindowFrame("rows", 2, 0)
-        ).alias("Previous Speed")
+        f.avg(col('"Speed"'))
+        .over(Window(window_frame=WindowFrame("rows", 2, 0), order_by=[col('"Speed"')]))
+        .alias("Previous Speed"),
     )
 
 Null Treatment
@@ -151,21 +149,27 @@ it's ``Type 2`` column that are null.
 
     from datafusion.common import NullTreatment
 
-    df.filter(col('"Type 1"') == lit("Bug")).select(
+    df.filter(col('"Type 1"') == lit("Bug")).select(
         '"Name"',
         '"Type 2"',
-        f.window("last_value", [col('"Type 2"')])
-        .window_frame(WindowFrame("rows", None, 0))
-        .order_by(col('"Speed"'))
-        .null_treatment(NullTreatment.IGNORE_NULLS)
-        .build()
-        .alias("last_wo_null"),
-        f.window("last_value", [col('"Type 2"')])
-        .window_frame(WindowFrame("rows", None, 0))
-        .order_by(col('"Speed"'))
-        .null_treatment(NullTreatment.RESPECT_NULLS)
-        .build()
-        .alias("last_with_null")
+        f.last_value(col('"Type 2"'))
+        .over(
+            Window(
+                window_frame=WindowFrame("rows", None, 0),
+                order_by=[col('"Speed"')],
+                null_treatment=NullTreatment.IGNORE_NULLS,
+            )
+        )
+        .alias("last_wo_null"),
+        f.last_value(col('"Type 2"'))
+        .over(
+            Window(
+                window_frame=WindowFrame("rows", None, 0),
+                order_by=[col('"Speed"')],
+                null_treatment=NullTreatment.RESPECT_NULLS,
+            )
+        )
+        .alias("last_with_null"),
     )
 
 Aggregate Functions
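
The migrated API above can be exercised outside the docs build. A minimal self-contained sketch, assuming the post-upgrade datafusion package and a local copy of the guide's pokemon dataset at the hypothetical path pokemon.csv; it reproduces the trailing-average window from the hunk above using Expr.over with an explicit Window:

    from datafusion import SessionContext, col
    from datafusion import functions as f
    from datafusion.expr import Window, WindowFrame

    ctx = SessionContext()
    df = ctx.read_csv("pokemon.csv")  # hypothetical local copy of the dataset

    df.select(
        col('"Name"'),
        col('"Speed"'),
        # Average over the current row and the two preceding rows,
        # ordered by Speed: the replacement for f.window("avg", ...).
        f.avg(col('"Speed"'))
        .over(Window(window_frame=WindowFrame("rows", 2, 0), order_by=[col('"Speed"')]))
        .alias("Previous Speed"),
    ).show()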

docs/source/user-guide/data-sources.rst

Lines changed: 34 additions & 3 deletions
@@ -181,10 +181,41 @@ which can lead to a significant performance difference.
     df = ctx.table("my_delta_table")
     df.show()
 
-Iceberg
--------
+Apache Iceberg
+--------------
 
-Coming soon!
+DataFusion 45.0.0 and later support registering Apache Iceberg tables as table providers through the Custom Table Provider interface.
+
+This requires either the `pyiceberg <https://pypi.org/project/pyiceberg/>`__ library (>=0.10.0) or the `pyiceberg-core <https://pypi.org/project/pyiceberg-core/>`__ library (>=0.5.0).
+
+* The ``pyiceberg-core`` library exposes Iceberg Rust's implementation of the Custom Table Provider interface as Python bindings.
+* The ``pyiceberg`` library uses the ``pyiceberg-core`` bindings under the hood and provides a native way for Python users to interact with DataFusion.
+
+.. code-block:: python
+
+    from datafusion import SessionContext
+    from pyiceberg.catalog import load_catalog
+    import pyarrow as pa
+
+    # Load catalog and create/load a table
+    catalog = load_catalog("catalog", type="in-memory")
+    catalog.create_namespace_if_not_exists("default")
+
+    # Create some sample data
+    data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+    iceberg_table = catalog.create_table("default.test", schema=data.schema)
+    iceberg_table.append(data)
+
+    # Register the table with DataFusion
+    ctx = SessionContext()
+    ctx.register_table_provider("test", iceberg_table)
+
+    # Query the table using DataFusion
+    ctx.table("test").show()
+
+Note that the DataFusion integration relies on features from the `Iceberg Rust <https://github.com/apache/iceberg-rust/>`_ implementation rather than the `PyIceberg <https://github.com/apache/iceberg-python/>`_ implementation.
+Features that are available in PyIceberg but not yet in Iceberg Rust will not be available when using DataFusion.
 
 Custom Table Provider
 ---------------------
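
Since register_table_provider exposes the Iceberg table like any other DataFusion table, the registered name can also be queried through SQL. A short follow-up to the example in the hunk above, assuming the same ctx and the registered name test:

    # Equivalent to ctx.table("test"), but via the SQL interface.
    ctx.sql("SELECT x, y FROM test WHERE x > 1").show()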

docs/source/user-guide/dataframe/index.rst

Lines changed: 50 additions & 0 deletions
@@ -126,6 +126,56 @@ DataFusion's DataFrame API offers a wide range of operations:
     # Drop columns
     df = df.drop("temporary_column")
 
+Column Names as Function Arguments
+----------------------------------
+
+Some ``DataFrame`` methods accept column names when an argument refers to an
+existing column. These include:
+
+* :py:meth:`~datafusion.DataFrame.select`
+* :py:meth:`~datafusion.DataFrame.sort`
+* :py:meth:`~datafusion.DataFrame.drop`
+* :py:meth:`~datafusion.DataFrame.join` (``on`` argument)
+* :py:meth:`~datafusion.DataFrame.aggregate` (grouping columns)
+
+See the full function documentation for details on any specific function.
+
+Note that :py:meth:`~datafusion.DataFrame.join_on` expects ``col()``/``column()`` expressions rather than plain strings.
+
+For such methods, you can pass column names directly:
+
+.. code-block:: python
+
+    from datafusion import col, functions as f
+
+    df.sort('id')
+    df.aggregate('id', [f.count(col('value'))])
+
+The same operation can also be written with explicit column expressions, using either ``col()`` or ``column()``:
+
+.. code-block:: python
+
+    from datafusion import col, column, functions as f
+
+    df.sort(col('id'))
+    df.aggregate(column('id'), [f.count(col('value'))])
+
+Note that ``column()`` is an alias of ``col()``, so you can use either name; the example above shows both in action.
+
+Whenever an argument represents an expression, such as in
+:py:meth:`~datafusion.DataFrame.filter` or
+:py:meth:`~datafusion.DataFrame.with_column`, use ``col()`` to reference
+columns. The comparison and arithmetic operators on ``Expr`` will automatically
+convert any non-``Expr`` value into a literal expression, so writing
+
+.. code-block:: python
+
+    from datafusion import col
+    df.filter(col("age") > 21)
+
+is equivalent to using ``lit(21)`` explicitly. Use ``lit()`` (also available
+as ``literal()``) when you need to construct a literal expression directly.
 
 Terminal Operations
 -------------------
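
The join/join_on contrast called out in the new docs is easiest to see side by side. A minimal sketch, assuming small in-memory tables built with SessionContext.from_arrow; the table and column names here are illustrative, not from the diff:

    import pyarrow as pa
    from datafusion import SessionContext, col

    ctx = SessionContext()
    users = ctx.from_arrow(pa.table({"id": [1, 2], "name": ["ada", "bob"]}))
    profiles = ctx.from_arrow(pa.table({"id": [1, 2], "age": [30, 40]}))
    orders = ctx.from_arrow(pa.table({"user_id": [1, 1, 2], "amount": [5, 7, 9]}))

    # join: the `on` argument accepts plain column names shared by both sides.
    users.join(profiles, on="id", how="inner").show()

    # join_on: takes col()/column() expressions, so the key names may differ.
    users.join_on(orders, col("id") == col("user_id"), how="inner").show()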

python/datafusion/context.py

Lines changed: 39 additions & 24 deletions
@@ -22,16 +22,16 @@
 import warnings
 from typing import TYPE_CHECKING, Any, Protocol
 
-import pyarrow as pa
-
 try:
     from warnings import deprecated  # Python 3.13+
 except ImportError:
     from typing_extensions import deprecated  # Python 3.12
 
+import pyarrow as pa
+
 from datafusion.catalog import Catalog, CatalogProvider, Table
 from datafusion.dataframe import DataFrame
-from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
+from datafusion.expr import SortKey, sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
 from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF
 from datafusion.utils import _normalize_table_provider
@@ -40,12 +40,14 @@
 from ._internal import SessionConfig as SessionConfigInternal
 from ._internal import SessionContext as SessionContextInternal
 from ._internal import SQLOptions as SQLOptionsInternal
+from ._internal import expr as expr_internal
 
 if TYPE_CHECKING:
     import pathlib
+    from collections.abc import Sequence
 
     import pandas as pd
-    import polars as pl
+    import polars as pl  # type: ignore[import]
 
     from datafusion import TableProvider
     from datafusion.plan import ExecutionPlan, LogicalPlan
@@ -555,7 +557,7 @@ def register_listing_table(
         table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None,
         file_extension: str = ".parquet",
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[Expr | SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> None:
         """Register multiple files as a single table.
@@ -569,23 +571,20 @@ def register_listing_table(
             table_partition_cols: Partition columns.
             file_extension: File extension of the provided table.
             schema: The data source schema.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order_raw = (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
         self.ctx.register_listing_table(
             name,
             str(path),
             table_partition_cols,
             file_extension,
             schema,
-            file_sort_order_raw,
+            self._convert_file_sort_order(file_sort_order),
         )
 
     def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
@@ -831,7 +830,7 @@ def register_parquet(
         file_extension: str = ".parquet",
         skip_metadata: bool = True,
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> None:
         """Register a Parquet file as a table.
@@ -850,7 +849,9 @@ def register_parquet(
             that may be in the file schema. This can help avoid schema
             conflicts due to metadata.
             schema: The data source schema.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
         """
         if table_partition_cols is None:
             table_partition_cols = []
@@ -863,9 +864,7 @@
             file_extension,
             skip_metadata,
             schema,
-            [sort_list_to_raw_sort_list(exprs) for exprs in file_sort_order]
-            if file_sort_order is not None
-            else None,
+            self._convert_file_sort_order(file_sort_order),
         )
 
     def register_csv(
@@ -1122,7 +1121,7 @@ def read_parquet(
         file_extension: str = ".parquet",
         skip_metadata: bool = True,
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[Expr | SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> DataFrame:
         """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
@@ -1139,19 +1138,17 @@
             schema: An optional schema representing the parquet files. If None,
                 the parquet reader will try to infer it based on data in the
                 file.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
 
         Returns:
             DataFrame representation of the read Parquet files
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order = (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
+        file_sort_order = self._convert_file_sort_order(file_sort_order)
         return DataFrame(
             self.ctx.read_parquet(
                 str(path),
@@ -1202,6 +1199,24 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
         """Execute the ``plan`` and return the results."""
         return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))
 
+    @staticmethod
+    def _convert_file_sort_order(
+        file_sort_order: Sequence[Sequence[SortKey]] | None,
+    ) -> list[list[expr_internal.SortExpr]] | None:
+        """Convert nested ``SortKey`` sequences into raw sort expressions.
+
+        Each ``SortKey`` can be a column name string, an ``Expr``, or a
+        ``SortExpr`` and will be converted using
+        :func:`datafusion.expr.sort_list_to_raw_sort_list`.
+        """
+        # Convert each ``SortKey`` in the provided sort order to the low-level
+        # representation expected by the Rust bindings.
+        return (
+            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
+            if file_sort_order is not None
+            else None
+        )
+
     @staticmethod
     def _convert_table_partition_cols(
         table_partition_cols: list[tuple[str, str | pa.DataType]],
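
One consequence of widening file_sort_order to Sequence[Sequence[SortKey]] is that callers can mix plain column names, Exprs, and explicit SortExprs in a single sort order. A minimal sketch, assuming a hypothetical local data.parquet whose rows really are sorted by a ascending, then b descending:

    from datafusion import SessionContext, col

    ctx = SessionContext()
    # Each inner sequence is one lexicographic sort order. "a" is shorthand
    # for an ascending sort on that column, while col("b").sort(...) builds
    # an explicit SortExpr; both are accepted SortKey forms after this change.
    df = ctx.read_parquet(
        "data.parquet",  # hypothetical file; must actually be sorted as declared
        file_sort_order=[["a", col("b").sort(ascending=False)]],
    )
    df.show()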
