
Commit e86c3e3

feat: Support Parquet writer options

1 parent 7d8bcd8

5 files changed: +623 −159 lines changed


python/datafusion/__init__.py

Lines changed: 4 additions & 2 deletions
@@ -31,7 +31,7 @@
 from . import functions, object_store, substrait, unparser
 
 # The following imports are okay to remain as opaque to the user.
-from ._internal import Config
+from ._internal import Config, ParquetWriterOptions
 from .catalog import Catalog, Database, Table
 from .common import (
     DFSchema,
@@ -42,7 +42,7 @@
     SessionContext,
     SQLOptions,
 )
-from .dataframe import DataFrame
+from .dataframe import DataFrame, ParquetColumnOptions
 from .expr import (
     Expr,
     WindowFrame,
@@ -66,6 +66,8 @@
     "ExecutionPlan",
     "Expr",
     "LogicalPlan",
+    "ParquetColumnOptions",
+    "ParquetWriterOptions",
     "RecordBatch",
     "RecordBatchStream",
     "RuntimeEnvBuilder",

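With this change both option classes become part of the top-level public API (they are added to `__all__` above). A minimal, hypothetical smoke test of the new exports:

from datafusion import ParquetColumnOptions, ParquetWriterOptions
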
python/datafusion/dataframe.py

Lines changed: 167 additions & 80 deletions
@@ -28,7 +28,6 @@
     Iterable,
     Literal,
     Optional,
-    Union,
     overload,
 )
 
@@ -51,67 +50,58 @@
 from datafusion._internal import DataFrame as DataFrameInternal
 from datafusion._internal import expr as expr_internal
 
-from enum import Enum
-
+from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
+from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 
 
-# excerpt from deltalake
-# https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163
-class Compression(Enum):
-    """Enum representing the available compression types for Parquet files."""
-
-    UNCOMPRESSED = "uncompressed"
-    SNAPPY = "snappy"
-    GZIP = "gzip"
-    BROTLI = "brotli"
-    LZ4 = "lz4"
-    # lzo is not implemented yet
-    # https://github.com/apache/arrow-rs/issues/6970
-    # LZO = "lzo"
-    ZSTD = "zstd"
-    LZ4_RAW = "lz4_raw"
-
-    @classmethod
-    def from_str(cls: type[Compression], value: str) -> Compression:
-        """Convert a string to a Compression enum value.
-
-        Args:
-            value: The string representation of the compression type.
-
-        Returns:
-            The Compression enum lowercase value.
-
-        Raises:
-            ValueError: If the string does not match any Compression enum value.
-        """
-        try:
-            return cls(value.lower())
-        except ValueError as err:
-            valid_values = str([item.value for item in Compression])
-            error_msg = f"""
-            {value} is not a valid Compression.
-            Valid values are: {valid_values}
-            """
-            raise ValueError(error_msg) from err
-
-    def get_default_level(self) -> Optional[int]:
-        """Get the default compression level for the compression type.
+class ParquetColumnOptions:
+    """Parquet options for individual columns.
+
+    Contains the available options that can be applied for an individual Parquet column,
+    replacing the provided options in the `write_parquet`.
+
+    Attributes:
+        encoding: Sets encoding for the column path. Valid values are: `plain`,
+            `plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
+            `delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
+            `byte_stream_split`. These values are not case-sensitive. If `None`, uses
+            the default parquet options
+        dictionary_enabled: Sets if dictionary encoding is enabled for the column path.
+            If `None`, uses the default parquet options
+        compression: Sets default parquet compression codec for the column path. Valid
+            values are `uncompressed`, `snappy`, `gzip(level)`, `lzo`, `brotli(level)`,
+            `lz4`, `zstd(level)`, and `lz4_raw`. These values are not case-sensitive. If
+            `None`, uses the default parquet options.
+        statistics_enabled: Sets if statistics are enabled for the column Valid values
+            are: `none`, `chunk`, and `page` These values are not case sensitive. If
+            `None`, uses the default parquet options.
+        bloom_filter_enabled: Sets if bloom filter is enabled for the column path. If
+            `None`, uses the default parquet options.
+        bloom_filter_fpp: Sets bloom filter false positive probability for the column
+            path. If `None`, uses the default parquet options.
+        bloom_filter_ndv: Sets bloom filter number of distinct values. If `None`, uses
+            the default parquet options.
+    """
 
-        Returns:
-            The default compression level for the compression type.
-        """
-        # GZIP, BROTLI default values from deltalake repo
-        # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163
-        # ZSTD default value from delta-rs
-        # https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223
-        if self == Compression.GZIP:
-            return 6
-        if self == Compression.BROTLI:
-            return 1
-        if self == Compression.ZSTD:
-            return 4
-        return None
+    def __init__(
+        self,
+        encoding: Optional[str] = None,
+        dictionary_enabled: Optional[bool] = None,
+        compression: Optional[str] = None,
+        statistics_enabled: Optional[str] = None,
+        bloom_filter_enabled: Optional[bool] = None,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+    ) -> None:
+        """Initialize the ParquetColumnOptions."""
+        self.encoding = encoding
+        self.dictionary_enabled = dictionary_enabled
+        self.compression = compression
+        self.statistics_enabled = statistics_enabled
+        self.bloom_filter_enabled = bloom_filter_enabled
+        self.bloom_filter_fpp = bloom_filter_fpp
+        self.bloom_filter_ndv = bloom_filter_ndv
 
 
 class DataFrame:
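ParquetColumnOptions is a plain options container: every field defaults to `None`, meaning "fall back to the file-level writer setting". A hedged sketch of building per-column overrides (the column roles and the specific values below are illustrative, not part of the commit):

from datafusion import ParquetColumnOptions

# An ID-like integer column: delta encoding, no dictionary, keep page statistics.
id_options = ParquetColumnOptions(
    encoding="delta_binary_packed",
    dictionary_enabled=False,
    statistics_enabled="page",
)

# A high-cardinality string column: heavier compression plus a bloom filter.
payload_options = ParquetColumnOptions(
    compression="zstd(10)",
    bloom_filter_enabled=True,
    bloom_filter_fpp=0.01,
    bloom_filter_ndv=100_000,
)
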
@@ -704,38 +694,135 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: Union[str, Compression] = Compression.ZSTD,
-        compression_level: int | None = None,
+        data_pagesize_limit: int = 1024 * 1024,
+        write_batch_size: int = 1024,
+        writer_version: str = "1.0",
+        skip_arrow_metadata: bool = False,
+        compression: Optional[str] = "zstd(3)",
+        dictionary_enabled: Optional[bool] = True,
+        dictionary_page_size_limit: int = 1024 * 1024,
+        statistics_enabled: Optional[str] = "page",
+        max_row_group_size: int = 1024 * 1024,
+        created_by: str = "datafusion-python",
+        column_index_truncate_length: Optional[int] = 64,
+        statistics_truncate_length: Optional[int] = None,
+        data_page_row_count_limit: int = 20_000,
+        encoding: Optional[str] = None,
+        bloom_filter_on_write: bool = False,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+        allow_single_file_parallelism: bool = True,
+        maximum_parallel_row_group_writers: int = 1,
+        maximum_buffered_record_batches_per_stream: int = 2,
+        column_specific_options: Optional[dict[str, ParquetColumnOptions]] = None,
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
 
         Args:
             path: Path of the Parquet file to write.
-            compression: Compression type to use. Default is "ZSTD".
-                Available compression types are:
+            data_pagesize_limit: Sets best effort maximum size of data page in bytes.
+            write_batch_size: Sets write_batch_size in bytes.
+            writer_version: Sets parquet writer version. Valid values are `1.0` and
+                `2.0`.
+            skip_arrow_metadata: Skip encoding the embedded arrow metadata in the
+                KV_meta.
+            compression: Compression type to use. Default is "zstd(3)".
+                Available compression types are
                 - "uncompressed": No compression.
                 - "snappy": Snappy compression.
-                - "gzip": Gzip compression.
-                - "brotli": Brotli compression.
+                - "gzip(n)": Gzip compression with level n.
+                - "brotli(n)": Brotli compression with level n.
                 - "lz4": LZ4 compression.
                 - "lz4_raw": LZ4_RAW compression.
-                - "zstd": Zstandard compression.
-                Note: LZO is not yet implemented in arrow-rs and is therefore excluded.
-            compression_level: Compression level to use. For ZSTD, the
-                recommended range is 1 to 22, with the default being 4. Higher levels
-                provide better compression but slower speed.
-        """
-        # Convert string to Compression enum if necessary
-        if isinstance(compression, str):
-            compression = Compression.from_str(compression)
-
-        if (
-            compression in {Compression.GZIP, Compression.BROTLI, Compression.ZSTD}
-            and compression_level is None
-        ):
-            compression_level = compression.get_default_level()
+                - "zstd(n)": Zstandard compression with level n.
+            dictionary_enabled: Sets if dictionary encoding is enabled. If None, uses
+                the default parquet writer setting.
+            dictionary_page_size_limit: Sets best effort maximum dictionary page size,
+                in bytes.
+            statistics_enabled: Sets if statistics are enabled for any column Valid
+                values are `none`, `chunk`, and `page`. If None, uses the default
+                parquet writer setting.
+            max_row_group_size: Target maximum number of rows in each row group
+                (defaults to 1M rows). Writing larger row groups requires more memory to
+                write, but can get better compression and be faster to read.
+            created_by: Sets "created by" property.
+            column_index_truncate_length: Sets column index truncate length.
+            statistics_truncate_length: Sets statistics truncate length. If None, uses
+                the default parquet writer setting.
+            data_page_row_count_limit: Sets best effort maximum number of rows in a data
+                page.
+            encoding: Sets default encoding for any column. Valid values are `plain`,
+                `plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
+                `delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
+                `byte_stream_split`. If None, uses the default parquet writer setting.
+            bloom_filter_on_write: Write bloom filters for all columns when creating
+                parquet files.
+            bloom_filter_fpp: Sets bloom filter false positive probability. If None,
+                uses the default parquet writer setting
+            bloom_filter_ndv: Sets bloom filter number of distinct values. If None, uses
+                the default parquet writer setting.
+            allow_single_file_parallelism: Controls whether DataFusion will attempt to
+                speed up writing parquet files by serializing them in parallel. Each
+                column in each row group in each output file are serialized in parallel
+                leveraging a maximum possible core count of n_files * n_row_groups *
+                n_columns.
+            maximum_parallel_row_group_writers: By default parallel parquet writer is
+                tuned for minimum memory usage in a streaming execution plan. You may
+                see a performance benefit when writing large parquet files by increasing
+                `maximum_parallel_row_group_writers` and
+                `maximum_buffered_record_batches_per_stream` if your system has idle
+                cores and can tolerate additional memory usage. Boosting these values is
+                likely worthwhile when writing out already in-memory data, such as from
+                a cached data frame.
+            maximum_buffered_record_batches_per_stream: See
+                `maximum_parallel_row_group_writers`.
+            column_specific_options: Overrides options for specific columns. If a column
+                is not a part of this dictionary, it will use the parameters provided in
+                the `write_parquet`.
+        """
+        options_internal = ParquetWriterOptionsInternal(
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            skip_arrow_metadata,
+            compression,
+            dictionary_enabled,
+            dictionary_page_size_limit,
+            statistics_enabled,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            statistics_truncate_length,
+            data_page_row_count_limit,
+            encoding,
+            bloom_filter_on_write,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+            allow_single_file_parallelism,
+            maximum_parallel_row_group_writers,
+            maximum_buffered_record_batches_per_stream,
+        )
+
+        if column_specific_options is None:
+            column_specific_options = {}
+
+        column_specific_options_internal = {}
+        for column, opts in column_specific_options.items():
+            column_specific_options_internal[column] = ParquetColumnOptionsInternal(
+                bloom_filter_enabled=opts.bloom_filter_enabled,
+                encoding=opts.encoding,
+                dictionary_enabled=opts.dictionary_enabled,
+                compression=opts.compression,
+                statistics_enabled=opts.statistics_enabled,
+                bloom_filter_fpp=opts.bloom_filter_fpp,
+                bloom_filter_ndv=opts.bloom_filter_ndv,
+            )
 
-        self.df.write_parquet(str(path), compression.value, compression_level)
+        self.df.write_parquet(
+            str(path),
+            options_internal,
+            column_specific_options_internal,
+        )
 
     def write_json(self, path: str | pathlib.Path) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a JSON file.
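Taken together, the rewritten `write_parquet` accepts the file-level writer options as keyword arguments and per-column overrides through `column_specific_options`, a dict keyed by column name. A hedged end-to-end sketch (paths, the column name, and the chosen values are placeholders, not from the commit):

from datafusion import ParquetColumnOptions, SessionContext

ctx = SessionContext()
df = ctx.read_parquet("input.parquet")  # assumes an existing Parquet file

df.write_parquet(
    "output.parquet",
    compression="zstd(6)",          # file-level codec and level
    max_row_group_size=512 * 1024,  # smaller row groups than the 1M-row default
    statistics_enabled="page",
    column_specific_options={
        # Only "user_id" deviates from the file-level settings above.
        "user_id": ParquetColumnOptions(
            bloom_filter_enabled=True,
            bloom_filter_fpp=0.05,
        ),
    },
)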