Skip to content

Commit 3da3f93

Browse files
committed
Refactors and bug fixes around TableProvider registration and
DataFrame→TableProvider conversion, plus tests and FFI/pycapsule improvements. -- Registration logic & API * Refactor of table provider registration logic for improved clarity and simpler call sites. * Remove PyTableProvider registration from an internal module (reduces surprising side effects). * Update table registration method to call `register_table` instead of `register_table_provider`. * Extend `register_table` to support `TableProviderExportable` so more provider types can be registered uniformly. * Improve error messages related to registration failures (missing PyCapsule name and DataFrame registration errors). -- DataFrame ↔ TableProvider conversions * Introduce utility functions to simplify table provider conversions and centralize conversion logic. * Rename `into_view_provider` → `to_view_provider` for clearer intent. * Fix `from_dataframe` to return the correct type and update `DataFrame.into_view` to import the correct `TableProvider`. * Remove an obsolete `dataframe_into_view` test case after the refactor. -- FFI / PyCapsule handling * Update `FFI_TableProvider` initialization to accept an optional parameter (improves FFI ergonomics). * Introduce `table_provider_from_pycapsule` utility to standardize pycapsule-based construction. * Improve the error message when a PyCapsule name is missing to help debugging. -- DeltaTable & specific integrations * Update TableProvider registration for `DeltaTable` to use the correct registration method (matches the new API surface). -- Tests, docs & minor fixes * Add tests for registering a `TableProvider` from a `DataFrame` and from a capsule to ensure conversion paths are covered. * Fix a typo in the `register_view` docstring and another typo in the error message for unsupported volatility type. * Simplify version retrieval by removing exception handling around `PackageNotFoundError` (streamlines code path).
1 parent 9b4f144 commit 3da3f93

File tree

16 files changed

+186
-164
lines changed

16 files changed

+186
-164
lines changed

docs/source/user-guide/data-sources.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ as Delta Lake. This will require a recent version of
155155
from datafusion import TableProvider
156156
157157
delta_table = DeltaTable("path_to_table")
158-
provider = TableProvider.from_capsule(delta_table)
158+
provider = TableProvider.from_capsule(delta_table.__datafusion_table_provider__())
159159
ctx.register_table("my_delta_table", provider)
160160
df = ctx.table("my_delta_table")
161161
df.show()

docs/source/user-guide/io/table_provider.rst

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ A complete example can be found in the `examples folder <https://github.com/apac
3939
) -> PyResult<Bound<'py, PyCapsule>> {
4040
let name = CString::new("datafusion_table_provider").unwrap();
4141
42-
let provider = Arc::new(self.clone())
43-
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
44-
let provider = FFI_TableProvider::new(Arc::new(provider), false);
42+
let provider = Arc::new(self.clone());
43+
let provider = FFI_TableProvider::new(provider, false, None);
4544
4645
PyCapsule::new_bound(py, provider, Some(name.clone()))
4746
}
@@ -50,8 +49,10 @@ A complete example can be found in the `examples folder <https://github.com/apac
5049
Once you have this library available, you can construct a
5150
:py:class:`~datafusion.TableProvider` in Python and register it with the
5251
``SessionContext``. Table providers can be created either from the PyCapsule exposed by
53-
your Rust provider or from an existing :py:class:`~datafusion.dataframe.DataFrame`
54-
using ``TableProvider.from_view()``.
52+
your Rust provider or from an existing :py:class:`~datafusion.dataframe.DataFrame`.
53+
Call the provider's ``__datafusion_table_provider__()`` method to obtain the capsule
54+
before constructing a ``TableProvider``. The ``TableProvider.from_view()`` helper is
55+
deprecated; instead use ``TableProvider.from_dataframe()`` or ``DataFrame.into_view()``.
5556

5657
.. code-block:: python
5758
@@ -60,16 +61,18 @@ using ``TableProvider.from_view()``.
6061
ctx = SessionContext()
6162
provider = MyTableProvider()
6263
63-
capsule_provider = TableProvider.from_capsule(provider)
64+
capsule = provider.__datafusion_table_provider__()
65+
capsule_provider = TableProvider.from_capsule(capsule)
6466
6567
df = ctx.from_pydict({"a": [1]})
66-
view_provider = TableProvider.from_view(df)
68+
view_provider = TableProvider.from_dataframe(df)
69+
# or: view_provider = df.into_view()
6770
6871
ctx.register_table("capsule_table", capsule_provider)
6972
ctx.register_table("view_table", view_provider)
7073
7174
ctx.table("capsule_table").show()
7275
ctx.table("view_table").show()
7376
74-
Both ``TableProvider.from_capsule()`` and ``TableProvider.from_view()`` create
77+
Both ``TableProvider.from_capsule()`` and ``TableProvider.from_dataframe()`` create
7578
table providers that can be registered with the SessionContext using ``register_table()``.

