Skip to content

Commit 9b4f144

Browse files
committed
Migrate Table → TableProvider; refactor registration and access, update
docs/tests, add DataFrame view support, and improve Send/concurrency support. This commit migrates the codebase from using `Table` to a `TableProvider`-based API, refactors registration and access paths to simplify catalog/context interactions, and updates documentation and examples. DataFrame view handling is improved (`into_view` is now public), the test suite is expanded to cover new registration and async SQL scenarios, and `TableProvider` now supports the `Send` trait across modules for safer concurrency. Minor import cleanup and utility adjustments (including a refined `pyany_to_table_provider`) are included.
1 parent c609dfa commit 9b4f144

File tree

15 files changed

+347
-110
lines changed

15 files changed

+347
-110
lines changed

docs/source/contributor-guide/ffi.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
3434
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.
3535

3636
At first glance, it may appear the best way to do this is to add the ``datafusion-python``
37-
crate as a dependency, provide a ``PyTable``, and then to register it with the
37+
crate as a dependency, produce a DataFusion table in Rust, and then register it with the
3838
``SessionContext``. Unfortunately, this will not work.
3939

4040
When you produce your code as a Python library and it needs to interact with the DataFusion

docs/source/user-guide/data-sources.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,11 @@ as Delta Lake. This will require a recent version of
152152
.. code-block:: python
153153
154154
from deltalake import DeltaTable
155+
from datafusion import TableProvider
155156
156157
delta_table = DeltaTable("path_to_table")
157-
ctx.register_table_provider("my_delta_table", delta_table)
158+
provider = TableProvider.from_capsule(delta_table)
159+
ctx.register_table("my_delta_table", provider)
158160
df = ctx.table("my_delta_table")
159161
df.show()
160162

