Merged
Commits (20)
f9cafb8
refactor: improve DataFrame expression handling, type checking, and docs
kosiew Sep 1, 2025
91167b0
refactor: unify expression and sorting logic; improve docs and error …
kosiew Sep 1, 2025
54687a2
feat: add ensure_expr helper for validation; refine expression handli…
kosiew Sep 1, 2025
f591617
Refactor and enhance expression handling, test coverage, and document…
kosiew Sep 2, 2025
31a648f
refactor: update docstring for sort_or_default function to clarify it…
kosiew Sep 2, 2025
37307b0
fix Ruff errors
kosiew Sep 2, 2025
05cd237
refactor: update type hints to use typing.Union for better clarity an…
kosiew Sep 2, 2025
28619d9
fix Ruff errors
kosiew Sep 2, 2025
9adbf4f
refactor: simplify type hints by removing unnecessary imports for typ…
kosiew Sep 2, 2025
0a27617
refactor: update type hints for rex_type and types methods to improve…
kosiew Sep 2, 2025
92bc68e
refactor: remove unnecessary type ignore comments from rex_type and t…
kosiew Sep 2, 2025
7258428
docs: update section title for clarity on DataFrame method arguments
kosiew Sep 15, 2025
c3e2a04
docs: clarify description of DataFrame methods accepting column names
kosiew Sep 15, 2025
38043fa
docs: add note to clarify function documentation reference for DataFr…
kosiew Sep 15, 2025
d78fdef
docs: remove outdated information about predicate acceptance in DataF…
kosiew Sep 15, 2025
992d619
refactor: simplify type hint for expr_list parameter in expr_list_to_…
kosiew Sep 15, 2025
bd0f57e
docs: clarify usage of datafusion.col and datafusion.lit in DataFrame…
kosiew Sep 15, 2025
2b813bf
docs: clarify usage of col() and lit() in DataFrame filter examples
kosiew Sep 15, 2025
93c81fa
Merge branch 'main' into col-1214
kosiew Sep 15, 2025
9aa9985
Fix ruff errors
kosiew Sep 15, 2025
47 changes: 47 additions & 0 deletions docs/source/user-guide/dataframe/index.rst
@@ -126,6 +126,53 @@ DataFusion's DataFrame API offers a wide range of operations:
# Drop columns
df = df.drop("temporary_column")

String Columns and Expressions
------------------------------

Member:
I think the title here is misleading. "String Columns" to me would mean columns that contain string values. I think maybe we should call this something like "Function arguments taking column names" or "Column names as function arguments".

Contributor Author:
I will correct this.

Some ``DataFrame`` methods accept plain strings when an argument refers to an
existing column.

Member:
recommend "plain strings" -> "column names"

Contributor Author:
I will correct this.

These include:

* :py:meth:`~datafusion.DataFrame.select`
* :py:meth:`~datafusion.DataFrame.sort`
* :py:meth:`~datafusion.DataFrame.drop`
* :py:meth:`~datafusion.DataFrame.join` (``on`` argument)
* :py:meth:`~datafusion.DataFrame.aggregate` (grouping columns)

Note that :py:meth:`~datafusion.DataFrame.join_on` expects ``col()``/``column()`` expressions rather than plain strings.
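
For example, a minimal hypothetical sketch of ``join_on`` (assuming a second
DataFrame ``df2`` that has a ``customer_id`` column):

.. code-block:: python

    from datafusion import col

    df.join_on(df2, col('id') == col('customer_id'))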

For such methods, you can pass column names directly:

.. code-block:: python

from datafusion import col, functions as f

df.sort('id')
df.aggregate('id', [f.count(col('value'))])

The same operation can also be written with explicit column expressions, using either ``col()`` or ``column()``:

.. code-block:: python

from datafusion import col, column, functions as f

df.sort(col('id'))
df.aggregate(column('id'), [f.count(col('value'))])

Note that ``column()`` is an alias of ``col()``, so you can use either name; the example above shows both in action.

Whenever an argument represents an expression—such as in
:py:meth:`~datafusion.DataFrame.filter` or
:py:meth:`~datafusion.DataFrame.with_column`—use ``col()`` to reference columns
and wrap constant values with ``lit()`` (also available as ``literal()``):

.. code-block:: python

from datafusion import col, lit
df.filter(col('age') > lit(21))

Without ``lit()`` DataFusion would treat ``21`` as a column name rather than a
constant value.
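
The same pattern applies to ``with_column``; a minimal hypothetical sketch
(assuming a numeric ``price`` column):

.. code-block:: python

    from datafusion import col, lit

    df = df.with_column('price_with_tax', col('price') * lit(1.1))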
Member:
Is this statement true? df.filter(col('age') > 21) would treat 21 as a column name? I think that's a change in how the comparison operator works.

Contributor Author (@kosiew, Sep 15, 2025):
You're right.

The comparison operators on Expr automatically convert any non-Expr value into a literal expression.
I will correct the documentation
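
For illustration, a minimal sketch of that equivalence (assuming ``df`` has an ``age`` column):

    from datafusion import col, lit

    df.filter(col('age') > 21)       # the integer is coerced to a literal automatically
    df.filter(col('age') > lit(21))  # equivalent, with the literal made explicit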


Terminal Operations
-------------------

63 changes: 39 additions & 24 deletions python/datafusion/context.py
@@ -22,29 +22,31 @@
import warnings
from typing import TYPE_CHECKING, Any, Protocol

-import pyarrow as pa

try:
from warnings import deprecated # Python 3.13+
except ImportError:
from typing_extensions import deprecated # Python 3.12

+import pyarrow as pa

from datafusion.catalog import Catalog, CatalogProvider, Table
from datafusion.dataframe import DataFrame
-from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
+from datafusion.expr import SortKey, sort_list_to_raw_sort_list
from datafusion.record_batch import RecordBatchStream
from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF

from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
from ._internal import SessionConfig as SessionConfigInternal
from ._internal import SessionContext as SessionContextInternal
from ._internal import SQLOptions as SQLOptionsInternal
+from ._internal import expr as expr_internal

if TYPE_CHECKING:
import pathlib
from collections.abc import Sequence

import pandas as pd
-import polars as pl
+import polars as pl # type: ignore[import]

from datafusion.plan import ExecutionPlan, LogicalPlan

@@ -553,7 +555,7 @@ def register_listing_table(
table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None,
file_extension: str = ".parquet",
schema: pa.Schema | None = None,
-file_sort_order: list[list[Expr | SortExpr]] | None = None,
+file_sort_order: Sequence[Sequence[SortKey]] | None = None,
) -> None:
"""Register multiple files as a single table.

@@ -567,23 +569,20 @@
table_partition_cols: Partition columns.
file_extension: File extension of the provided table.
schema: The data source schema.
-file_sort_order: Sort order for the file.
+file_sort_order: Sort order for the file. Each sort key can be
+specified as a column name (``str``), an expression
+(``Expr``), or a ``SortExpr``.
"""
if table_partition_cols is None:
table_partition_cols = []
table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-file_sort_order_raw = (
-[sort_list_to_raw_sort_list(f) for f in file_sort_order]
-if file_sort_order is not None
-else None
-)
self.ctx.register_listing_table(
name,
str(path),
table_partition_cols,
file_extension,
schema,
-file_sort_order_raw,
+self._convert_file_sort_order(file_sort_order),
)

def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
@@ -808,7 +807,7 @@ def register_parquet(
file_extension: str = ".parquet",
skip_metadata: bool = True,
schema: pa.Schema | None = None,
-file_sort_order: list[list[SortExpr]] | None = None,
+file_sort_order: Sequence[Sequence[SortKey]] | None = None,
) -> None:
"""Register a Parquet file as a table.

@@ -827,7 +826,9 @@
that may be in the file schema. This can help avoid schema
conflicts due to metadata.
schema: The data source schema.
-file_sort_order: Sort order for the file.
+file_sort_order: Sort order for the file. Each sort key can be
+specified as a column name (``str``), an expression
+(``Expr``), or a ``SortExpr``.
"""
if table_partition_cols is None:
table_partition_cols = []
@@ -840,9 +841,7 @@
file_extension,
skip_metadata,
schema,
-[sort_list_to_raw_sort_list(exprs) for exprs in file_sort_order]
-if file_sort_order is not None
-else None,
+self._convert_file_sort_order(file_sort_order),
)

def register_csv(
@@ -1099,7 +1098,7 @@ def read_parquet(
file_extension: str = ".parquet",
skip_metadata: bool = True,
schema: pa.Schema | None = None,
-file_sort_order: list[list[Expr | SortExpr]] | None = None,
+file_sort_order: Sequence[Sequence[SortKey]] | None = None,
) -> DataFrame:
"""Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.

@@ -1116,19 +1115,17 @@
schema: An optional schema representing the parquet files. If None,
the parquet reader will try to infer it based on data in the
file.
-file_sort_order: Sort order for the file.
+file_sort_order: Sort order for the file. Each sort key can be
+specified as a column name (``str``), an expression
+(``Expr``), or a ``SortExpr``.

Returns:
DataFrame representation of the read Parquet files
"""
if table_partition_cols is None:
table_partition_cols = []
table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-file_sort_order = (
-[sort_list_to_raw_sort_list(f) for f in file_sort_order]
-if file_sort_order is not None
-else None
-)
+file_sort_order = self._convert_file_sort_order(file_sort_order)
return DataFrame(
self.ctx.read_parquet(
str(path),
@@ -1179,6 +1176,24 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))

+@staticmethod
+def _convert_file_sort_order(
+file_sort_order: Sequence[Sequence[SortKey]] | None,
+) -> list[list[expr_internal.SortExpr]] | None:
+"""Convert nested ``SortKey`` sequences into raw sort expressions.
+
+Each ``SortKey`` can be a column name string, an ``Expr``, or a
+``SortExpr`` and will be converted using
+:func:`datafusion.expr.sort_list_to_raw_sort_list`.
+"""
+# Convert each ``SortKey`` in the provided sort order to the low-level
+# representation expected by the Rust bindings.
+return (
+[sort_list_to_raw_sort_list(f) for f in file_sort_order]
+if file_sort_order is not None
+else None
+)

@staticmethod
def _convert_table_partition_cols(
table_partition_cols: list[tuple[str, str | pa.DataType]],
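
For context, a rough usage sketch of the new ``file_sort_order`` flexibility (the ``data/`` path and column names here are hypothetical; sort keys may be plain column names, ``Expr`` expressions, or ``SortExpr`` values):

    from datafusion import SessionContext, col

    ctx = SessionContext()
    # A plain column name and an explicit SortExpr can be mixed in one key list.
    df = ctx.read_parquet(
        "data/",
        file_sort_order=[["id", col("ts").sort(ascending=False)]],
    )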