Skip to content

Commit e11d4ff

Browse files
authored
feat: don't write leading/trailing zero histograms into file (#1372)
fixes #1362
1 parent f7d0b79 commit e11d4ff

File tree

6 files changed

+74
-8
lines changed

6 files changed

+74
-8
lines changed

vortex-array/src/data/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl ArrayData {
5151
metadata,
5252
buffer,
5353
children,
54-
stats_map: Arc::new(RwLock::new(statistics)),
54+
stats_set: Arc::new(RwLock::new(statistics)),
5555
};
5656

5757
let array = ArrayData(InnerArrayData::Owned(data));

vortex-array/src/data/owned.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub(crate) struct OwnedArrayData {
1919
pub(crate) metadata: Arc<dyn ArrayMetadata>,
2020
pub(crate) buffer: Option<Buffer>,
2121
pub(crate) children: Arc<[ArrayData]>,
22-
pub(crate) stats_map: Arc<RwLock<StatsSet>>,
22+
pub(crate) stats_set: Arc<RwLock<StatsSet>>,
2323
}
2424

2525
impl OwnedArrayData {
@@ -92,7 +92,7 @@ impl OwnedArrayData {
9292

9393
impl Statistics for OwnedArrayData {
9494
fn get(&self, stat: Stat) -> Option<Scalar> {
95-
self.stats_map
95+
self.stats_set
9696
.read()
9797
.unwrap_or_else(|_| {
9898
vortex_panic!(
@@ -105,14 +105,14 @@ impl Statistics for OwnedArrayData {
105105
}
106106

107107
fn to_set(&self) -> StatsSet {
108-
self.stats_map
108+
self.stats_set
109109
.read()
110110
.unwrap_or_else(|_| vortex_panic!("Failed to acquire read lock on stats map"))
111111
.clone()
112112
}
113113

114114
fn set(&self, stat: Stat, value: Scalar) {
115-
self.stats_map
115+
self.stats_set
116116
.write()
117117
.unwrap_or_else(|_| {
118118
vortex_panic!(
@@ -124,6 +124,13 @@ impl Statistics for OwnedArrayData {
124124
.set(stat, value);
125125
}
126126

127+
/// Clears the cached value of `stat` on this owned array.
///
/// Takes the write lock on `stats_set` and delegates to the set's own `clear`.
///
/// # Panics
///
/// Panics (via `vortex_panic!`) if the write lock cannot be acquired.
fn clear(&self, stat: Stat) {
128+
self.stats_set
129+
.write()
130+
.unwrap_or_else(|_| vortex_panic!("Failed to acquire write lock on stats map"))
131+
.clear(stat);
132+
}
133+
127134
fn compute(&self, stat: Stat) -> Option<Scalar> {
128135
if let Some(s) = self.get(stat) {
129136
return Some(s);
@@ -133,14 +140,21 @@ impl Statistics for OwnedArrayData {
133140
.with_dyn(|a| a.compute_statistics(stat))
134141
.ok()?;
135142

136-
self.stats_map
143+
self.stats_set
137144
.write()
138145
.unwrap_or_else(|_| {
139146
vortex_panic!("Failed to write to stats map while computing {}", stat)
140147
})
141148
.extend(computed);
142149
self.get(stat)
143150
}
151+
152+
/// Restricts the cached statistics of this owned array to those listed in `stats`.
///
/// Takes the write lock on `stats_set` and delegates to the set's own `retain_only`.
///
/// # Panics
///
/// Panics (via `vortex_panic!`) if the write lock cannot be acquired.
fn retain_only(&self, stats: &[Stat]) {
153+
self.stats_set
154+
.write()
155+
.unwrap_or_else(|_| vortex_panic!("Failed to acquire write lock on stats map"))
156+
.retain_only(stats);
157+
}
144158
}
145159

146160
impl From<OwnedArrayData> for ArrayData {

vortex-array/src/data/viewed.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,15 @@ impl Statistics for ViewedArrayData {
213213
/// We want to avoid any sort of allocation on instantiation of the ArrayView, so we
214214
/// do not allocate a stats_set to cache values.
215215
fn set(&self, _stat: Stat, _value: Scalar) {
216-
// We cannot set stats on a view
216+
// We cannot modify stats on a view
217+
}
218+
219+
/// No-op: the argument is ignored and nothing is modified.
fn clear(&self, _stat: Stat) {
220+
// We cannot modify stats on a view
221+
}
222+
223+
/// No-op: the argument is ignored and nothing is modified, so a caller asking a view
/// to drop statistics will find them all still present afterwards.
fn retain_only(&self, _stats: &[Stat]) {
224+
// We cannot modify stats on a view
218225
}
218226

219227
fn compute(&self, stat: Stat) -> Option<Scalar> {

vortex-array/src/stats/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod flatbuffers;
1818
mod statsset;
1919

2020
/// Statistics that are used for pruning files (i.e., we want to ensure they are computed when compressing/writing).
21-
pub(crate) const PRUNING_STATS: &[Stat] = &[Stat::Min, Stat::Max, Stat::TrueCount, Stat::NullCount];
21+
pub const PRUNING_STATS: &[Stat] = &[Stat::Min, Stat::Max, Stat::TrueCount, Stat::NullCount];
2222

2323
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, Enum)]
2424
#[non_exhaustive]
@@ -44,6 +44,7 @@ pub enum Stat {
4444
TrueCount,
4545
/// The number of null values in the array
4646
NullCount,
47+
/// The uncompressed size of the array in bytes
4748
UncompressedSizeInBytes,
4849
}
4950

@@ -95,8 +96,12 @@ pub trait Statistics {
9596
/// Get all existing statistics
9697
fn to_set(&self) -> StatsSet;
9798

99+
/// Set the value of the statistic
98100
fn set(&self, stat: Stat, value: Scalar);
99101

102+
/// Clear the value of the statistic
103+
fn clear(&self, stat: Stat);
104+
100105
/// Computes the value of the stat if it's not present
101106
fn compute(&self, stat: Stat) -> Option<Scalar>;
102107

@@ -111,6 +116,8 @@ pub trait Statistics {
111116
}
112117
Ok(stats_set)
113118
}
119+
120+
/// Keep only the statistics listed in `stats`, clearing every other one.
///
/// Implementations that cannot modify their statistics (the view-backed one does
/// nothing) may treat this as a no-op, so callers must not rely on the stats
/// actually being removed.
fn retain_only(&self, stats: &[Stat]);
114121
}
115122

116123
pub trait ArrayStatistics {

vortex-array/src/stats/statsset.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use core::mem;
2+
13
use enum_iterator::all;
24
use enum_map::EnumMap;
35
use itertools::{EitherOrBoth, Itertools};
@@ -130,6 +132,13 @@ impl StatsSet {
130132
self.values[stat] = None;
131133
}
132134

135+
/// Keeps only the statistics listed in `stats` and drops every other one.
///
/// The current values are moved out with `mem::take`, which leaves `self.values` at
/// its `Default` (presumably all `None` -- confirm against the `EnumMap` value type),
/// and then the entry for each requested stat is moved back in.
// NOTE(review): if `stats` names the same `Stat` twice, the second iteration takes
// `None` from `old_map` (the value was already moved out) and overwrites the entry
// restored by the first iteration, so that stat is lost. Callers should pass a
// duplicate-free slice, or the assignment should be guarded on `Some`.
pub fn retain_only(&mut self, stats: &[Stat]) {
136+
let mut old_map = mem::take(&mut self.values);
137+
for stat in stats {
138+
self.values[*stat] = old_map[*stat].take();
139+
}
140+
}
141+
133142
/// Merge stats set `other` into `self`, with the semantic assumption that `other`
134143
/// contains stats from an array that is *appended* to the array represented by `self`.
135144
pub fn merge_ordered(&mut self, other: &Self) -> &Self {

vortex-file/src/write/writer.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{io, mem};
33
use flatbuffers::FlatBufferBuilder;
44
use futures::TryStreamExt;
55
use vortex_array::array::{ChunkedArray, StructArray};
6+
use vortex_array::stats::{ArrayStatistics, Stat};
67
use vortex_array::stream::ArrayStream;
78
use vortex_array::{ArrayDType as _, ArrayData};
89
use vortex_buffer::io_buf::IoBuf;
@@ -19,6 +20,18 @@ use crate::write::metadata_accumulators::{new_metadata_accumulator, MetadataAccu
1920
use crate::write::postscript::Postscript;
2021
use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
2122

23+
/// Statistics that are kept on a chunk when it is serialized into the file.
///
/// The column writer calls `retain_only(STATS_TO_WRITE)` on each chunk's statistics
/// just before writing the batch, so any stat not listed here is cleared first.
/// The `stats_to_write` test asserts that every entry of `PRUNING_STATS` is in this list.
const STATS_TO_WRITE: &[Stat] = &[
24+
Stat::Min,
25+
Stat::Max,
26+
Stat::TrueCount,
27+
Stat::NullCount,
28+
Stat::RunCount,
29+
Stat::IsConstant,
30+
Stat::IsSorted,
31+
Stat::IsStrictSorted,
32+
Stat::UncompressedSizeInBytes,
33+
];
34+
2235
pub struct VortexFileWriter<W> {
2336
msgs: MessageWriter<W>,
2437

@@ -217,7 +230,13 @@ impl ColumnWriter {
217230

218231
while let Some(chunk) = stream.try_next().await? {
219232
rows_written += chunk.len() as u64;
233+
234+
// accumulate the stats for the stats table
220235
self.metadata.push_chunk(&chunk);
236+
237+
// clear the stats that we don't want to serialize into the file
238+
chunk.statistics().retain_only(STATS_TO_WRITE);
239+
221240
msgs.write_batch(chunk).await?;
222241
offsets.push(msgs.tell());
223242
row_offsets.push(rows_written);
@@ -292,11 +311,13 @@ mod tests {
292311
use flatbuffers::FlatBufferBuilder;
293312
use futures_executor::block_on;
294313
use vortex_array::array::{PrimitiveArray, StructArray, VarBinArray};
314+
use vortex_array::stats::PRUNING_STATS;
295315
use vortex_array::validity::Validity;
296316
use vortex_array::IntoArrayData;
297317
use vortex_flatbuffers::WriteFlatBuffer;
298318

299319
use crate::write::postscript::Postscript;
320+
use crate::write::writer::STATS_TO_WRITE;
300321
use crate::{VortexFileWriter, V1_FOOTER_FBS_SIZE};
301322

302323
#[test]
@@ -328,4 +349,11 @@ mod tests {
328349

329350
assert_eq!(buffer[buffer_begin..buffer_end].len(), V1_FOOTER_FBS_SIZE);
330351
}
352+
353+
// Guards the invariant that every stat in `PRUNING_STATS` is also listed in
// `STATS_TO_WRITE`, i.e. that filtering with `STATS_TO_WRITE` never drops a pruning stat.
#[test]
354+
fn stats_to_write() {
355+
for stat in PRUNING_STATS {
356+
assert!(STATS_TO_WRITE.contains(stat));
357+
}
358+
}
331359
}

0 commit comments

Comments
 (0)