10 changes: 9 additions & 1 deletion python/datafusion/__init__.py
@@ -45,7 +45,13 @@
SessionContext,
SQLOptions,
)
from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
from .dataframe import (
DataFrame,
DataFrameWriteOptions,
InsertOp,
ParquetColumnOptions,
ParquetWriterOptions,
)
from .dataframe_formatter import configure_formatter
from .expr import (
Expr,
@@ -75,9 +81,11 @@
"Config",
"DFSchema",
"DataFrame",
"DataFrameWriteOptions",
"Database",
"ExecutionPlan",
"Expr",
"InsertOp",
"LogicalPlan",
"ParquetColumnOptions",
"ParquetWriterOptions",
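With these exports in place, both new names can be imported from the package root. A small illustrative sketch (the option value shown is arbitrary):

from datafusion import DataFrameWriteOptions, InsertOp

# The writer-options type and insert-mode enum are now public API.
options = DataFrameWriteOptions(single_file_output=True)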
115 changes: 108 additions & 7 deletions python/datafusion/dataframe.py
@@ -39,10 +39,13 @@
from typing_extensions import deprecated # Python 3.12

from datafusion._internal import DataFrame as DataFrameInternal
from datafusion._internal import DataFrameWriteOptions as DataFrameWriteOptionsInternal
from datafusion._internal import InsertOp as InsertOpInternal
from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
from datafusion.expr import (
Expr,
SortExpr,
SortKey,
ensure_expr,
ensure_expr_list,
@@ -925,21 +928,31 @@ def except_all(self, other: DataFrame) -> DataFrame:
"""
return DataFrame(self.df.except_all(other.df))

def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None:
def write_csv(
self,
path: str | pathlib.Path,
with_header: bool = False,
write_options: DataFrameWriteOptions | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a CSV file.

Args:
path: Path of the CSV file to write.
with_header: If true, output the CSV header row.
write_options: Options that impact how the DataFrame is written.
"""
self.df.write_csv(str(path), with_header)
raw_write_options = (
write_options._raw_write_options if write_options is not None else None
)
self.df.write_csv(str(path), with_header, raw_write_options)

@overload
def write_parquet(
self,
path: str | pathlib.Path,
compression: str,
compression_level: int | None = None,
write_options: DataFrameWriteOptions | None = None,
) -> None: ...

@overload
@@ -948,6 +961,7 @@ def write_parquet(
path: str | pathlib.Path,
compression: Compression = Compression.ZSTD,
compression_level: int | None = None,
write_options: DataFrameWriteOptions | None = None,
) -> None: ...

@overload
@@ -956,16 +970,20 @@ def write_parquet(
path: str | pathlib.Path,
compression: ParquetWriterOptions,
compression_level: None = None,
write_options: DataFrameWriteOptions | None = None,
) -> None: ...

def write_parquet(
self,
path: str | pathlib.Path,
compression: Union[str, Compression, ParquetWriterOptions] = Compression.ZSTD,
compression_level: int | None = None,
write_options: DataFrameWriteOptions | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a Parquet file.

LZO compression is not yet implemented in arrow-rs and is therefore excluded.

Args:
path: Path of the Parquet file to write.
compression: Compression type to use. Default is "ZSTD".
@@ -977,10 +995,10 @@ def write_parquet_with_options(
- "lz4": LZ4 compression.
- "lz4_raw": LZ4_RAW compression.
- "zstd": Zstandard compression.
Note: LZO is not yet implemented in arrow-rs and is therefore excluded.
compression_level: Compression level to use. For ZSTD, the
recommended range is 1 to 22, with the default being 4. Higher levels
provide better compression but slower speed.
write_options: Options that impact how the DataFrame is written.
"""
if isinstance(compression, ParquetWriterOptions):
if compression_level is not None:
@@ -998,10 +1016,21 @@
):
compression_level = compression.get_default_level()

self.df.write_parquet(str(path), compression.value, compression_level)
raw_write_options = (
write_options._raw_write_options if write_options is not None else None
)
self.df.write_parquet(
str(path),
compression.value,
compression_level,
raw_write_options,
)

def write_parquet_with_options(
self, path: str | pathlib.Path, options: ParquetWriterOptions
self,
path: str | pathlib.Path,
options: ParquetWriterOptions,
write_options: DataFrameWriteOptions | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a Parquet file.

@@ -1010,6 +1039,7 @@ def write_parquet_with_options(
Args:
path: Path of the Parquet file to write.
options: Sets the writer parquet options (see `ParquetWriterOptions`).
write_options: Options that impact how the DataFrame is written.
"""
options_internal = ParquetWriterOptionsInternal(
options.data_pagesize_limit,
@@ -1046,19 +1076,45 @@ def write_parquet_with_options(
bloom_filter_ndv=opts.bloom_filter_ndv,
)

raw_write_options = (
write_options._raw_write_options if write_options is not None else None
)
self.df.write_parquet_with_options(
str(path),
options_internal,
column_specific_options_internal,
raw_write_options,
)

def write_json(self, path: str | pathlib.Path) -> None:
def write_json(
self,
path: str | pathlib.Path,
write_options: DataFrameWriteOptions | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a JSON file.

Args:
path: Path of the JSON file to write.
write_options: Options that impact how the DataFrame is written.
"""
raw_write_options = (
write_options._raw_write_options if write_options is not None else None
)
self.df.write_json(str(path), write_options=raw_write_options)

def write_table(
self, table_name: str, write_options: DataFrameWriteOptions | None = None
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a table.

The table must be registered with the session to perform this operation.
Not all table providers support writing operations. See the individual
implementations for details.
"""
self.df.write_json(str(path))
raw_write_options = (
write_options._raw_write_options if write_options is not None else None
)
self.df.write_table(table_name, raw_write_options)

def to_arrow_table(self) -> pa.Table:
"""Execute the :py:class:`DataFrame` and convert it into an Arrow Table.
@@ -1206,3 +1262,48 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> DataFrame:
- For columns not in subset, the original column is kept unchanged
"""
return DataFrame(self.df.fill_null(value, subset))


class InsertOp(Enum):
"""Insert operation mode.

These modes are used by the table writing feature to define how record
batches should be written to a table.
"""

APPEND = InsertOpInternal.APPEND
"""Appends new rows to the existing table without modifying any existing rows."""

REPLACE = InsertOpInternal.REPLACE
"""Replace existing rows that collide with the inserted rows.

Replacement is typically based on a unique key or primary key.
"""

OVERWRITE = InsertOpInternal.OVERWRITE
"""Overwrites all existing rows in the table with the new rows."""


class DataFrameWriteOptions:
"""Writer options for DataFrame.

There is no guarantee the table provider supports all writer options.
See the individual implementation and documentation for details.
"""

def __init__(
self,
insert_operation: InsertOp | None = None,
single_file_output: bool = False,
partition_by: str | Sequence[str] | None = None,
sort_by: Expr | SortExpr | Sequence[Expr] | Sequence[SortExpr] | None = None,
) -> None:
"""Instantiate writer options for DataFrame."""
if isinstance(partition_by, str):
partition_by = [partition_by]

sort_by_raw = sort_list_to_raw_sort_list(sort_by)

self._raw_write_options = DataFrameWriteOptionsInternal(
insert_operation, single_file_output, partition_by, sort_by_raw
)
Comment on lines +1310 to +1312

@kosiew (Contributor) · Oct 12, 2025

I think you can't pass insert_operation directly here.

e.g. this test will fail:

def test_dataframe_write_options_accepts_insert_op() -> None:
    """DataFrameWriteOptions should accept InsertOp enums."""

    try:
        DataFrameWriteOptions(insert_operation=InsertOp.REPLACE)
    except TypeError as exc:  
        pytest.fail(f"DataFrameWriteOptions rejected InsertOp: {exc}")
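
For context, a minimal usage sketch of the new write options, mirroring the tests below (the output path is illustrative, and per the review note above the insert_operation argument may still need adjustment before it accepts InsertOp directly):

from datafusion import SessionContext, column
from datafusion.dataframe import DataFrameWriteOptions

ctx = SessionContext()
df = ctx.from_pydict({"a": [3, 1, 2], "b": [4.0, 5.0, 6.0]})

# Sort rows by "a" before writing; partition_by and single_file_output
# can be combined in the same DataFrameWriteOptions instance.
write_options = DataFrameWriteOptions(sort_by=column("a").sort(ascending=True))
df.write_parquet("output_dir", write_options=write_options)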

75 changes: 72 additions & 3 deletions python/tests/test_dataframe.py
@@ -16,6 +16,7 @@
# under the License.
import ctypes
import datetime
import itertools
import os
import re
import threading
@@ -40,6 +41,7 @@
from datafusion import (
functions as f,
)
from datafusion.dataframe import DataFrameWriteOptions
from datafusion.dataframe_formatter import (
DataFrameHtmlFormatter,
configure_formatter,
@@ -58,9 +60,7 @@ def ctx():


@pytest.fixture
def df():
ctx = SessionContext()

def df(ctx):
# create a RecordBatch and a new DataFrame from it
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([8, 5, 8])],
@@ -1830,6 +1830,56 @@ def test_write_csv(ctx, df, tmp_path, path_to_str):
assert result == expected


sort_by_cases = [
(None, [1, 2, 3], "unsorted"),
(column("c"), [2, 1, 3], "single_column_expr"),
(column("a").sort(ascending=False), [3, 2, 1], "single_sort_expr"),
([column("c"), column("b")], [2, 1, 3], "list_col_expr"),
(
[column("c").sort(ascending=False), column("b").sort(ascending=False)],
[3, 1, 2],
"list_sort_expr",
),
]

formats = ["csv", "json", "parquet", "table"]


@pytest.mark.parametrize(
("format", "sort_by", "expected_a"),
[
pytest.param(format, sort_by, expected_a, id=f"{format}_{test_id}")
for format, (sort_by, expected_a, test_id) in itertools.product(
formats, sort_by_cases
)
],
)
def test_write_files_with_options(
ctx, df, tmp_path, format, sort_by, expected_a
) -> None:
write_options = DataFrameWriteOptions(sort_by=sort_by)

if format == "csv":
df.write_csv(tmp_path, with_header=True, write_options=write_options)
ctx.register_csv("test_table", tmp_path)
elif format == "json":
df.write_json(tmp_path, write_options=write_options)
ctx.register_json("test_table", tmp_path)
elif format == "parquet":
df.write_parquet(tmp_path, write_options=write_options)
ctx.register_parquet("test_table", tmp_path)
elif format == "table":
batch = pa.RecordBatch.from_arrays([[], [], []], schema=df.schema())
ctx.register_record_batches("test_table", [[batch]])
ctx.table("test_table").show()
df.write_table("test_table", write_options=write_options)

result = ctx.table("test_table").to_pydict()["a"]
ctx.table("test_table").show()

assert result == expected_a


@pytest.mark.parametrize("path_to_str", [True, False])
def test_write_json(ctx, df, tmp_path, path_to_str):
path = str(tmp_path) if path_to_str else tmp_path
@@ -2322,6 +2372,25 @@ def test_write_parquet_options_error(df, tmp_path):
df.write_parquet(str(tmp_path), options, compression_level=1)


def test_write_table(ctx, df):
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3])],
names=["a"],
)

ctx.register_record_batches("t", [[batch]])

df = ctx.table("t").with_column("a", column("a") * literal(-1))

ctx.table("t").show()

df.write_table("t")
result = ctx.table("t").sort(column("a")).collect()[0][0].to_pylist()
expected = [-3, -2, -1, 1, 2, 3]

assert result == expected


def test_dataframe_export(df) -> None:
# Guarantees that we have the canonical implementation
# reading our dataframe export