examples/datafusion-ffi-example/python/tests/_test_table_function.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
5353
table_udtf = udtf(table_func, "my_table_func")
5454

5555
my_table = table_udtf()
56-
ctx.register_table_provider("t", my_table)
56+
ctx.register_table("t", my_table)
5757
result = ctx.table("t").collect()
5858

5959
assert len(result) == 2

examples/datafusion-ffi-example/python/tests/_test_table_provider.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
from __future__ import annotations
1919

2020
import pyarrow as pa
21-
from datafusion import SessionContext
21+
from datafusion import SessionContext, TableProvider
2222
from datafusion_ffi_example import MyTableProvider
2323

2424

2525
def test_table_loading():
2626
ctx = SessionContext()
2727
table = MyTableProvider(3, 2, 4)
28-
ctx.register_table_provider("t", table)
28+
ctx.register_table(
29+
"t", TableProvider.from_capsule(table.__datafusion_table_provider__())
30+
)
2931
result = ctx.table("t").collect()
3032

3133
assert len(result) == 4

python/datafusion/catalog.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import pyarrow as pa
2929

3030
from datafusion import TableProvider
31+
from datafusion.context import TableProviderExportable
3132

3233
try:
3334
from warnings import deprecated # Python 3.13+
@@ -124,8 +125,14 @@ def table(self, name: str) -> Table:
124125
"""Return the table with the given ``name`` from this schema."""
125126
return Table(self._raw_schema.table(name))
126127

127-
def register_table(self, name, table: Table | TableProvider) -> None:
128-
"""Register a table or table provider in this schema."""
128+
def register_table(
129+
self, name, table: Table | TableProvider | TableProviderExportable
130+
) -> None:
131+
"""Register a table or table provider in this schema.
132+
133+
Objects implementing ``__datafusion_table_provider__`` are also supported
134+
and treated as :class:`TableProvider` instances.
135+
"""
129136
if isinstance(table, Table):
130137
return self._raw_schema.register_table(name, table.table)
131138
return self._raw_schema.register_table(name, table)
@@ -221,11 +228,16 @@ def table(self, name: str) -> Table | None:
221228
"""Retrieve a specific table from this schema."""
222229
...
223230

224-
def register_table(self, name: str, table: Table | TableProvider) -> None: # noqa: B027
231+
def register_table( # noqa: B027
232+
self, name: str, table: Table | TableProvider | TableProviderExportable
233+
) -> None:
225234
"""Add a table to this schema.
226235
227236
This method is optional. If your schema provides a fixed list of tables, you do
228237
not need to implement this method.
238+
239+
Objects implementing ``__datafusion_table_provider__`` are also supported
240+
and treated as :class:`TableProvider` instances.
229241
"""
230242

231243
def deregister_table(self, name, cascade: bool) -> None: # noqa: B027

python/datafusion/context.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,7 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
735735
# https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
736736
# is the discussion on how we arrived at adding register_view
737737
def register_view(self, name: str, df: DataFrame) -> None:
738-
"""Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
738+
"""Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.
739739
740740
Args:
741741
name (str): The name to register the view under.
@@ -744,16 +744,26 @@ def register_view(self, name: str, df: DataFrame) -> None:
744744
view = df.into_view()
745745
self.ctx.register_table(name, view)
746746

