Skip to content

Commit 4855ff2

Browse files
authored
feat: Add uncompressed size stat for Vortex/Datafusion (#1512)
As a followup to #1455 as @robert3005 noted.
1 parent 6619052 commit 4855ff2

File tree

3 files changed

+74
-15
lines changed

3 files changed

+74
-15
lines changed

vortex-datafusion/src/persistent/format.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@ use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
1717
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
1818
use datafusion_physical_plan::ExecutionPlan;
1919
use object_store::{ObjectMeta, ObjectStore};
20+
use vortex_array::array::StructArray;
2021
use vortex_array::arrow::infer_schema;
2122
use vortex_array::Context;
2223
use vortex_file::metadata::MetadataFetcher;
2324
use vortex_file::{
24-
read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache,
25-
Scan, VORTEX_FILE_EXTENSION,
25+
read_initial_bytes, read_layout_from_initial, LayoutContext, LayoutDeserializer,
26+
LayoutMessageCache, RelativeLayoutCache, Scan, VORTEX_FILE_EXTENSION,
2627
};
2728
use vortex_io::{IoDispatcher, ObjectStoreReadAt};
2829

2930
use super::execution::VortexExec;
30-
use super::statistics::array_to_col_statistics;
31+
use super::statistics::{array_to_col_statistics, uncompressed_col_size};
3132
use crate::can_be_pushed_down;
3233

3334
#[derive(Debug, Default)]
@@ -108,7 +109,7 @@ impl FileFormat for VortexFormat {
108109
let relative_message_cache =
109110
RelativeLayoutCache::new(layout_message_cache.clone(), dtype.into());
110111

111-
let root_layout = vortex_file::read_layout_from_initial(
112+
let root_layout = read_layout_from_initial(
112113
&initial_read,
113114
&layout_deserializer,
114115
Scan::empty(),
@@ -125,17 +126,26 @@ impl FileFormat for VortexFormat {
125126

126127
if let Some(metadata) = metadata_table {
127128
let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
129+
let mut total_size = 0_u64;
128130

129131
for col_stats in metadata.into_iter() {
130132
let col_stats = match col_stats {
131-
Some(array) => array_to_col_statistics(array.try_into()?)?,
133+
Some(array) => {
134+
let col_metadata_array = StructArray::try_from(array)?;
135+
let col_stats = array_to_col_statistics(&col_metadata_array)?;
136+
137+
total_size +=
138+
uncompressed_col_size(&col_metadata_array)?.unwrap_or_default();
139+
col_stats
140+
}
132141
None => ColumnStatistics::new_unknown(),
133142
};
134143

135144
column_statistics.push(col_stats);
136145
}
137146

138147
stats.column_statistics = column_statistics;
148+
stats.total_byte_size = Precision::Inexact(total_size as usize);
139149
}
140150

141151
Ok(stats)

vortex-datafusion/src/persistent/statistics.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use vortex_array::variants::StructArrayTrait as _;
99
use vortex_array::IntoCanonical;
1010
use vortex_error::VortexResult;
1111

12-
pub fn array_to_col_statistics(array: StructArray) -> VortexResult<ColumnStatistics> {
12+
pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatistics> {
1313
let mut stats = ColumnStatistics::new_unknown();
1414

1515
if let Some(null_count_array) = array.field_by_name("null_count") {
@@ -40,3 +40,17 @@ pub fn array_to_col_statistics(array: StructArray) -> VortexResult<ColumnStatist
4040

4141
Ok(stats)
4242
}
43+
44+
pub fn uncompressed_col_size(array: &StructArray) -> VortexResult<Option<u64>> {
45+
match array.field_by_name("uncompressed_size") {
46+
None => Ok(None),
47+
Some(array) => {
48+
let array = array.into_canonical()?.into_arrow()?;
49+
let array = array.as_primitive::<UInt64Type>();
50+
51+
let uncompressed_size = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
52+
53+
Ok(Some(uncompressed_size))
54+
}
55+
}
56+
}

vortex-file/src/write/metadata_accumulators.rs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ struct BoolAccumulator {
4141
minima: UnwrappedStatAccumulator<bool>,
4242
true_count: UnwrappedStatAccumulator<u64>,
4343
null_count: UnwrappedStatAccumulator<u64>,
44+
uncompressed_size: UnwrappedStatAccumulator<u64>,
4445
}
4546

4647
impl BoolAccumulator {
@@ -50,6 +51,10 @@ impl BoolAccumulator {
5051
minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into()),
5152
true_count: UnwrappedStatAccumulator::new(Stat::TrueCount, "true_count".into()),
5253
null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into()),
54+
uncompressed_size: UnwrappedStatAccumulator::new(
55+
Stat::UncompressedSizeInBytes,
56+
"uncompressed_size".into(),
57+
),
5358
}
5459
}
5560
}
@@ -60,6 +65,7 @@ impl MetadataAccumulator for BoolAccumulator {
6065
self.minima.push_chunk(array);
6166
self.true_count.push_chunk(array);
6267
self.null_count.push_chunk(array);
68+
self.uncompressed_size.push_chunk(array);
6369
}
6470

6571
fn into_array(self: Box<Self>) -> VortexResult<Option<ArrayData>> {
@@ -68,6 +74,7 @@ impl MetadataAccumulator for BoolAccumulator {
6874
self.minima.into_column(),
6975
self.true_count.into_column(),
7076
self.null_count.into_column(),
77+
self.uncompressed_size.into_column(),
7178
]
7279
.into_iter()
7380
.flatten()
@@ -90,6 +97,7 @@ struct StandardAccumulator<T> {
9097
maxima: UnwrappedStatAccumulator<T>,
9198
minima: UnwrappedStatAccumulator<T>,
9299
null_count: UnwrappedStatAccumulator<u64>,
100+
uncompressed_size: UnwrappedStatAccumulator<u64>,
93101
}
94102

95103
impl<T> StandardAccumulator<T> {
@@ -98,6 +106,10 @@ impl<T> StandardAccumulator<T> {
98106
maxima: UnwrappedStatAccumulator::new(Stat::Max, "max".into()),
99107
minima: UnwrappedStatAccumulator::new(Stat::Min, "min".into()),
100108
null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into()),
109+
uncompressed_size: UnwrappedStatAccumulator::new(
110+
Stat::UncompressedSizeInBytes,
111+
"uncompressed_size".into(),
112+
),
101113
}
102114
}
103115
}
@@ -111,13 +123,15 @@ where
111123
self.maxima.push_chunk(array);
112124
self.minima.push_chunk(array);
113125
self.null_count.push_chunk(array);
126+
self.uncompressed_size.push_chunk(array);
114127
}
115128

116129
fn into_array(self: Box<Self>) -> VortexResult<Option<ArrayData>> {
117130
let (names, fields): (Vec<FieldName>, Vec<ArrayData>) = [
118131
self.maxima.into_column(),
119132
self.minima.into_column(),
120133
self.null_count.into_column(),
134+
self.uncompressed_size.into_column(),
121135
]
122136
.into_iter()
123137
.flatten()
@@ -134,29 +148,38 @@ where
134148
}
135149
}
136150

137-
/// A minimal accumulator which only tracks null counts.
151+
/// A minimal accumulator which only tracks null counts and total uncompressed size.
138152
struct BasicAccumulator {
139153
null_count: UnwrappedStatAccumulator<u64>,
154+
uncompressed_size: UnwrappedStatAccumulator<u64>,
140155
}
141156

142157
impl BasicAccumulator {
143158
fn new() -> Self {
144159
Self {
145160
null_count: UnwrappedStatAccumulator::new(Stat::NullCount, "null_count".into()),
161+
uncompressed_size: UnwrappedStatAccumulator::new(
162+
Stat::UncompressedSizeInBytes,
163+
"uncompressed_size".into(),
164+
),
146165
}
147166
}
148167
}
149168

150169
impl MetadataAccumulator for BasicAccumulator {
151170
fn push_chunk(&mut self, array: &ArrayData) {
152-
self.null_count.push_chunk(array)
171+
self.null_count.push_chunk(array);
172+
self.uncompressed_size.push_chunk(array);
153173
}
154174

155175
fn into_array(self: Box<Self>) -> VortexResult<Option<ArrayData>> {
156-
let (names, fields): (Vec<FieldName>, Vec<ArrayData>) = [self.null_count.into_column()]
157-
.into_iter()
158-
.flatten()
159-
.unzip();
176+
let (names, fields): (Vec<FieldName>, Vec<ArrayData>) = [
177+
self.null_count.into_column(),
178+
self.uncompressed_size.into_column(),
179+
]
180+
.into_iter()
181+
.flatten()
182+
.unzip();
160183
if fields.is_empty() {
161184
Ok(None)
162185
} else {
@@ -246,7 +269,16 @@ mod tests {
246269
StructArray::try_from(Box::new(bool_accumulator).into_array().unwrap().unwrap())
247270
.unwrap();
248271
assert_eq!(struct_array.len(), 1);
249-
assert_field_names(&struct_array, &["max", "min", "true_count", "null_count"]);
272+
assert_field_names(
273+
&struct_array,
274+
&[
275+
"max",
276+
"min",
277+
"true_count",
278+
"null_count",
279+
"uncompressed_size",
280+
],
281+
);
250282
}
251283

252284
#[test]
@@ -263,7 +295,10 @@ mod tests {
263295
)
264296
.unwrap();
265297
assert_eq!(struct_array.len(), 1);
266-
assert_field_names(&struct_array, &["max", "min", "null_count"]);
298+
assert_field_names(
299+
&struct_array,
300+
&["max", "min", "null_count", "uncompressed_size"],
301+
);
267302
}
268303

269304
#[test]
@@ -328,6 +363,6 @@ mod tests {
328363
StructArray::try_from(Box::new(basic_accumulator).into_array().unwrap().unwrap())
329364
.unwrap();
330365
assert_eq!(struct_array.len(), 1);
331-
assert_field_names(&struct_array, &["null_count"]);
366+
assert_field_names(&struct_array, &["null_count", "uncompressed_size"]);
332367
}
333368
}

0 commit comments

Comments
 (0)