
Commit 14cd71e

Smotrova and alamb authored
feat: add compression level configuration for JSON/CSV writers (#18954)
## Which issue does this PR close?

Closes #18947

## Rationale for this change

Currently, DataFusion uses default compression levels when writing compressed JSON and CSV files. For ZSTD, this means level 3, which prioritizes speed over compression ratio. Users working with large datasets who want to optimize for storage costs or network transfer have no way to increase the compression level. This is particularly important for cloud data lake scenarios, where storage and egress costs can be significant.

## What changes are included in this PR?

- Add a `compression_level: Option<u32>` field to `JsonOptions` and `CsvOptions` in `config.rs`
- Add a `convert_async_writer_with_level()` method to `FileCompressionType` (a non-breaking API extension)
- Keep the original `convert_async_writer()` as a convenience wrapper for backward compatibility
- Update `JsonWriterOptions` and `CsvWriterOptions` with a `compression_level` field
- Update `ObjectWriterBuilder` to support a compression level
- Update the JSON and CSV sinks to pass the compression level through the write pipeline
- Update proto definitions and conversions for serialization support
- Fix an unrelated unused-import warning in `udf.rs` (conditional compilation for debug-only imports)

## Are these changes tested?

The changes follow the existing patterns used throughout the codebase. The implementation was verified by:

- Building successfully with `cargo build`
- Running existing tests with `cargo test --package datafusion-proto`
- All 131 proto integration tests pass

## Are there any user-facing changes?

Yes, users can now specify a compression level when writing JSON/CSV files:

```rust
use datafusion::common::config::JsonOptions;
use datafusion::common::parsers::CompressionTypeVariant;

let json_opts = JsonOptions {
    compression: CompressionTypeVariant::ZSTD,
    compression_level: Some(9), // Higher compression
    ..Default::default()
};
```

**Supported compression levels:**

- ZSTD: 1-22 (default: 3)
- GZIP: 0-9 (default: 6)
- BZIP2: 1-9 (default: 9)
- XZ: 0-9 (default: 6)

**This is a non-breaking change**: the original `convert_async_writer()` method signature is preserved for backward compatibility.

Co-authored-by: Andrew Lamb <[email protected]>
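For CSV output, the same knob composes with the new `with_compression_level` builder. A minimal end-to-end sketch, assuming the three-argument `DataFrame::write_csv` signature from recent DataFusion releases and hypothetical `input.csv`/`output.csv.zst` paths:

```rust
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("input.csv", CsvReadOptions::new()).await?;

    // Request ZSTD at level 19 instead of the default level 3.
    let csv_opts = CsvOptions {
        compression: CompressionTypeVariant::ZSTD,
        ..Default::default()
    }
    .with_compression_level(19);

    df.write_csv("output.csv.zst", DataFrameWriteOptions::new(), Some(csv_opts))
        .await?;
    Ok(())
}
```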
1 parent 1e4bd75 commit 14cd71e

File tree

16 files changed: +195 −8 lines changed


datafusion/common/src/config.rs

Lines changed: 25 additions & 0 deletions
@@ -1725,6 +1725,7 @@ config_field!(bool, value => default_config_transform(value.to_lowercase().as_st
 config_field!(usize);
 config_field!(f64);
 config_field!(u64);
+config_field!(u32);

 impl ConfigField for u8 {
     fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
@@ -2844,6 +2845,14 @@ config_namespace! {
     /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
     pub newlines_in_values: Option<bool>, default = None
     pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
+    /// Compression level for the output file. The valid range depends on the
+    /// compression algorithm:
+    /// - ZSTD: 1 to 22 (default: 3)
+    /// - GZIP: 0 to 9 (default: 6)
+    /// - BZIP2: 0 to 9 (default: 6)
+    /// - XZ: 0 to 9 (default: 6)
+    /// If not specified, the default level for the compression algorithm is used.
+    pub compression_level: Option<u32>, default = None
     pub schema_infer_max_rec: Option<usize>, default = None
     pub date_format: Option<String>, default = None
     pub datetime_format: Option<String>, default = None
@@ -2966,6 +2975,14 @@ impl CsvOptions {
         self
     }

+    /// Set the compression level for the output file.
+    /// The valid range depends on the compression algorithm.
+    /// If not specified, the default level for the algorithm is used.
+    pub fn with_compression_level(mut self, level: u32) -> Self {
+        self.compression_level = Some(level);
+        self
+    }
+
     /// The delimiter character.
     pub fn delimiter(&self) -> u8 {
         self.delimiter
@@ -2991,6 +3008,14 @@ config_namespace! {
     /// Options controlling JSON format
     pub struct JsonOptions {
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
+        /// Compression level for the output file. The valid range depends on the
+        /// compression algorithm:
+        /// - ZSTD: 1 to 22 (default: 3)
+        /// - GZIP: 0 to 9 (default: 6)
+        /// - BZIP2: 0 to 9 (default: 6)
+        /// - XZ: 0 to 9 (default: 6)
+        /// If not specified, the default level for the compression algorithm is used.
+        pub compression_level: Option<u32>, default = None
         pub schema_infer_max_rec: Option<usize>, default = None
     }
 }
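A minimal sketch of the new builder in isolation, assuming `CsvOptions` derives `Default` via `config_namespace!` the same way `JsonOptions` does. Note that the level is only applied once a codec other than the default `UNCOMPRESSED` is selected:

```rust
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;

let opts = CsvOptions::default().with_compression_level(19);
assert_eq!(opts.compression_level, Some(19));
// The default codec is UNCOMPRESSED, so the level is carried but unused
// until `compression` is set to e.g. CompressionTypeVariant::ZSTD.
assert_eq!(opts.compression, CompressionTypeVariant::UNCOMPRESSED);
```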

datafusion/common/src/file_options/csv_writer.rs

Lines changed: 17 additions & 0 deletions
@@ -31,6 +31,8 @@ pub struct CsvWriterOptions {
     /// Compression to apply after ArrowWriter serializes RecordBatches.
     /// This compression is applied by DataFusion not the ArrowWriter itself.
     pub compression: CompressionTypeVariant,
+    /// Compression level for the output file.
+    pub compression_level: Option<u32>,
 }

 impl CsvWriterOptions {
@@ -41,6 +43,20 @@ impl CsvWriterOptions {
         Self {
             writer_options,
             compression,
+            compression_level: None,
+        }
+    }
+
+    /// Create a new `CsvWriterOptions` with the specified compression level.
+    pub fn new_with_level(
+        writer_options: WriterBuilder,
+        compression: CompressionTypeVariant,
+        compression_level: u32,
+    ) -> Self {
+        Self {
+            writer_options,
+            compression,
+            compression_level: Some(compression_level),
         }
     }
 }
@@ -81,6 +97,7 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
         Ok(CsvWriterOptions {
             writer_options: builder,
             compression: value.compression,
+            compression_level: value.compression_level,
         })
     }
 }
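A short construction sketch for the new `new_with_level` constructor; the `WriterBuilder` here is arrow's CSV writer builder, as implied by the `writer_options` field:

```rust
use arrow::csv::WriterBuilder;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;

// `new` leaves the level unset (codec default); `new_with_level` pins it.
let opts = CsvWriterOptions::new_with_level(
    WriterBuilder::new().with_header(true),
    CompressionTypeVariant::ZSTD,
    19,
);
assert_eq!(opts.compression_level, Some(19));
```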

datafusion/common/src/file_options/json_writer.rs

Lines changed: 17 additions & 1 deletion
@@ -27,11 +27,26 @@ use crate::{
 #[derive(Clone, Debug)]
 pub struct JsonWriterOptions {
     pub compression: CompressionTypeVariant,
+    pub compression_level: Option<u32>,
 }

 impl JsonWriterOptions {
     pub fn new(compression: CompressionTypeVariant) -> Self {
-        Self { compression }
+        Self {
+            compression,
+            compression_level: None,
+        }
+    }
+
+    /// Create a new `JsonWriterOptions` with the specified compression and level.
+    pub fn new_with_level(
+        compression: CompressionTypeVariant,
+        compression_level: u32,
+    ) -> Self {
+        Self {
+            compression,
+            compression_level: Some(compression_level),
+        }
     }
 }
@@ -41,6 +56,7 @@ impl TryFrom<&JsonOptions> for JsonWriterOptions {
     fn try_from(value: &JsonOptions) -> Result<Self> {
         Ok(JsonWriterOptions {
             compression: value.compression,
+            compression_level: value.compression_level,
         })
     }
 }
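The JSON options mirror the CSV ones; a minimal sketch of the two constructors:

```rust
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;

// Level left unset: the codec's own default is used at write time.
let defaulted = JsonWriterOptions::new(CompressionTypeVariant::GZIP);
assert_eq!(defaulted.compression_level, None);

// Explicit maximum GZIP level.
let pinned = JsonWriterOptions::new_with_level(CompressionTypeVariant::GZIP, 9);
assert_eq!(pinned.compression_level, Some(9));
```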

datafusion/datasource-csv/src/file_format.rs

Lines changed: 1 addition & 0 deletions
@@ -780,6 +780,7 @@ impl FileSink for CsvSink {
             context,
             serializer,
             self.writer_options.compression.into(),
+            self.writer_options.compression_level,
             object_store,
             demux_task,
             file_stream_rx,

datafusion/datasource-json/src/file_format.rs

Lines changed: 1 addition & 0 deletions
@@ -373,6 +373,7 @@ impl FileSink for JsonSink {
             context,
             serializer,
             self.writer_options.compression.into(),
+            self.writer_options.compression_level,
             object_store,
             demux_task,
             file_stream_rx,

datafusion/datasource/src/file_compression_type.rs

Lines changed: 43 additions & 5 deletions
@@ -155,22 +155,60 @@ impl FileCompressionType {
     }

     /// Wrap the given `BufWriter` so that it performs compressed writes
-    /// according to this `FileCompressionType`.
+    /// according to this `FileCompressionType` using the default compression level.
     pub fn convert_async_writer(
         &self,
         w: BufWriter,
     ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+        self.convert_async_writer_with_level(w, None)
+    }
+
+    /// Wrap the given `BufWriter` so that it performs compressed writes
+    /// according to this `FileCompressionType`.
+    ///
+    /// If `compression_level` is `Some`, the encoder will use the specified
+    /// compression level. If `None`, the default level for each algorithm is used.
+    pub fn convert_async_writer_with_level(
+        &self,
+        w: BufWriter,
+        compression_level: Option<u32>,
+    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+        #[cfg(feature = "compression")]
+        use async_compression::Level;
+
         Ok(match self.variant {
             #[cfg(feature = "compression")]
-            GZIP => Box::new(GzipEncoder::new(w)),
+            GZIP => match compression_level {
+                Some(level) => {
+                    Box::new(GzipEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(GzipEncoder::new(w)),
+            },
             #[cfg(feature = "compression")]
-            BZIP2 => Box::new(BzEncoder::new(w)),
+            BZIP2 => match compression_level {
+                Some(level) => {
+                    Box::new(BzEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(BzEncoder::new(w)),
+            },
             #[cfg(feature = "compression")]
-            XZ => Box::new(XzEncoder::new(w)),
+            XZ => match compression_level {
+                Some(level) => {
+                    Box::new(XzEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(XzEncoder::new(w)),
+            },
             #[cfg(feature = "compression")]
-            ZSTD => Box::new(ZstdEncoder::new(w)),
+            ZSTD => match compression_level {
+                Some(level) => {
+                    Box::new(ZstdEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(ZstdEncoder::new(w)),
+            },
             #[cfg(not(feature = "compression"))]
             GZIP | BZIP2 | XZ | ZSTD => {
+                // compression_level is not used when compression feature is disabled
+                let _ = compression_level;
                 return Err(DataFusionError::NotImplemented(
                     "Compression feature is not enabled".to_owned(),
                 ));
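A hedged usage sketch of the new low-level entry point, using an in-memory `object_store` as a stand-in for a real store. It requires the `compression` feature, and the module paths are assumed from the file layout above:

```rust
use std::sync::Arc;

use datafusion_datasource::file_compression_type::FileCompressionType;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let buf_writer = BufWriter::new(store, Path::from("out/data.json.zst"));

    // Some(19) selects ZSTD level 19; None keeps the codec default (3).
    let mut writer = FileCompressionType::ZSTD
        .convert_async_writer_with_level(buf_writer, Some(19))?;

    writer.write_all(b"{\"a\":1}\n").await?;
    writer.shutdown().await?; // flush and finalize the upload
    Ok(())
}
```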

datafusion/datasource/src/write/mod.rs

Lines changed: 22 additions & 1 deletion
@@ -131,6 +131,8 @@ pub struct ObjectWriterBuilder {
     object_store: Arc<dyn ObjectStore>,
     /// The size of the buffer for the object writer.
     buffer_size: Option<usize>,
+    /// The compression level for the object writer.
+    compression_level: Option<u32>,
 }

 impl ObjectWriterBuilder {
@@ -145,6 +147,7 @@ impl ObjectWriterBuilder {
             location: location.clone(),
             object_store,
             buffer_size: None,
+            compression_level: None,
         }
     }

@@ -202,6 +205,22 @@ impl ObjectWriterBuilder {
         self.buffer_size
     }

+    /// Set compression level for object writer.
+    pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
+        self.compression_level = compression_level;
+    }
+
+    /// Set compression level for object writer, returning the builder.
+    pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
+        self.compression_level = compression_level;
+        self
+    }
+
+    /// Currently specified compression level.
+    pub fn get_compression_level(&self) -> Option<u32> {
+        self.compression_level
+    }
+
     /// Return a writer object that writes to the object store location.
     ///
     /// If a buffer size has not been set, the default buffer buffer size will
@@ -215,13 +234,15 @@ impl ObjectWriterBuilder {
             location,
             object_store,
             buffer_size,
+            compression_level,
         } = self;

         let buf_writer = match buffer_size {
             Some(size) => BufWriter::with_capacity(object_store, location, size),
             None => BufWriter::new(object_store, location),
         };

-        file_compression_type.convert_async_writer(buf_writer)
+        file_compression_type
+            .convert_async_writer_with_level(buf_writer, compression_level)
     }
 }
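Putting the builder pieces together; a sketch where the `ObjectWriterBuilder::new` argument order is assumed from the struct fields shown in the diff:

```rust
use std::sync::Arc;

use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::write::ObjectWriterBuilder;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;

fn gzip_writer() -> datafusion_common::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("out/data.csv.gz");

    // GZIP level 9; passing None falls back to the codec default (6).
    let _writer = ObjectWriterBuilder::new(FileCompressionType::GZIP, &location, store)
        .with_compression_level(Some(9))
        .build()?;
    Ok(())
}
```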

datafusion/datasource/src/write/orchestration.rs

Lines changed: 2 additions & 0 deletions
@@ -248,6 +248,7 @@ pub async fn spawn_writer_tasks_and_join(
     context: &Arc<TaskContext>,
     serializer: Arc<dyn BatchSerializer>,
     compression: FileCompressionType,
+    compression_level: Option<u32>,
     object_store: Arc<dyn ObjectStore>,
     demux_task: SpawnedTask<Result<()>>,
     mut file_stream_rx: DemuxedStreamReceiver,
@@ -273,6 +274,7 @@ pub async fn spawn_writer_tasks_and_join(
                 .execution
                 .objectstore_writer_buffer_size,
         ))
+        .with_compression_level(compression_level)
         .build()?;

     if tx_file_bundle

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 2 additions & 0 deletions
@@ -461,12 +461,14 @@ message CsvOptions {
   bytes newlines_in_values = 16; // Indicates if newlines are supported in values
   bytes terminator = 17; // Optional terminator character as a byte
   bytes truncated_rows = 18; // Indicates if truncated rows are allowed
+  optional uint32 compression_level = 19; // Optional compression level
 }

 // Options controlling CSV format
 message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
+  optional uint32 compression_level = 3; // Optional compression level
 }

 message TableParquetOptions {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -908,6 +908,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
             double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
             newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
             compression: proto_opts.compression().into(),
+            compression_level: proto_opts.compression_level,
             schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
             date_format: (!proto_opts.date_format.is_empty())
                 .then(|| proto_opts.date_format.clone()),
@@ -1095,6 +1096,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
         let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
         Ok(JsonOptions {
             compression: compression.into(),
+            compression_level: proto_opts.compression_level,
             schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
         })
     }
