Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,16 +620,25 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
def write_parquet(
self,
path: str | pathlib.Path,
compression: str = "uncompressed",
compression: str = "ZSTD",
compression_level: int | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a Parquet file.

Args:
path: Path of the Parquet file to write.
compression: Compression type to use.
compression_level: Compression level to use.
"""
compression: Compression type to use. Default is "ZSTD".
compression_level: Compression level to use. For ZSTD, the
recommended range is 1 to 22, with the default being 4. Higher levels
provide better compression but slower speed.
"""
if compression == "ZSTD":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@kosiew kosiew Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ion-elgreco ,

I added the Compression Enum but omitted the check_valid_levels because these are already implemented in Rust DataFrame eg

"zstd" => Compression::ZSTD(
ZstdLevel::try_new(verify_compression_level(compression_level)? as i32)
.map_err(|e| PyValueError::new_err(format!("{e}")))?,
),

Compression levels are tested in:

@pytest.mark.parametrize(
"compression, compression_level",
[("gzip", 12), ("brotli", 15), ("zstd", 23), ("wrong", 12)],
)
def test_write_compressed_parquet_wrong_compression_level(
df, tmp_path, compression, compression_level
):
path = tmp_path
with pytest.raises(ValueError):
df.write_parquet(
str(path),
compression=compression,
compression_level=compression_level,

if compression_level is None:
# Default compression level for ZSTD is 4 like in delta-rs
# https://github.com/apache/datafusion-python/pull/981#discussion_r1899871918
compression_level = 4
elif not (1 <= compression_level <= 22):
raise ValueError("Compression level for ZSTD must be between 1 and 22")
self.df.write_parquet(str(path), compression, compression_level)

def write_json(self, path: str | pathlib.Path) -> None:
Expand Down
Loading