
Commit 0154c03

CHANGES to review
1 parent f08d5b0 commit 0154c03

File tree: 23 files changed, +383 -278 lines changed


docs/source/conf.py

Lines changed: 7 additions & 0 deletions

@@ -91,6 +91,13 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool: # noqa
         ("method", "datafusion.context.SessionContext.tables"),
         ("method", "datafusion.dataframe.DataFrame.unnest_column"),
     ]
+    # Explicitly skip certain members listed above. These are either
+    # re-exports, duplicate module-level documentation, deprecated
+    # API surfaces, or private variables that would otherwise appear
+    # in the generated docs and cause confusing duplication.
+    # Keeping this explicit list avoids surprising entries in the
+    # AutoAPI output and gives us a single place to opt-out items
+    # when we intentionally hide them from the docs.
     if (what, name) in skip_contents:
         skip = True
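
For context, the skip list above is consumed by a Sphinx AutoAPI "skip member" handler. The following is a minimal sketch of how such a handler is typically wired up, assuming the standard sphinx-autoapi ``autoapi-skip-member`` event; it mirrors the definitions in the diff above and is illustrative rather than code from this commit.

    # Hypothetical conf.py wiring; skip_contents and autoapi_skip_member_fn
    # correspond to the names shown in the diff above.
    skip_contents = [
        ("method", "datafusion.context.SessionContext.tables"),
        ("method", "datafusion.dataframe.DataFrame.unnest_column"),
    ]

    def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:
        # Skip anything explicitly listed; otherwise keep AutoAPI's default decision.
        if (what, name) in skip_contents:
            skip = True
        return skip

    def setup(app):
        # Connect the handler to the AutoAPI skip-member event.
        app.connect("autoapi-skip-member", autoapi_skip_member_fn)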

docs/source/contributor-guide/ffi.rst

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
 your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.

 At first glance, it may appear the best way to do this is to add the ``datafusion-python``
-crate as a dependency, provide a ``PyTable``, and then to register it with the
+crate as a dependency, provide a ``PyTable``, and then to register it with the
 ``SessionContext``. Unfortunately, this will not work.

 When you produce your code as a Python library and it needs to interact with the DataFusion

docs/source/user-guide/data-sources.rst

Lines changed: 2 additions & 2 deletions

@@ -154,11 +154,11 @@ as Delta Lake. This will require a recent version of
     from deltalake import DeltaTable

     delta_table = DeltaTable("path_to_table")
-    ctx.register_table_provider("my_delta_table", delta_table)
+    ctx.register_table("my_delta_table", delta_table)
     df = ctx.table("my_delta_table")
     df.show()

-On older versions of ``deltalake`` (prior to 0.22) you can use the
+On older versions of ``deltalake`` (prior to 0.22) you can use the
 `Arrow DataSet <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html>`_
 interface to import to DataFusion, but this does not support features such as filter push down
 which can lead to a significant performance difference.
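
For the older ``deltalake`` (prior to 0.22) path mentioned in the prose above, a minimal sketch of the Arrow Dataset route might look like the following. ``DeltaTable.to_pyarrow_dataset()`` is assumed to be available from the deltalake package, and as noted, filter push down does not apply on this path.

    from datafusion import SessionContext
    from deltalake import DeltaTable

    ctx = SessionContext()
    delta_table = DeltaTable("path_to_table")

    # Pre-0.22 deltalake: export the table as a pyarrow dataset and register that.
    # No filter push down here, so expect a performance difference.
    ctx.register_table("my_delta_table", delta_table.to_pyarrow_dataset())
    ctx.table("my_delta_table").show()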

docs/source/user-guide/io/table_provider.rst

Lines changed: 12 additions & 8 deletions

@@ -37,22 +37,26 @@ A complete example can be found in the `examples folder <https://github.com/apac
             &self,
             py: Python<'py>,
         ) -> PyResult<Bound<'py, PyCapsule>> {
-            let name = CString::new("datafusion_table_provider").unwrap();
+            let name = cr"datafusion_table_provider".into();

-            let provider = Arc::new(self.clone())
-                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
-            let provider = FFI_TableProvider::new(Arc::new(provider), false);
+            let provider = Arc::new(self.clone());
+            let provider = FFI_TableProvider::new(provider, false, None);

             PyCapsule::new_bound(py, provider, Some(name.clone()))
         }
     }

-Once you have this library available, in python you can register your table provider
-to the ``SessionContext``.
+Once you have this library available, you can construct a
+:py:class:`~datafusion.Table` in Python and register it with the
+``SessionContext``.

 .. code-block:: python

+    from datafusion import SessionContext, Table
+
+    ctx = SessionContext()
     provider = MyTableProvider()
-    ctx.register_table_provider("my_table", provider)

-    ctx.table("my_table").show()
+    ctx.register_table("capsule_table", provider)
+
+    ctx.table("capsule_table").show()
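
Because the new ``Table`` constructor (see python/datafusion/catalog.py below) accepts an object exposing ``__datafusion_table_provider__``, the capsule-backed provider can also be wrapped explicitly before registration. A small sketch, assuming ``MyTableProvider`` is the PyO3 class from the Rust snippet above:

    from datafusion import SessionContext, Table

    ctx = SessionContext()
    provider = MyTableProvider()  # exports __datafusion_table_provider__

    # Wrapping in Table() up front is an alternative to passing the provider
    # straight to register_table; the updated signature accepts either form.
    tbl = Table(provider)
    ctx.register_table("capsule_table", tbl)
    ctx.table("capsule_table").show()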

examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py

Lines changed: 3 additions & 3 deletions

@@ -36,9 +36,9 @@ def test_catalog_provider():

     my_catalog_schemas = my_catalog.names()
     assert expected_schema_name in my_catalog_schemas
-    my_database = my_catalog.database(expected_schema_name)
-    assert expected_table_name in my_database.names()
-    my_table = my_database.table(expected_table_name)
+    my_schema = my_catalog.schema(expected_schema_name)
+    assert expected_table_name in my_schema.names()
+    my_table = my_schema.table(expected_table_name)
     assert expected_table_columns == my_table.schema.names

     result = ctx.table(
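
The test now walks Catalog -> Schema -> Table through ``schema()`` instead of the older ``database()`` accessor. A hedged sketch of the same navigation against the default in-memory catalog follows; the catalog and schema names ("datafusion" and "public") are the usual DataFusion defaults and are assumptions here, not part of this commit.

    from datafusion import SessionContext

    ctx = SessionContext()
    ctx.from_pydict({"a": [1, 2]}, name="t")   # registers "t" in the default schema

    catalog = ctx.catalog("datafusion")        # default catalog name (assumed)
    schema = catalog.schema("public")          # replaces the older catalog.database(...)
    assert "t" in schema.names()
    print(schema.table("t").schema.names)      # column names, e.g. ['a']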

examples/datafusion-ffi-example/python/tests/_test_table_function.py

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
     table_udtf = udtf(table_func, "my_table_func")

     my_table = table_udtf()
-    ctx.register_table_provider("t", my_table)
+    ctx.register_table("t", my_table)
     result = ctx.table("t").collect()

     assert len(result) == 2
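
The test above calls the UDTF directly and registers its result with ``register_table``. The other route, per ``SessionContext.register_udtf`` in the python/datafusion/context.py changes below, is to register the function and invoke it from SQL. A sketch only, assuming ``table_udtf`` was created with ``udtf(table_func, "my_table_func")`` as in the test:

    # Register the table function with the context, then call it by name in SQL.
    ctx.register_udtf(table_udtf)
    result = ctx.sql("SELECT * FROM my_table_func()").collect()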

examples/datafusion-ffi-example/python/tests/_test_table_provider.py

Lines changed: 1 addition & 1 deletion

@@ -25,7 +25,7 @@
 def test_table_loading():
     ctx = SessionContext()
     table = MyTableProvider(3, 2, 4)
-    ctx.register_table_provider("t", table)
+    ctx.register_table("t", table)
     result = ctx.table("t").collect()

     assert len(result) == 4

python/datafusion/__init__.py

Lines changed: 4 additions & 8 deletions

@@ -28,17 +28,16 @@
 try:
     import importlib.metadata as importlib_metadata
 except ImportError:
-    import importlib_metadata
+    import importlib_metadata  # type: ignore[import]

+# Public submodules
 from . import functions, object_store, substrait, unparser

 # The following imports are okay to remain as opaque to the user.
 from ._internal import Config
 from .catalog import Catalog, Database, Table
 from .col import col, column
-from .common import (
-    DFSchema,
-)
+from .common import DFSchema
 from .context import (
     RuntimeEnvBuilder,
     SessionConfig,
@@ -47,10 +46,7 @@
 )
 from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
 from .dataframe_formatter import configure_formatter
-from .expr import (
-    Expr,
-    WindowFrame,
-)
+from .expr import Expr, WindowFrame
 from .io import read_avro, read_csv, read_json, read_parquet
 from .plan import ExecutionPlan, LogicalPlan
 from .record_batch import RecordBatch, RecordBatchStream
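
The import reshuffle above is cosmetic: collapsing the parenthesized imports keeps the same names re-exported from the package root. A quick sanity check of the public surface (nothing here is new API):

    # These top-level re-exports should be unaffected by the reformatting.
    from datafusion import DFSchema, Expr, SessionContext, Table, WindowFrame, col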

python/datafusion/catalog.py

Lines changed: 44 additions & 18 deletions

@@ -20,13 +20,16 @@
 from __future__ import annotations

 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Any, Protocol

 import datafusion._internal as df_internal

 if TYPE_CHECKING:
     import pyarrow as pa

+    from datafusion import DataFrame
+    from datafusion.context import TableProviderExportable
+
 try:
     from warnings import deprecated  # Python 3.13+
 except ImportError:
@@ -82,7 +85,11 @@ def database(self, name: str = "public") -> Schema:
         """Returns the database with the given ``name`` from this catalog."""
         return self.schema(name)

-    def register_schema(self, name, schema) -> Schema | None:
+    def register_schema(
+        self,
+        name: str,
+        schema: Schema | SchemaProvider | SchemaProviderExportable,
+    ) -> Schema | None:
         """Register a schema with this catalog."""
         if isinstance(schema, Schema):
             return self.catalog.register_schema(name, schema._raw_schema)
@@ -122,10 +129,12 @@ def table(self, name: str) -> Table:
         """Return the table with the given ``name`` from this schema."""
         return Table(self._raw_schema.table(name))

-    def register_table(self, name, table) -> None:
-        """Register a table provider in this schema."""
-        if isinstance(table, Table):
-            return self._raw_schema.register_table(name, table.table)
+    def register_table(
+        self,
+        name: str,
+        table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
+    ) -> None:
+        """Register a table in this schema."""
         return self._raw_schema.register_table(name, table)

     def deregister_table(self, name: str) -> None:
@@ -139,30 +148,45 @@ class Database(Schema):


 class Table:
-    """DataFusion table."""
+    """A DataFusion table.

-    def __init__(self, table: df_internal.catalog.RawTable) -> None:
-        """This constructor is not typically called by the end user."""
-        self.table = table
+    Internally we currently support the following types of tables:
+
+    - Tables created using built-in DataFusion methods, such as
+      reading from CSV or Parquet
+    - pyarrow datasets
+    - DataFusion DataFrames, which will be converted into a view
+    - Externally provided tables implemented with the FFI PyCapsule
+      interface (advanced)
+    """
+
+    __slots__ = ("_inner",)
+
+    def __init__(
+        self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
+    ) -> None:
+        """Constructor."""
+        self._inner = df_internal.catalog.RawTable(table)

     def __repr__(self) -> str:
         """Print a string representation of the table."""
-        return self.table.__repr__()
+        return repr(self._inner)

     @staticmethod
+    @deprecated("Use Table() constructor instead.")
     def from_dataset(dataset: pa.dataset.Dataset) -> Table:
-        """Turn a pyarrow Dataset into a Table."""
-        return Table(df_internal.catalog.RawTable.from_dataset(dataset))
+        """Turn a :mod:`pyarrow.dataset` ``Dataset`` into a :class:`Table`."""
+        return Table(dataset)

     @property
     def schema(self) -> pa.Schema:
         """Returns the schema associated with this table."""
-        return self.table.schema
+        return self._inner.schema

     @property
     def kind(self) -> str:
         """Returns the kind of table."""
-        return self.table.kind
+        return self._inner.kind


 class CatalogProvider(ABC):
@@ -219,14 +243,16 @@ def table(self, name: str) -> Table | None:
         """Retrieve a specific table from this schema."""
         ...

-    def register_table(self, name: str, table: Table) -> None:  # noqa: B027
-        """Add a table from this schema.
+    def register_table(  # noqa: B027
+        self, name: str, table: Table | TableProviderExportable | Any
+    ) -> None:
+        """Add a table to this schema.

         This method is optional. If your schema provides a fixed list of tables, you do
         not need to implement this method.
         """

-    def deregister_table(self, name, cascade: bool) -> None:  # noqa: B027
+    def deregister_table(self, name: str, cascade: bool) -> None:  # noqa: B027
         """Remove a table from this schema.

         This method is optional. If your schema provides a fixed list of tables, you do
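
The new ``Table`` wrapper accepts the source types listed in its docstring directly. A hedged sketch of building tables from a DataFrame and from a pyarrow dataset and registering both; the in-memory data and table names are illustrative only.

    import pyarrow as pa
    import pyarrow.dataset as ds

    from datafusion import SessionContext, Table

    ctx = SessionContext()

    # From a DataFrame: per the docstring, this is registered as a view.
    df = ctx.from_pydict({"a": [1, 2, 3]})
    ctx.register_table("df_table", Table(df))

    # From a pyarrow dataset: replaces the now-deprecated Table.from_dataset().
    dataset = ds.dataset(pa.table({"b": [4, 5]}))
    ctx.register_table("ds_table", Table(dataset))

    ctx.table("df_table").show()
    ctx.table("ds_table").show()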

python/datafusion/context.py

Lines changed: 28 additions & 14 deletions

@@ -29,11 +29,10 @@

 import pyarrow as pa

-from datafusion.catalog import Catalog, CatalogProvider, Table
+from datafusion.catalog import Catalog
 from datafusion.dataframe import DataFrame
-from datafusion.expr import SortKey, sort_list_to_raw_sort_list
+from datafusion.expr import sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
-from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF

 from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
 from ._internal import SessionConfig as SessionConfigInternal
@@ -48,7 +47,15 @@
     import pandas as pd
     import polars as pl  # type: ignore[import]

+    from datafusion.catalog import CatalogProvider, Table
+    from datafusion.expr import SortKey
     from datafusion.plan import ExecutionPlan, LogicalPlan
+    from datafusion.user_defined import (
+        AggregateUDF,
+        ScalarUDF,
+        TableFunction,
+        WindowUDF,
+    )


 class ArrowStreamExportable(Protocol):
@@ -733,7 +740,7 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
     # https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
     # is the discussion on how we arrived at adding register_view
     def register_view(self, name: str, df: DataFrame) -> None:
-        """Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
+        """Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.

         Args:
             name (str): The name to register the view under.
@@ -742,16 +749,21 @@ def register_view(self, name: str, df: DataFrame) -> None:
         view = df.into_view()
         self.ctx.register_table(name, view)

-    def register_table(self, name: str, table: Table) -> None:
-        """Register a :py:class: `~datafusion.catalog.Table` as a table.
+    def register_table(
+        self,
+        name: str,
+        table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
+    ) -> None:
+        """Register a :py:class:`~datafusion.Table` with this context.

-        The registered table can be referenced from SQL statement executed against.
+        The registered table can be referenced from SQL statements executed against
+        this context.

         Args:
             name: Name of the resultant table.
-            table: DataFusion table to add to the session context.
+            table: Any object that can be converted into a :class:`Table`.
         """
-        self.ctx.register_table(name, table.table)
+        self.ctx.register_table(name, table)

     def deregister_table(self, name: str) -> None:
         """Remove a table from the session."""
@@ -770,15 +782,17 @@ def register_catalog_provider(
         else:
             self.ctx.register_catalog_provider(name, provider)

+    @deprecated("Use register_table() instead.")
     def register_table_provider(
-        self, name: str, provider: TableProviderExportable
+        self,
+        name: str,
+        provider: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
     ) -> None:
         """Register a table provider.

-        This table provider must have a method called ``__datafusion_table_provider__``
-        which returns a PyCapsule that exposes a ``FFI_TableProvider``.
+        Deprecated: use :meth:`register_table` instead.
         """
-        self.ctx.register_table_provider(name, provider)
+        self.register_table(name, provider)

     def register_udtf(self, func: TableFunction) -> None:
         """Register a user defined table function."""
@@ -1170,7 +1184,7 @@ def read_table(self, table: Table) -> DataFrame:
         :py:class:`~datafusion.catalog.ListingTable`, create a
         :py:class:`~datafusion.dataframe.DataFrame`.
         """
-        return DataFrame(self.ctx.read_table(table.table))
+        return DataFrame(self.ctx.read_table(table._inner))

     def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
         """Execute the ``plan`` and return the results."""

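
With the widened signature, ``register_table`` becomes the single entry point and ``register_table_provider`` simply forwards to it behind a deprecation warning. A hedged end-to-end sketch of the migration, using an in-memory DataFrame as the table source:

    import warnings

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})

    # New style: one method for DataFrames, pyarrow datasets, Table objects,
    # and FFI table providers alike.
    ctx.register_table("t_new", df)

    # Old style still works but is expected to raise a DeprecationWarning.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        ctx.register_table_provider("t_old", df)
    print([str(w.message) for w in caught])

    ctx.sql("SELECT * FROM t_new").show()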