747-
def register_table(self, name: str, table: Table | TableProvider) -> None:
747+
def register_table(
748+
self, name: str, table: Table | TableProvider | TableProviderExportable
749+
) -> None:
748750
"""Register a :py:class:`~datafusion.catalog.Table` or ``TableProvider``.
749751
750752
The registered table can be referenced from SQL statements executed against
751753
this context.
752754
755+
Plain :py:class:`~datafusion.dataframe.DataFrame` objects are not supported;
756+
convert them first with :meth:`datafusion.dataframe.DataFrame.into_view` or
757+
:meth:`datafusion.catalog.TableProvider.from_dataframe`.
758+
759+
Objects implementing ``__datafusion_table_provider__`` are also supported
760+
and treated as :class:`~datafusion.catalog.TableProvider` instances.
761+
753762
Args:
754763
name: Name of the resultant table.
755-
table: DataFusion :class:`Table` or :class:`TableProvider` to add to the
756-
session context.
764+
table: DataFusion :class:`Table`, :class:`TableProvider`, or any object
765+
implementing ``__datafusion_table_provider__`` to add to the session
766+
context.
757767
"""
758768
if isinstance(table, Table):
759769
self.ctx.register_table(name, table.table)
@@ -778,11 +788,14 @@ def register_catalog_provider(
778788
self.ctx.register_catalog_provider(name, provider)
779789

780790
def register_table_provider(
781-
self, name: str, provider: TableProviderExportable | TableProvider
791+
self, name: str, provider: Table | TableProvider | TableProviderExportable
782792
) -> None:
783793
"""Register a table provider.
784794
785795
Deprecated: use :meth:`register_table` instead.
796+
797+
Objects implementing ``__datafusion_table_provider__`` are also supported
798+
and treated as :class:`~datafusion.catalog.TableProvider` instances.
786799
"""
787800
warnings.warn(
788801
"register_table_provider is deprecated; use register_table",

python/datafusion/dataframe.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
from datafusion._internal import DataFrame as DataFrameInternal
4141
from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
4242
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
43-
from datafusion._internal import TableProvider as TableProviderInternal
4443
from datafusion.expr import Expr, SortExpr, sort_or_default
4544
from datafusion.plan import ExecutionPlan, LogicalPlan
4645
from datafusion.record_batch import RecordBatchStream
@@ -53,6 +52,7 @@
5352
import polars as pl
5453
import pyarrow as pa
5554

55+
from datafusion._internal import TableProvider
5656
from datafusion._internal import expr as expr_internal
5757

5858
from enum import Enum
@@ -308,7 +308,7 @@ def __init__(self, df: DataFrameInternal) -> None:
308308
"""
309309
self.df = df
310310

311-
def into_view(self) -> TableProviderInternal:
311+
def into_view(self) -> TableProvider:
312312
"""Convert ``DataFrame`` into a ``TableProvider`` view for registration.
313313
314314
This is the preferred way to obtain a view for

python/tests/test_context.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,16 +349,45 @@ def test_table_provider_from_capsule(ctx):
349349
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
350350

351351

352+
def test_table_provider_from_dataframe(ctx):
353+
df = ctx.from_pydict({"a": [1, 2]}).df
354+
provider = TableProvider.from_dataframe(df)
355+
ctx.register_table("from_dataframe_tbl", provider)
356+
result = ctx.sql("SELECT * FROM from_dataframe_tbl").collect()
357+
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
358+
359+
360+
def test_register_table_capsule_direct(ctx):
361+
df = ctx.from_pydict({"a": [1, 2]})
362+
provider = df.into_view()
363+
364+
class CapsuleProvider:
365+
def __init__(self, inner):
366+
self._inner = inner
367+
368+
def __datafusion_table_provider__(self):
369+
return self._inner.__datafusion_table_provider__()
370+
371+
ctx.register_table("capsule_direct_tbl", CapsuleProvider(provider))
372+
result = ctx.sql("SELECT * FROM capsule_direct_tbl").collect()
373+
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
374+
375+
352376
def test_table_provider_from_capsule_invalid():
353377
with pytest.raises(Exception): # noqa: B017
354378
TableProvider.from_capsule(object())
355379

356380

357381
def test_register_table_with_dataframe_errors(ctx):
358382
df = ctx.from_pydict({"a": [1]})
359-
with pytest.raises(Exception): # noqa: B017
383+
with pytest.raises(Exception) as exc_info: # noqa: B017
360384
ctx.register_table("bad", df)
361385

386+
assert (
387+
str(exc_info.value)
388+
== 'Expected a Table or TableProvider. Convert DataFrames with "DataFrame.into_view()" or "TableProvider.from_dataframe()".'
389+
)
390+
362391

363392
def test_register_dataset(ctx):
364393
# create a RecordBatch and register it as a pyarrow.dataset.Dataset

src/catalog.rs

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
use crate::dataset::Dataset;
1919
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
2020
use crate::table::PyTableProvider;
21-
use crate::utils::{validate_pycapsule, wait_for_future};
21+
use crate::utils::{
22+
table_provider_from_pycapsule, table_provider_send_to_table_provider, table_provider_to_send,
23+
validate_pycapsule, wait_for_future,
24+
};
2225
use async_trait::async_trait;
2326
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
2427
use datafusion::common::DataFusionError;
@@ -28,7 +31,6 @@ use datafusion::{
2831
datasource::{TableProvider, TableType},
2932
};
3033
use datafusion_ffi::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
31-
use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
3234
use pyo3::exceptions::PyKeyError;
3335
use pyo3::prelude::*;
3436
use pyo3::types::PyCapsule;
@@ -197,28 +199,16 @@ impl PySchema {
197199
}
198200

199201
fn register_table(&self, name: &str, table_provider: Bound<'_, PyAny>) -> PyResult<()> {
200-
let provider = if table_provider.hasattr("__datafusion_table_provider__")? {
201-
let capsule = table_provider
202-
.getattr("__datafusion_table_provider__")?
203-
.call0()?;
204-
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
205-
validate_pycapsule(capsule, "datafusion_table_provider")?;
206-
207-
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
208-
let provider: ForeignTableProvider = provider.into();
209-
Arc::new(provider) as Arc<dyn TableProvider + Send>
202+
let provider = if let Ok(py_table) = table_provider.extract::<PyTable>() {
203+
py_table.table
204+
} else if let Ok(py_provider) = table_provider.extract::<PyTableProvider>() {
205+
py_provider.into_inner()
206+
} else if let Some(provider) = table_provider_from_pycapsule(&table_provider)? {
207+
provider
210208
} else {
211-
match table_provider.extract::<PyTable>() {
212-
Ok(py_table) => py_table.table,
213-
Err(_) => match table_provider.extract::<PyTableProvider>() {
214-
Ok(py_provider) => py_provider.into_inner(),
215-
Err(_) => {
216-
let py = table_provider.py();
217-
let provider = Dataset::new(&table_provider, py)?;
218-
Arc::new(provider) as Arc<dyn TableProvider + Send>
219-
}
220-
},
221-
}
209+
let py = table_provider.py();
210+
let provider = Dataset::new(&table_provider, py)?;
211+
Arc::new(provider) as Arc<dyn TableProvider + Send>
222212
};
223213

224214
let _ = self
@@ -308,15 +298,8 @@ impl RustWrappedPySchemaProvider {
308298
return Ok(None);
309299
}
310300

311-
if py_table.hasattr("__datafusion_table_provider__")? {
312-
let capsule = py_table.getattr("__datafusion_table_provider__")?.call0()?;
313-
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
314-
validate_pycapsule(capsule, "datafusion_table_provider")?;
315-
316-
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
317-
let provider: ForeignTableProvider = provider.into();
318-
319-
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider + Send>))
301+
if let Some(provider) = table_provider_from_pycapsule(&py_table)? {
302+
Ok(Some(provider))
320303
} else {
321304
if let Ok(inner_table) = py_table.getattr("table") {
322305
if let Ok(inner_table) = inner_table.extract::<PyTable>() {
@@ -370,13 +353,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
370353
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
371354
// Convert from our internal Send type to the trait expected type
372355
match self.table_inner(name).map_err(to_datafusion_err)? {
373-
Some(table) => {
374-
// Safe conversion: we're widening the bounds (removing Send)
375-
let raw = Arc::into_raw(table);
376-
let wide: *const dyn TableProvider = raw as *const _;
377-
let arc = unsafe { Arc::from_raw(wide) };
378-
Ok(Some(arc))
379-
}
356+
Some(table) => Ok(Some(table_provider_send_to_table_provider(table))),
380357
None => Ok(None),
381358
}
382359
}
@@ -387,11 +364,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
387364
table: Arc<dyn TableProvider>,
388365
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
389366
// Convert from trait type to our internal Send type
390-
let send_table = {
391-
let raw = Arc::into_raw(table);
392-
let send: *const (dyn TableProvider + Send) = raw as *const _;
393-
unsafe { Arc::from_raw(send) }
394-
};
367+
let send_table = table_provider_to_send(table);
395368

396369
let py_table = PyTable::new(send_table);
397370
Python::with_gil(|py| {
@@ -423,12 +396,8 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
423396
// Otherwise, return None.
424397
let dataset = match Dataset::new(&table, py) {
425398
Ok(dataset) => {
426-
// Convert from our internal Send type to trait expected type
427399
let send_table = Arc::new(dataset) as Arc<dyn TableProvider + Send>;
428-
let raw = Arc::into_raw(send_table);
429-
let wide: *const dyn TableProvider = raw as *const _;
430-
let arc = unsafe { Arc::from_raw(wide) };
431-
Some(arc)
400+
Some(table_provider_send_to_table_provider(send_table))
432401
}
433402
Err(_) => None,
434403
};

0 commit comments

Comments
 (0)