docs/source/user-guide/io/table_provider.rst

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,29 @@ A complete example can be found in the `examples folder <https://github.com/apac
4747
}
4848
}
4949
50-
Once you have this library available, in python you can register your table provider
51-
to the ``SessionContext``.
50+
Once you have this library available, you can construct a
51+
:py:class:`~datafusion.TableProvider` in Python and register it with the
52+
``SessionContext``. Table providers can be created either from the PyCapsule exposed by
53+
your Rust provider or from an existing :py:class:`~datafusion.dataframe.DataFrame`
54+
using ``TableProvider.from_view()``.
5255

5356
.. code-block:: python
5457
58+
from datafusion import SessionContext, TableProvider
59+
60+
ctx = SessionContext()
5561
provider = MyTableProvider()
56-
ctx.register_table_provider("my_table", provider)
5762
58-
ctx.table("my_table").show()
63+
capsule_provider = TableProvider.from_capsule(provider)
64+
65+
df = ctx.from_pydict({"a": [1]})
66+
view_provider = TableProvider.from_view(df)
67+
68+
ctx.register_table("capsule_table", capsule_provider)
69+
ctx.register_table("view_table", view_provider)
70+
71+
ctx.table("capsule_table").show()
72+
ctx.table("view_table").show()
73+
74+
Both ``TableProvider.from_capsule()`` and ``TableProvider.from_view()`` create
75+
table providers that can be registered with the ``SessionContext`` using ``register_table()``.

python/datafusion/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from . import functions, object_store, substrait, unparser
3434

3535
# The following imports are okay to remain as opaque to the user.
36-
from ._internal import Config
36+
from ._internal import Config, TableProvider
3737
from .catalog import Catalog, Database, Table
3838
from .col import col, column
3939
from .common import (
@@ -90,6 +90,7 @@
9090
"SessionContext",
9191
"Table",
9292
"TableFunction",
93+
"TableProvider",
9394
"WindowFrame",
9495
"WindowUDF",
9596
"catalog",

python/datafusion/catalog.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
if TYPE_CHECKING:
2828
import pyarrow as pa
2929

30+
from datafusion import TableProvider
31+
3032
try:
3133
from warnings import deprecated # Python 3.13+
3234
except ImportError:
@@ -122,8 +124,8 @@ def table(self, name: str) -> Table:
122124
"""Return the table with the given ``name`` from this schema."""
123125
return Table(self._raw_schema.table(name))
124126

125-
def register_table(self, name, table) -> None:
126-
"""Register a table provider in this schema."""
127+
def register_table(self, name, table: Table | TableProvider) -> None:
128+
"""Register a table or table provider in this schema."""
127129
if isinstance(table, Table):
128130
return self._raw_schema.register_table(name, table.table)
129131
return self._raw_schema.register_table(name, table)
@@ -219,8 +221,8 @@ def table(self, name: str) -> Table | None:
219221
"""Retrieve a specific table from this schema."""
220222
...
221223

222-
def register_table(self, name: str, table: Table) -> None: # noqa: B027
223-
"""Add a table from this schema.
224+
def register_table(self, name: str, table: Table | TableProvider) -> None: # noqa: B027
225+
"""Add a table to this schema.
224226
225227
This method is optional. If your schema provides a fixed list of tables, you do
226228
not need to implement this method.

python/datafusion/context.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import pandas as pd
4747
import polars as pl
4848

49+
from datafusion import TableProvider
4950
from datafusion.plan import ExecutionPlan, LogicalPlan
5051

5152

@@ -743,16 +744,21 @@ def register_view(self, name: str, df: DataFrame) -> None:
743744
view = df.into_view()
744745
self.ctx.register_table(name, view)
745746

746-
def register_table(self, name: str, table: Table) -> None:
747-
"""Register a :py:class: `~datafusion.catalog.Table` as a table.
747+
def register_table(self, name: str, table: Table | TableProvider) -> None:
748+
"""Register a :py:class:`~datafusion.catalog.Table` or ``TableProvider``.
748749
749-
The registered table can be referenced from SQL statement executed against.
750+
The registered table can be referenced from SQL statements executed against
751+
this context.
750752
751753
Args:
752754
name: Name of the resultant table.
753-
table: DataFusion table to add to the session context.
755+
table: DataFusion :class:`Table` or :class:`TableProvider` to add to the
756+
session context.
754757
"""
755-
self.ctx.register_table(name, table.table)
758+
if isinstance(table, Table):
759+
self.ctx.register_table(name, table.table)
760+
else:
761+
self.ctx.register_table(name, table)
756762

757763
def deregister_table(self, name: str) -> None:
758764
"""Remove a table from the session."""
@@ -772,14 +778,18 @@ def register_catalog_provider(
772778
self.ctx.register_catalog_provider(name, provider)
773779

774780
def register_table_provider(
775-
self, name: str, provider: TableProviderExportable
781+
self, name: str, provider: TableProviderExportable | TableProvider
776782
) -> None:
777783
"""Register a table provider.
778784
779-
This table provider must have a method called ``__datafusion_table_provider__``
780-
which returns a PyCapsule that exposes a ``FFI_TableProvider``.
785+
Deprecated: use :meth:`register_table` instead.
781786
"""
782-
self.ctx.register_table_provider(name, provider)
787+
warnings.warn(
788+
"register_table_provider is deprecated; use register_table",
789+
DeprecationWarning,
790+
stacklevel=2,
791+
)
792+
self.register_table(name, provider)
783793

784794
def register_udtf(self, func: TableFunction) -> None:
785795
"""Register a user defined table function."""

python/datafusion/dataframe.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from datafusion._internal import DataFrame as DataFrameInternal
4141
from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
4242
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
43+
from datafusion._internal import TableProvider as TableProviderInternal
4344
from datafusion.expr import Expr, SortExpr, sort_or_default
4445
from datafusion.plan import ExecutionPlan, LogicalPlan
4546
from datafusion.record_batch import RecordBatchStream
@@ -307,8 +308,14 @@ def __init__(self, df: DataFrameInternal) -> None:
307308
"""
308309
self.df = df
309310

310-
def into_view(self) -> pa.Table:
311-
"""Convert DataFrame as a ViewTable which can be used in register_table."""
311+
def into_view(self) -> TableProviderInternal:
312+
"""Convert ``DataFrame`` into a ``TableProvider`` view for registration.
313+
314+
This is the preferred way to obtain a view for
315+
:py:meth:`~datafusion.context.SessionContext.register_table`.
316+
``TableProvider.from_dataframe`` calls this method under the hood,
317+
and the older ``TableProvider.from_view`` helper is deprecated.
318+
"""
312319
return self.df.into_view()
313320

314321
def __getitem__(self, key: str | list[str]) -> DataFrame:

python/tests/test_context.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
SessionConfig,
2828
SessionContext,
2929
SQLOptions,
30+
TableProvider,
3031
column,
3132
literal,
3233
)
@@ -330,6 +331,35 @@ def test_deregister_table(ctx, database):
330331
assert public.names() == {"csv1", "csv2"}
331332

332333

334+
def test_register_table_from_dataframe_into_view(ctx):
335+
df = ctx.from_pydict({"a": [1, 2]})
336+
provider = df.into_view()
337+
ctx.register_table("view_tbl", provider)
338+
result = ctx.sql("SELECT * FROM view_tbl").collect()
339+
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
340+
341+
342+
def test_table_provider_from_capsule(ctx):
343+
df = ctx.from_pydict({"a": [1, 2]})
344+
provider = df.into_view()
345+
capsule = provider.__datafusion_table_provider__()
346+
provider2 = TableProvider.from_capsule(capsule)
347+
ctx.register_table("capsule_tbl", provider2)
348+
result = ctx.sql("SELECT * FROM capsule_tbl").collect()
349+
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
350+
351+
352+
def test_table_provider_from_capsule_invalid():
353+
with pytest.raises(Exception): # noqa: B017
354+
TableProvider.from_capsule(object())
355+
356+
357+
def test_register_table_with_dataframe_errors(ctx):
358+
df = ctx.from_pydict({"a": [1]})
359+
with pytest.raises(Exception): # noqa: B017
360+
ctx.register_table("bad", df)
361+
362+
333363
def test_register_dataset(ctx):
334364
# create a RecordBatch and register it as a pyarrow.dataset.Dataset
335365
batch = pa.RecordBatch.from_arrays(

src/catalog.rs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::dataset::Dataset;
1919
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
20+
use crate::table::PyTableProvider;
2021
use crate::utils::{validate_pycapsule, wait_for_future};
2122
use async_trait::async_trait;
2223
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
@@ -51,7 +52,7 @@ pub struct PySchema {
5152
#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
5253
#[derive(Clone)]
5354
pub struct PyTable {
54-
pub table: Arc<dyn TableProvider>,
55+
pub table: Arc<dyn TableProvider + Send>,
5556
}
5657

5758
impl From<Arc<dyn CatalogProvider>> for PyCatalog {
@@ -67,11 +68,11 @@ impl From<Arc<dyn SchemaProvider>> for PySchema {
6768
}
6869

6970
impl PyTable {
70-
pub fn new(table: Arc<dyn TableProvider>) -> Self {
71+
pub fn new(table: Arc<dyn TableProvider + Send>) -> Self {
7172
Self { table }
7273
}
7374

74-
pub fn table(&self) -> Arc<dyn TableProvider> {
75+
pub fn table(&self) -> Arc<dyn TableProvider + Send> {
7576
self.table.clone()
7677
}
7778
}
@@ -205,15 +206,18 @@ impl PySchema {
205206

206207
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
207208
let provider: ForeignTableProvider = provider.into();
208-
Arc::new(provider) as Arc<dyn TableProvider>
209+
Arc::new(provider) as Arc<dyn TableProvider + Send>
209210
} else {
210211
match table_provider.extract::<PyTable>() {
211212
Ok(py_table) => py_table.table,
212-
Err(_) => {
213-
let py = table_provider.py();
214-
let provider = Dataset::new(&table_provider, py)?;
215-
Arc::new(provider) as Arc<dyn TableProvider>
216-
}
213+
Err(_) => match table_provider.extract::<PyTableProvider>() {
214+
Ok(py_provider) => py_provider.into_inner(),
215+
Err(_) => {
216+
let py = table_provider.py();
217+
let provider = Dataset::new(&table_provider, py)?;
218+
Arc::new(provider) as Arc<dyn TableProvider + Send>
219+
}
220+
},
217221
}
218222
};
219223

@@ -294,7 +298,7 @@ impl RustWrappedPySchemaProvider {
294298
}
295299
}
296300

297-
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider>>> {
301+
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider + Send>>> {
298302
Python::with_gil(|py| {
299303
let provider = self.schema_provider.bind(py);
300304
let py_table_method = provider.getattr("table")?;
@@ -305,26 +309,30 @@ impl RustWrappedPySchemaProvider {
305309
}
306310

307311
if py_table.hasattr("__datafusion_table_provider__")? {
308-
let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
312+
let capsule = py_table.getattr("__datafusion_table_provider__")?.call0()?;
309313
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
310314
validate_pycapsule(capsule, "datafusion_table_provider")?;
311315

312316
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
313317
let provider: ForeignTableProvider = provider.into();
314318

315-
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
319+
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider + Send>))
316320
} else {
317321
if let Ok(inner_table) = py_table.getattr("table") {
318322
if let Ok(inner_table) = inner_table.extract::<PyTable>() {
319323
return Ok(Some(inner_table.table));
320324
}
321325
}
322326

327+
if let Ok(py_provider) = py_table.extract::<PyTableProvider>() {
328+
return Ok(Some(py_provider.into_inner()));
329+
}
330+
323331
match py_table.extract::<PyTable>() {
324332
Ok(py_table) => Ok(Some(py_table.table)),
325333
Err(_) => {
326334
let ds = Dataset::new(&py_table, py).map_err(py_datafusion_err)?;
327-
Ok(Some(Arc::new(ds) as Arc<dyn TableProvider>))
335+
Ok(Some(Arc::new(ds) as Arc<dyn TableProvider + Send>))
328336
}
329337
}
330338
}
@@ -360,15 +368,32 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
360368
&self,
361369
name: &str,
362370
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
363-
self.table_inner(name).map_err(to_datafusion_err)
371+
// Convert from our internal Send type to the trait expected type
372+
match self.table_inner(name).map_err(to_datafusion_err)? {
373+
Some(table) => {
374+
// Safe conversion: we're widening the bounds (removing Send)
375+
let raw = Arc::into_raw(table);
376+
let wide: *const dyn TableProvider = raw as *const _;
377+
let arc = unsafe { Arc::from_raw(wide) };
378+
Ok(Some(arc))
379+
}
380+
None => Ok(None),
381+
}
364382
}
365383

366384
fn register_table(
367385
&self,
368386
name: String,
369387
table: Arc<dyn TableProvider>,
370388
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
371-
let py_table = PyTable::new(table);
389+
// Convert from trait type to our internal Send type
390+
let send_table = {
391+
let raw = Arc::into_raw(table);
392+
let send: *const (dyn TableProvider + Send) = raw as *const _;
393+
unsafe { Arc::from_raw(send) }
394+
};
395+
396+
let py_table = PyTable::new(send_table);
372397
Python::with_gil(|py| {
373398
let provider = self.schema_provider.bind(py);
374399
let _ = provider
@@ -397,7 +422,14 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
397422
// If we can turn this table provider into a `Dataset`, return it.
398423
// Otherwise, return None.
399424
let dataset = match Dataset::new(&table, py) {
400-
Ok(dataset) => Some(Arc::new(dataset) as Arc<dyn TableProvider>),
425+
Ok(dataset) => {
426+
// Convert from our internal Send type to trait expected type
427+
let send_table = Arc::new(dataset) as Arc<dyn TableProvider + Send>;
428+
let raw = Arc::into_raw(send_table);
429+
let wide: *const dyn TableProvider = raw as *const _;
430+
let arc = unsafe { Arc::from_raw(wide) };
431+
Some(arc)
432+
}
401433
Err(_) => None,
402434
};
403435

0 commit comments

Comments
 (0)