Skip to content

Commit 0c7cb2a

Browse files
authored
Parquet: Do not compress v2 data page when compress is bad quality (#8257)
# Which issue does this PR close? Closes #8256. # Rationale for this change: Do not compress the v2 data page when compression quality is poor (i.e. the compressed size is greater than or equal to the uncompressed size). # What changes are included in this PR? Discard the compressed output when it is larger than the uncompressed data. # Are these changes tested? Covered by existing tests. # Are there any user-facing changes? No.
1 parent e5ead92 commit 0c7cb2a

File tree

1 file changed

+44
-4
lines changed
  • parquet/src/column/writer

1 file changed

+44
-4
lines changed

parquet/src/column/writer/mod.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,12 +1104,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

         // Data Page v2 compresses values only.
-        match self.compressor {
+        let is_compressed = match self.compressor {
             Some(ref mut cmpr) => {
+                let buffer_len = buffer.len();
                 cmpr.compress(&values_data.buf, &mut buffer)?;
+                if uncompressed_size <= buffer.len() - buffer_len {
+                    buffer.truncate(buffer_len);
+                    buffer.extend_from_slice(&values_data.buf);
+                    false
+                } else {
+                    true
+                }
             }
-            None => buffer.extend_from_slice(&values_data.buf),
-        }
+            None => {
+                buffer.extend_from_slice(&values_data.buf);
+                false
+            }
+        };

         let data_page = Page::DataPageV2 {
             buf: buffer.into(),
@@ -1119,7 +1130,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             num_rows: self.page_metrics.num_buffered_rows,
             def_levels_byte_len: def_levels_byte_len as u32,
             rep_levels_byte_len: rep_levels_byte_len as u32,
-            is_compressed: self.compressor.is_some(),
+            is_compressed,
             statistics: page_statistics,
         };
@@ -4236,4 +4247,33 @@ mod tests {
         .unwrap();
         ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
     }
+
+    #[test]
+    fn test_page_v2_snappy_compression_fallback() {
+        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
+        let page_writer = TestPageWriter {};
+
+        // Create WriterProperties with PageV2 and Snappy compression
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            // Disable dictionary to ensure data is written directly
+            .set_dictionary_enabled(false)
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut column_writer =
+            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
+
+        // Create small, simple data that Snappy compression will likely increase in size
+        // due to compression overhead for very small data
+        let values = vec![ByteArray::from("a")];
+
+        column_writer.write_batch(&values, None, None).unwrap();
+
+        let result = column_writer.close().unwrap();
+        assert_eq!(
+            result.metadata.uncompressed_size(),
+            result.metadata.compressed_size()
+        );
+    }
 }

0 commit comments

Comments
 (0)