Merged
Changes from 36 commits
37 commits
9b4f144
Migrate Table → TableProvider; refactor registration and access, update
kosiew Sep 15, 2025
3da3f93
Refactors and bug fixes around TableProvider registration and
kosiew Sep 16, 2025
512442b
TableProvider refactor & PyDataFrame integration
kosiew Sep 16, 2025
a8275dc
Normalize & simplify TableProvider/DataFrame registration; add
kosiew Sep 16, 2025
00bd445
refactor: update documentation for DataFrame to Table Provider conver…
kosiew Sep 18, 2025
6869919
refactor: replace to_view_provider with inner_df for DataFrame access
kosiew Sep 18, 2025
6e46d43
refactor: streamline TableProvider creation from DataFrame by consoli…
kosiew Sep 18, 2025
38af2b5
Merge branch 'main' into table-provider-1239
kosiew Sep 18, 2025
1872a7f
fix ruff errors
kosiew Sep 18, 2025
5948fb4
refactor: enhance autoapi_skip_member_fn to skip private variables an…
kosiew Sep 18, 2025
b9851d8
revert main 49.0.0 md
kosiew Sep 22, 2025
586c2cf
refactor: add comment in autoapi_skip_member_fn
kosiew Sep 22, 2025
d4ff136
refactor: remove isort and ruff comments to clean up import section
kosiew Sep 22, 2025
29203c6
docs: enhance docstring for DataFrame.into_view method to clarify usa…
kosiew Sep 22, 2025
ae8c1dd
docs: update example in DataFrame.into_view docstring for clarity
kosiew Sep 22, 2025
0c5eb17
docs: update example for registering Delta Lake tables to simplify usage
kosiew Sep 22, 2025
f9a3a22
docs: update table provider documentation for clarity and deprecate o…
kosiew Sep 22, 2025
f930181
docs: update documentation to reflect removal of TableProvider and us…
kosiew Sep 22, 2025
afc9b4e
remove TableProvider in Python, update missing_exports function, doc
kosiew Sep 22, 2025
918b1ce
Fix Ruff errors
kosiew Sep 22, 2025
93f0a31
Refactor test_table_loading to use Table instead of TableProvider
kosiew Sep 22, 2025
7bc303d
Refactor aggregate tests to simplify result assertions and improve re…
kosiew Sep 22, 2025
4429614
Add comments to clarify table normalization in aggregate tests
kosiew Sep 22, 2025
38bb25a
Initial implementation of unified table suggestion
timsaucer Oct 1, 2025
49abaeb
update unit tests
timsaucer Oct 1, 2025
cb7a755
Change documentation to be more user oriented
timsaucer Oct 1, 2025
937d39c
Update ffi examples
timsaucer Oct 1, 2025
c70968f
Update documentation
timsaucer Oct 1, 2025
90b3cb6
More documentation
timsaucer Oct 1, 2025
25d4141
Make documentation more user facing
timsaucer Oct 1, 2025
29b634e
More documentation updates
timsaucer Oct 1, 2025
20099d2
remove cruft
timsaucer Oct 1, 2025
c82d617
merge main
timsaucer Oct 1, 2025
9964b7f
fix ordering
timsaucer Oct 1, 2025
81b46cb
give read_table the same treatment
timsaucer Oct 2, 2025
36084a0
Reuse Table constructor to identify non-ffi tables when using udtf
timsaucer Oct 3, 2025
a50c1c6
Merge main into tsaucer/table-provider-recommendations
timsaucer Oct 10, 2025
7 changes: 7 additions & 0 deletions docs/source/conf.py
@@ -91,6 +91,13 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool: # noqa
("method", "datafusion.context.SessionContext.tables"),
("method", "datafusion.dataframe.DataFrame.unnest_column"),
]
# Explicitly skip certain members listed above. These are either
# re-exports, duplicate module-level documentation, deprecated
# API surfaces, or private variables that would otherwise appear
# in the generated docs and cause confusing duplication.
# Keeping this explicit list avoids surprising entries in the
# AutoAPI output and gives us a single place to opt-out items
# when we intentionally hide them from the docs.
if (what, name) in skip_contents:
skip = True
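
For context, a callback like this only takes effect once it is connected to AutoAPI's skip-member event elsewhere in conf.py. A minimal sketch, assuming the standard sphinx-autoapi event name; this hookup is illustrative rather than quoted from the PR:

def setup(app):
    # sphinx-autoapi emits "autoapi-skip-member" once per candidate member;
    # returning True from the callback hides that member from the docs.
    app.connect("autoapi-skip-member", autoapi_skip_member_fn)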

2 changes: 1 addition & 1 deletion docs/source/contributor-guide/ffi.rst
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.

At first glance, it may appear the best way to do this is to add the ``datafusion-python``
crate as a dependency, provide a ``PyTable``, and then to register it with the
``SessionContext``. Unfortunately, this will not work.

When you produce your code as a Python library and it needs to interact with the DataFusion
4 changes: 2 additions & 2 deletions docs/source/user-guide/data-sources.rst
@@ -154,11 +154,11 @@ as Delta Lake. This will require a recent version of
from deltalake import DeltaTable

delta_table = DeltaTable("path_to_table")
ctx.register_table_provider("my_delta_table", delta_table)
ctx.register_table("my_delta_table", delta_table)
df = ctx.table("my_delta_table")
df.show()

On older versions of ``deltalake`` (prior to 0.22) you can use the
`Arrow DataSet <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html>`_
interface to import to DataFusion, but this does not support features such as filter push down
which can lead to a significant performance difference.
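
As a concrete illustration of that fallback, here is a minimal sketch assuming ``DeltaTable.to_pyarrow_dataset()`` (the Arrow Dataset export available on those older releases) and an existing ``ctx``:

from deltalake import DeltaTable

delta_table = DeltaTable("path_to_table")
# Export through the Arrow Dataset interface; filter push down is not
# supported on this path, hence the performance caveat above.
dataset = delta_table.to_pyarrow_dataset()
ctx.register_table("my_delta_table", dataset)
ctx.table("my_delta_table").show()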
20 changes: 12 additions & 8 deletions docs/source/user-guide/io/table_provider.rst
@@ -37,22 +37,26 @@ A complete example can be found in the `examples folder <https://github.com/apac
&self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
let name = CString::new("datafusion_table_provider").unwrap();
let name = cr"datafusion_table_provider".into();

let provider = Arc::new(self.clone())
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
let provider = FFI_TableProvider::new(Arc::new(provider), false);
let provider = Arc::new(self.clone());
let provider = FFI_TableProvider::new(provider, false, None);

PyCapsule::new_bound(py, provider, Some(name.clone()))
}
}

Once you have this library available, in python you can register your table provider
to the ``SessionContext``.
Once you have this library available, you can construct a
:py:class:`~datafusion.Table` in Python and register it with the
``SessionContext``.

.. code-block:: python

from datafusion import SessionContext, Table

ctx = SessionContext()
provider = MyTableProvider()
ctx.register_table_provider("my_table", provider)

ctx.table("my_table").show()
ctx.register_table("capsule_table", provider)

ctx.table("capsule_table").show()
@@ -36,9 +36,9 @@ def test_catalog_provider():

my_catalog_schemas = my_catalog.names()
assert expected_schema_name in my_catalog_schemas
my_database = my_catalog.database(expected_schema_name)
assert expected_table_name in my_database.names()
my_table = my_database.table(expected_table_name)
my_schema = my_catalog.schema(expected_schema_name)
assert expected_table_name in my_schema.names()
my_table = my_schema.table(expected_table_name)
assert expected_table_columns == my_table.schema.names

result = ctx.table(
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
table_udtf = udtf(table_func, "my_table_func")

my_table = table_udtf()
ctx.register_table_provider("t", my_table)
ctx.register_table("t", my_table)
result = ctx.table("t").collect()

assert len(result) == 2
@@ -25,7 +25,7 @@
def test_table_loading():
ctx = SessionContext()
table = MyTableProvider(3, 2, 4)
ctx.register_table_provider("t", table)
ctx.register_table("t", table)
result = ctx.table("t").collect()

assert len(result) == 4
@@ -40,3 +40,7 @@ def test_table_loading():
]

assert result == expected

result = ctx.read_table(table).collect()
result = [r.column(0) for r in result]
assert result == expected
Comment on lines +44 to +46 (Member, Author):
@kosiew Here is the unit test for read_table with a PyCapsule based table provider.

12 changes: 4 additions & 8 deletions python/datafusion/__init__.py
@@ -28,17 +28,16 @@
try:
import importlib.metadata as importlib_metadata
except ImportError:
import importlib_metadata
import importlib_metadata # type: ignore[import]

# Public submodules
from . import functions, object_store, substrait, unparser

# The following imports are okay to remain as opaque to the user.
from ._internal import Config
from .catalog import Catalog, Database, Table
from .col import col, column
from .common import (
DFSchema,
)
from .common import DFSchema
from .context import (
RuntimeEnvBuilder,
SessionConfig,
@@ -47,10 +47,7 @@
)
from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
from .dataframe_formatter import configure_formatter
from .expr import (
Expr,
WindowFrame,
)
from .expr import Expr, WindowFrame
from .io import read_avro, read_csv, read_json, read_parquet
from .plan import ExecutionPlan, LogicalPlan
from .record_batch import RecordBatch, RecordBatchStream
62 changes: 44 additions & 18 deletions python/datafusion/catalog.py
@@ -20,13 +20,16 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Protocol
from typing import TYPE_CHECKING, Any, Protocol

import datafusion._internal as df_internal

if TYPE_CHECKING:
import pyarrow as pa

from datafusion import DataFrame
from datafusion.context import TableProviderExportable

try:
from warnings import deprecated # Python 3.13+
except ImportError:
@@ -82,7 +85,11 @@ def database(self, name: str = "public") -> Schema:
"""Returns the database with the given ``name`` from this catalog."""
return self.schema(name)

def register_schema(self, name, schema) -> Schema | None:
def register_schema(
self,
name: str,
schema: Schema | SchemaProvider | SchemaProviderExportable,
) -> Schema | None:
"""Register a schema with this catalog."""
if isinstance(schema, Schema):
return self.catalog.register_schema(name, schema._raw_schema)
@@ -122,10 +129,12 @@ def table(self, name: str) -> Table:
"""Return the table with the given ``name`` from this schema."""
return Table(self._raw_schema.table(name))

def register_table(self, name, table) -> None:
"""Register a table provider in this schema."""
if isinstance(table, Table):
return self._raw_schema.register_table(name, table.table)
def register_table(
self,
name: str,
table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
) -> None:
"""Register a table in this schema."""
return self._raw_schema.register_table(name, table)

def deregister_table(self, name: str) -> None:
@@ -139,30 +139,45 @@ class Database(Schema):


class Table:
"""DataFusion table."""
"""A DataFusion table.

def __init__(self, table: df_internal.catalog.RawTable) -> None:
"""This constructor is not typically called by the end user."""
self.table = table
Internally we currently support the following types of tables:

- Tables created using built-in DataFusion methods, such as
reading from CSV or Parquet
- pyarrow datasets
- DataFusion DataFrames, which will be converted into a view
- Externally provided tables implemented with the FFI PyCapsule
interface (advanced)
"""

__slots__ = ("_inner",)

def __init__(
self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
) -> None:
"""Constructor."""
self._inner = df_internal.catalog.RawTable(table)

def __repr__(self) -> str:
"""Print a string representation of the table."""
return self.table.__repr__()
return repr(self._inner)

@staticmethod
@deprecated("Use Table() constructor instead.")
def from_dataset(dataset: pa.dataset.Dataset) -> Table:
"""Turn a pyarrow Dataset into a Table."""
return Table(df_internal.catalog.RawTable.from_dataset(dataset))
"""Turn a :mod:`pyarrow.dataset` ``Dataset`` into a :class:`Table`."""
return Table(dataset)

@property
def schema(self) -> pa.Schema:
"""Returns the schema associated with this table."""
return self.table.schema
return self._inner.schema

@property
def kind(self) -> str:
"""Returns the kind of table."""
return self.table.kind
return self._inner.kind


class CatalogProvider(ABC):
@@ -219,14 +243,16 @@ def table(self, name: str) -> Table | None:
"""Retrieve a specific table from this schema."""
...

def register_table(self, name: str, table: Table) -> None: # noqa: B027
"""Add a table from this schema.
def register_table( # noqa: B027
self, name: str, table: Table | TableProviderExportable | Any
) -> None:
"""Add a table to this schema.

This method is optional. If your schema provides a fixed list of tables, you do
not need to implement this method.
"""

def deregister_table(self, name, cascade: bool) -> None: # noqa: B027
def deregister_table(self, name: str, cascade: bool) -> None: # noqa: B027
"""Remove a table from this schema.

This method is optional. If your schema provides a fixed list of tables, you do
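
To make the new unified constructor concrete, a short sketch of building ``Table`` objects from the supported inputs; ``from_pydict`` is assumed only to produce a sample DataFrame:

import pyarrow as pa
import pyarrow.dataset as ds
from datafusion import SessionContext, Table

ctx = SessionContext()

# From a pyarrow dataset (replaces the deprecated Table.from_dataset).
dataset = ds.dataset(pa.table({"a": [1, 2, 3]}))
table_from_dataset = Table(dataset)

# From a DataFrame, which is exposed as a view.
df = ctx.from_pydict({"a": [1, 2, 3]})
table_from_df = Table(df)

print(table_from_dataset.kind, table_from_df.kind)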
53 changes: 32 additions & 21 deletions python/datafusion/context.py
@@ -29,11 +29,10 @@

import pyarrow as pa

from datafusion.catalog import Catalog, CatalogProvider, Table
from datafusion.catalog import Catalog
from datafusion.dataframe import DataFrame
from datafusion.expr import SortKey, sort_list_to_raw_sort_list
from datafusion.expr import sort_list_to_raw_sort_list
from datafusion.record_batch import RecordBatchStream
from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF

from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
from ._internal import SessionConfig as SessionConfigInternal
@@ -48,7 +47,15 @@
import pandas as pd
import polars as pl # type: ignore[import]

from datafusion.catalog import CatalogProvider, Table
from datafusion.expr import SortKey
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.user_defined import (
AggregateUDF,
ScalarUDF,
TableFunction,
WindowUDF,
)


class ArrowStreamExportable(Protocol):
Expand Down Expand Up @@ -733,7 +740,7 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
# https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
# is the discussion on how we arrived at adding register_view
def register_view(self, name: str, df: DataFrame) -> None:
"""Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
"""Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.

Args:
name (str): The name to register the view under.
@@ -742,16 +749,21 @@ def register_view(self, name: str, df: DataFrame) -> None:
view = df.into_view()
self.ctx.register_table(name, view)

def register_table(self, name: str, table: Table) -> None:
"""Register a :py:class: `~datafusion.catalog.Table` as a table.
def register_table(
self,
name: str,
table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
) -> None:
"""Register a :py:class:`~datafusion.Table` with this context.

The registered table can be referenced from SQL statement executed against.
The registered table can be referenced from SQL statements executed against
this context.

Args:
name: Name of the resultant table.
table: DataFusion table to add to the session context.
table: Any object that can be converted into a :class:`Table`.
"""
self.ctx.register_table(name, table.table)
self.ctx.register_table(name, table)

def deregister_table(self, name: str) -> None:
"""Remove a table from the session."""
@@ -770,15 +782,17 @@ def register_catalog_provider(
else:
self.ctx.register_catalog_provider(name, provider)

@deprecated("Use register_table() instead.")
def register_table_provider(
self, name: str, provider: TableProviderExportable
self,
name: str,
provider: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
) -> None:
"""Register a table provider.

This table provider must have a method called ``__datafusion_table_provider__``
which returns a PyCapsule that exposes a ``FFI_TableProvider``.
Deprecated: use :meth:`register_table` instead.
"""
self.ctx.register_table_provider(name, provider)
self.register_table(name, provider)

def register_udtf(self, func: TableFunction) -> None:
"""Register a user defined table function."""
@@ -1163,14 +1177,11 @@ def read_avro(
self.ctx.read_avro(str(path), schema, file_partition_cols, file_extension)
)

def read_table(self, table: Table) -> DataFrame:
"""Creates a :py:class:`~datafusion.dataframe.DataFrame` from a table.

For a :py:class:`~datafusion.catalog.Table` such as a
:py:class:`~datafusion.catalog.ListingTable`, create a
:py:class:`~datafusion.dataframe.DataFrame`.
"""
return DataFrame(self.ctx.read_table(table.table))
def read_table(
self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
) -> DataFrame:
"""Creates a :py:class:`~datafusion.dataframe.DataFrame` from a table."""
return DataFrame(self.ctx.read_table(table))

def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
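
Taken together, the context-level changes let heterogeneous sources flow through a single ``register_table`` call. A closing sketch under the same assumptions as above:

import pyarrow as pa
import pyarrow.dataset as ds
from datafusion import SessionContext

ctx = SessionContext()

# A DataFrame registers as a view...
ctx.register_table("df_table", ctx.from_pydict({"x": [1, 2]}))

# ...and a pyarrow dataset goes through the very same call.
ctx.register_table("ds_table", ds.dataset(pa.table({"x": [3, 4]})))

ctx.sql("SELECT x FROM df_table UNION ALL SELECT x FROM ds_table").show()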