23 commits
9b4f144
Migrate Table → TableProvider; refactor registration and access, update
kosiew Sep 15, 2025
3da3f93
Refactors and bug fixes around TableProvider registration and
kosiew Sep 16, 2025
512442b
TableProvider refactor & PyDataFrame integration
kosiew Sep 16, 2025
a8275dc
Normalize & simplify TableProvider/DataFrame registration; add
kosiew Sep 16, 2025
00bd445
refactor: update documentation for DataFrame to Table Provider conver…
kosiew Sep 18, 2025
6869919
refactor: replace to_view_provider with inner_df for DataFrame access
kosiew Sep 18, 2025
6e46d43
refactor: streamline TableProvider creation from DataFrame by consoli…
kosiew Sep 18, 2025
38af2b5
Merge branch 'main' into table-provider-1239
kosiew Sep 18, 2025
1872a7f
fix ruff errors
kosiew Sep 18, 2025
5948fb4
refactor: enhance autoapi_skip_member_fn to skip private variables an…
kosiew Sep 18, 2025
b9851d8
revert main 49.0.0 md
kosiew Sep 22, 2025
586c2cf
refactor: add comment in autoapi_skip_member_fn
kosiew Sep 22, 2025
d4ff136
refactor: remove isort and ruff comments to clean up import section
kosiew Sep 22, 2025
29203c6
docs: enhance docstring for DataFrame.into_view method to clarify usa…
kosiew Sep 22, 2025
ae8c1dd
docs: update example in DataFrame.into_view docstring for clarity
kosiew Sep 22, 2025
0c5eb17
docs: update example for registering Delta Lake tables to simplify usage
kosiew Sep 22, 2025
f9a3a22
docs: update table provider documentation for clarity and deprecate o…
kosiew Sep 22, 2025
f930181
docs: update documentation to reflect removal of TableProvider and us…
kosiew Sep 22, 2025
afc9b4e
remove TableProvider in Python, update missing_exports function, doc
kosiew Sep 22, 2025
918b1ce
Fix Ruff errors
kosiew Sep 22, 2025
93f0a31
Refactor test_table_loading to use Table instead of TableProvider
kosiew Sep 22, 2025
7bc303d
Refactor aggregate tests to simplify result assertions and improve re…
kosiew Sep 22, 2025
4429614
Add comments to clarify table normalization in aggregate tests
kosiew Sep 22, 2025
22 changes: 22 additions & 0 deletions docs/source/conf.py
@@ -83,6 +83,9 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:  # noqa
# Duplicate modules (skip module-level docs to avoid duplication)
("module", "datafusion.col"),
("module", "datafusion.udf"),
# Private variables causing duplicate documentation
("data", "datafusion.utils._PYARROW_DATASET_TYPES"),
("variable", "datafusion.utils._PYARROW_DATASET_TYPES"),
# Deprecated
("class", "datafusion.substrait.serde"),
("class", "datafusion.substrait.plan"),
@@ -91,9 +94,28 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:  # noqa
("method", "datafusion.context.SessionContext.tables"),
("method", "datafusion.dataframe.DataFrame.unnest_column"),
]
# Explicitly skip certain members listed above. These are either
# re-exports, duplicate module-level documentation, deprecated
# API surfaces, or private variables that would otherwise appear
# in the generated docs and cause confusing duplication.
# Keeping this explicit list avoids surprising entries in the
# AutoAPI output and gives us a single place to opt-out items
# when we intentionally hide them from the docs.
if (what, name) in skip_contents:
skip = True

# Skip private module-level names (those whose final component
# starts with an underscore) when AutoAPI is rendering data or
# variable entries. Many internal module-level constants are
# implementation details (for example private pyarrow dataset type
# mappings) that would otherwise be emitted as top-level "data"
# or "variable" docs. Filtering them here avoids noisy,
# duplicate, or implementation-specific entries in the public
# documentation while still allowing public members and types to
# be documented normally.
if name.split(".")[-1].startswith("_") and what in ("data", "variable"):
skip = True

return skip

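For reference, a skip callback like ``autoapi_skip_member_fn`` only takes effect once it is connected to AutoAPI's ``autoapi-skip-member`` event. A minimal wiring sketch (the actual ``setup()`` hook is outside this diff and may differ):

.. code-block:: python

    def setup(app):
        # Register the skip callback with sphinx-autoapi's member-skip event.
        app.connect("autoapi-skip-member", autoapi_skip_member_fn)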

2 changes: 1 addition & 1 deletion docs/source/contributor-guide/ffi.rst
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may decide
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.

At first glance, it may appear the best way to do this is to add the ``datafusion-python``
crate as a dependency, provide a ``PyTable``, and then to register it with the
crate as a dependency, produce a DataFusion table in Rust, and then register it with the
``SessionContext``. Unfortunately, this will not work.

When you produce your code as a Python library and it needs to interact with the DataFusion
17 changes: 15 additions & 2 deletions docs/source/user-guide/data-sources.rst
@@ -152,13 +152,26 @@ as Delta Lake. This will require a recent version of
.. code-block:: python

from deltalake import DeltaTable
from datafusion import Table

delta_table = DeltaTable("path_to_table")
ctx.register_table_provider("my_delta_table", delta_table)
table = Table.from_capsule(delta_table.__datafusion_table_provider__())
ctx.register_table("my_delta_table", table)
df = ctx.table("my_delta_table")
df.show()

Objects that implement ``__datafusion_table_provider__`` are supported directly by
:py:meth:`~datafusion.context.SessionContext.register_table`, making it easy to
work with custom table providers from Python libraries such as Delta Lake.
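
Because of this, the explicit ``Table.from_capsule`` step above is optional when the provider object itself is at hand. A minimal sketch, reusing ``delta_table`` from the example above (the table name here is arbitrary):

.. code-block:: python

    # register_table accepts any object exposing __datafusion_table_provider__,
    # so the provider can be passed directly.
    ctx.register_table("my_delta_table_direct", delta_table)
    ctx.table("my_delta_table_direct").show()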

.. note::

:py:meth:`~datafusion.context.SessionContext.register_table_provider` is
deprecated. Use
:py:meth:`~datafusion.context.SessionContext.register_table` with a
:py:class:`~datafusion.Table` instead.

On older versions of ``deltalake`` (prior to 0.22) you can use the
`Arrow DataSet <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html>`_
interface to import to DataFusion, but this does not support features such as filter push down
which can lead to a significant performance difference.
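
For that older path, a minimal sketch (assuming ``deltalake``'s ``to_pyarrow_dataset()`` helper and :py:meth:`~datafusion.context.SessionContext.register_dataset`; provider-level optimizations such as filter push down do not apply):

.. code-block:: python

    from deltalake import DeltaTable

    delta_table = DeltaTable("path_to_table")
    # Import the table as a pyarrow Dataset rather than a table provider.
    ctx.register_dataset("my_delta_dataset", delta_table.to_pyarrow_dataset())
    ctx.table("my_delta_dataset").show()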
41 changes: 34 additions & 7 deletions docs/source/user-guide/io/table_provider.rst
@@ -39,20 +39,47 @@ A complete example can be found in the `examples folder <https://github.com/apac
) -> PyResult<Bound<'py, PyCapsule>> {
let name = CString::new("datafusion_table_provider").unwrap();

let provider = Arc::new(self.clone())
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
let provider = FFI_TableProvider::new(Arc::new(provider), false);
let provider = Arc::new(self.clone());
let provider = FFI_TableProvider::new(provider, false, None);

PyCapsule::new_bound(py, provider, Some(name.clone()))
}
}

Once you have this library available, in python you can register your table provider
to the ``SessionContext``.
Once you have this library available, you can construct a
:py:class:`~datafusion.Table` in Python and register it with the
``SessionContext``. Tables can be created either from the PyCapsule exposed by your
Rust provider or from an existing :py:class:`~datafusion.dataframe.DataFrame`.
Call the provider's ``__datafusion_table_provider__()`` method to obtain the capsule
before constructing a ``Table``. The ``Table.from_view()`` helper is
deprecated; instead use ``Table.from_dataframe()`` or ``DataFrame.into_view()``.

.. note::

:py:meth:`~datafusion.context.SessionContext.register_table_provider` is
deprecated. Use
:py:meth:`~datafusion.context.SessionContext.register_table` with the
resulting :py:class:`~datafusion.Table` instead.

.. code-block:: python

from datafusion import SessionContext, Table

ctx = SessionContext()
provider = MyTableProvider()
ctx.register_table_provider("my_table", provider)

ctx.table("my_table").show()
capsule = provider.__datafusion_table_provider__()
capsule_table = Table.from_capsule(capsule)

df = ctx.from_pydict({"a": [1]})
view_table = Table.from_dataframe(df)
# or: view_table = df.into_view()

ctx.register_table("capsule_table", capsule_table)
ctx.register_table("view_table", view_table)

ctx.table("capsule_table").show()
ctx.table("view_table").show()

Both ``Table.from_capsule()`` and ``Table.from_dataframe()`` create
table providers that can be registered with the SessionContext using ``register_table()``.
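
Once registered, either table behaves like any other table in the context; a small usage sketch:

.. code-block:: python

    ctx.sql("SELECT * FROM capsule_table").show()
    ctx.sql("SELECT a FROM view_table").show()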
30 changes: 15 additions & 15 deletions examples/datafusion-ffi-example/python/tests/_test_aggregate_udf.py
@@ -45,33 +45,33 @@ def test_ffi_aggregate_register():

result = ctx.sql("select my_custom_sum(a) from test_table group by b").collect()

assert len(result) == 2
assert result
assert result[0].num_columns == 1

result = [r.column(0) for r in result]
expected = [
pa.array([3], type=pa.int64()),
pa.array([3], type=pa.int64()),
]
# Normalizing table registration in _normalize_table_provider feeds the Rust layer
# an actual TableProvider, so collect() emits the grouped rows in a single record batch
# instead of two separate batches.
aggregates = pa.concat_arrays([batch.column(0) for batch in result])

assert result == expected
assert len(aggregates) == 2
assert aggregates.to_pylist() == [3, 3]


def test_ffi_aggregate_call_directly():
ctx = setup_context_with_table()
my_udaf = udaf(MySumUDF())

result = (
ctx.table("test_table").aggregate([col("b")], [my_udaf(col("a"))]).collect()
)

assert len(result) == 2
# Normalizing table registration in _normalize_table_provider feeds the Rust layer
# an actual TableProvider, so collect() emits the grouped rows in a single record batch
# instead of two separate batches.
assert result
assert result[0].num_columns == 2

result = [r.column(1) for r in result]
expected = [
pa.array([3], type=pa.int64()),
pa.array([3], type=pa.int64()),
]
aggregates = pa.concat_arrays([batch.column(1) for batch in result])

assert result == expected
assert len(aggregates) == 2
assert aggregates.to_pylist() == [3, 3]
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
table_udtf = udtf(table_func, "my_table_func")

my_table = table_udtf()
ctx.register_table_provider("t", my_table)
ctx.register_table("t", my_table)
result = ctx.table("t").collect()

assert len(result) == 2
@@ -18,14 +18,14 @@
from __future__ import annotations

import pyarrow as pa
from datafusion import SessionContext
from datafusion import SessionContext, Table
from datafusion_ffi_example import MyTableProvider


def test_table_loading():
ctx = SessionContext()
table = MyTableProvider(3, 2, 4)
ctx.register_table_provider("t", table)
ctx.register_table("t", Table.from_capsule(table.__datafusion_table_provider__()))
result = ctx.table("t").collect()

assert len(result) == 4
15 changes: 6 additions & 9 deletions python/datafusion/__init__.py
@@ -28,17 +28,16 @@
try:
import importlib.metadata as importlib_metadata
except ImportError:
import importlib_metadata
import importlib_metadata # type: ignore[import]

# Public submodules
from . import functions, object_store, substrait, unparser

# The following imports are okay to remain as opaque to the user.
from ._internal import Config
from ._internal import EXPECTED_PROVIDER_MSG, Config
from .catalog import Catalog, Database, Table
from .col import col, column
from .common import (
DFSchema,
)
from .common import DFSchema
from .context import (
RuntimeEnvBuilder,
SessionConfig,
@@ -47,10 +46,7 @@
)
from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
from .dataframe_formatter import configure_formatter
from .expr import (
Expr,
WindowFrame,
)
from .expr import Expr, WindowFrame
from .io import read_avro, read_csv, read_json, read_parquet
from .plan import ExecutionPlan, LogicalPlan
from .record_batch import RecordBatch, RecordBatchStream
@@ -69,6 +65,7 @@
__version__ = importlib_metadata.version(__name__)

__all__ = [
"EXPECTED_PROVIDER_MSG",
"Accumulator",
"AggregateUDF",
"Catalog",