Skip to content

Commit d1efb54

Browse files
authored
Truncate stats on write (#3272)
When truncating min we take the prefix and for max we increment the value until it doesn't overflow. Needs some tests Signed-off-by: Robert Kruszewski <[email protected]>
1 parent c63de4e commit d1efb54

File tree

32 files changed

+1154
-169
lines changed

32 files changed

+1154
-169
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

duckdb-vortex/src/include/vortex_common.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ struct ArrayStreamSink {
139139

140140
~ArrayStreamSink() {
141141
// "should dctor a sink, before closing it
142-
D_ASSERT(sink == nullptr);
142+
// If you throw during writes then the stack will be unwound and the destructor is going to be called before the
143+
// close method is invoked thus triggering following assertion failure and will clobber the exception printing
144+
// D_ASSERT(sink == nullptr);
143145
}
144146

145147
vx_array_sink *sink;

vortex-array/src/builders/decimal.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl<T: NativeDecimalType> DecimalBuilder<T> {
107107
}
108108
}
109109

110-
impl<T: NativeDecimalType + Default + Send + 'static> ArrayBuilder for DecimalBuilder<T> {
110+
impl<T: NativeDecimalType> ArrayBuilder for DecimalBuilder<T> {
111111
fn as_any(&self) -> &dyn Any {
112112
self
113113
}

vortex-array/src/builders/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use extension::*;
4343
pub use list::*;
4444
pub use null::*;
4545
pub use primitive::*;
46+
pub use struct_::*;
4647
pub use varbinview::*;
4748
use vortex_dtype::{DType, match_each_native_ptype};
4849
use vortex_error::{VortexResult, vortex_bail, vortex_err};
@@ -53,7 +54,6 @@ use vortex_scalar::{
5354
};
5455

5556
use crate::arrays::precision_to_storage_size;
56-
use crate::builders::struct_::StructBuilder;
5757
use crate::{Array, ArrayRef};
5858

5959
pub trait ArrayBuilder: Send {

vortex-array/src/serde.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::iter;
33
use std::sync::Arc;
44

55
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6-
use itertools::Itertools;
76
use vortex_buffer::{Alignment, ByteBuffer};
87
use vortex_dtype::{DType, TryFromBytes};
98
use vortex_error::{
@@ -44,12 +43,10 @@ impl dyn Array + '_ {
4443
options: &SerializeOptions,
4544
) -> VortexResult<Vec<ByteBuffer>> {
4645
// Collect all array buffers
47-
let mut array_buffers = vec![];
48-
for a in self.depth_first_traversal() {
49-
for buffer in a.buffers() {
50-
array_buffers.push(buffer);
51-
}
52-
}
46+
let array_buffers = self
47+
.depth_first_traversal()
48+
.flat_map(|f| f.buffers())
49+
.collect::<Vec<_>>();
5350

5451
// Allocate result buffers, including a possible padding buffer for each.
5552
let mut buffers = vec![];
@@ -183,28 +180,28 @@ impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
183180
// Assign buffer indices for all child arrays.
184181
let nbuffers = u16::try_from(self.array.nbuffers())
185182
.vortex_expect("Array can have at most u16::MAX buffers");
186-
let child_buffer_idx = self.buffer_idx + nbuffers;
183+
let mut child_buffer_idx = self.buffer_idx + nbuffers;
187184

188-
let children = self
185+
let children = &self
189186
.array
190187
.children()
191188
.iter()
192-
.scan(child_buffer_idx, |buffer_idx, child| {
189+
.map(|child| {
193190
// Update the number of buffers required.
194191
let msg = ArrayNodeFlatBuffer {
195192
ctx: self.ctx,
196193
array: child,
197-
buffer_idx: *buffer_idx,
194+
buffer_idx: child_buffer_idx,
198195
}
199196
.write_flatbuffer(fbb);
200-
*buffer_idx = u16::try_from(child.nbuffers_recursive())
197+
child_buffer_idx = u16::try_from(child.nbuffers_recursive())
201198
.ok()
202-
.and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
199+
.and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
203200
.vortex_expect("Too many buffers (u16) for Array");
204-
Some(msg)
201+
msg
205202
})
206-
.collect_vec();
207-
let children = Some(fbb.create_vector(&children));
203+
.collect::<Vec<_>>();
204+
let children = Some(fbb.create_vector(children));
208205

209206
let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
210207
let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));

vortex-array/src/stats/array.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,9 @@ impl StatsSetRef<'_> {
156156

157157
pub fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
158158
let mut stats_set = StatsSet::default();
159-
for stat in stats {
160-
if let Some(s) = self.compute_stat(*stat)? {
161-
stats_set.set(*stat, Precision::exact(s))
159+
for &stat in stats {
160+
if let Some(s) = self.compute_stat(stat)? {
161+
stats_set.set(stat, Precision::exact(s))
162162
}
163163
}
164164
Ok(stats_set)

vortex-array/src/stats/flatbuffers.rs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,57 @@
1-
use enum_iterator::all;
21
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
3-
use vortex_error::VortexError;
4-
use vortex_flatbuffers::{ReadFlatBuffer, WriteFlatBuffer};
2+
use vortex_error::{VortexError, vortex_bail};
3+
use vortex_flatbuffers::{ReadFlatBuffer, WriteFlatBuffer, array as fba};
54
use vortex_scalar::ScalarValue;
65

76
use super::traits::{StatsProvider, StatsProviderExt};
87
use crate::stats::{Precision, Stat, StatsSet};
98

109
impl WriteFlatBuffer for StatsSet {
11-
type Target<'t> = crate::flatbuffers::ArrayStats<'t>;
10+
type Target<'t> = fba::ArrayStats<'t>;
1211

1312
/// All statistics written must be exact
1413
fn write_flatbuffer<'fb>(
1514
&self,
1615
fbb: &mut FlatBufferBuilder<'fb>,
1716
) -> WIPOffset<Self::Target<'fb>> {
18-
let min = self
17+
let (min_precision, min) = self
1918
.get(Stat::Min)
20-
.and_then(Precision::as_exact)
21-
.map(|min| fbb.create_vector(&min.to_protobytes::<Vec<u8>>()));
19+
.map(|sum| {
20+
(
21+
if sum.is_exact() {
22+
fba::Precision::Exact
23+
} else {
24+
fba::Precision::Inexact
25+
},
26+
Some(fbb.create_vector(&sum.into_inner().to_protobytes::<Vec<u8>>())),
27+
)
28+
})
29+
.unwrap_or_else(|| (fba::Precision::Inexact, None));
2230

23-
let max = self
31+
let (max_precision, max) = self
2432
.get(Stat::Max)
25-
.and_then(Precision::as_exact)
26-
.map(|max| fbb.create_vector(&max.to_protobytes::<Vec<u8>>()));
33+
.map(|sum| {
34+
(
35+
if sum.is_exact() {
36+
fba::Precision::Exact
37+
} else {
38+
fba::Precision::Inexact
39+
},
40+
Some(fbb.create_vector(&sum.into_inner().to_protobytes::<Vec<u8>>())),
41+
)
42+
})
43+
.unwrap_or_else(|| (fba::Precision::Inexact, None));
2744

2845
let sum = self
2946
.get(Stat::Sum)
3047
.and_then(Precision::as_exact)
3148
.map(|sum| fbb.create_vector(&sum.to_protobytes::<Vec<u8>>()));
3249

33-
let stat_args = &crate::flatbuffers::ArrayStatsArgs {
50+
let stat_args = &fba::ArrayStatsArgs {
3451
min,
52+
min_precision,
3553
max,
54+
max_precision,
3655
sum,
3756
is_sorted: self
3857
.get_as::<bool>(Stat::IsSorted)
@@ -54,20 +73,20 @@ impl WriteFlatBuffer for StatsSet {
5473
.and_then(Precision::as_exact),
5574
};
5675

57-
crate::flatbuffers::ArrayStats::create(fbb, stat_args)
76+
fba::ArrayStats::create(fbb, stat_args)
5877
}
5978
}
6079

6180
impl ReadFlatBuffer for StatsSet {
62-
type Source<'a> = crate::flatbuffers::ArrayStats<'a>;
81+
type Source<'a> = fba::ArrayStats<'a>;
6382
type Error = VortexError;
6483

6584
fn read_flatbuffer<'buf>(
6685
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
6786
) -> Result<Self, Self::Error> {
6887
let mut stats_set = StatsSet::default();
6988

70-
for stat in all::<Stat>() {
89+
for stat in Stat::all() {
7190
match stat {
7291
Stat::IsConstant => {
7392
if let Some(is_constant) = fb.is_constant() {
@@ -89,17 +108,27 @@ impl ReadFlatBuffer for StatsSet {
89108
}
90109
Stat::Max => {
91110
if let Some(max) = fb.max() {
111+
let value = ScalarValue::from_protobytes(max.bytes())?;
92112
stats_set.set(
93113
Stat::Max,
94-
Precision::Exact(ScalarValue::from_protobytes(max.bytes())?),
114+
match fb.max_precision() {
115+
fba::Precision::Exact => Precision::Exact(value),
116+
fba::Precision::Inexact => Precision::Inexact(value),
117+
_ => vortex_bail!("Corrupted max_precision field"),
118+
},
95119
);
96120
}
97121
}
98122
Stat::Min => {
99123
if let Some(min) = fb.min() {
124+
let value = ScalarValue::from_protobytes(min.bytes())?;
100125
stats_set.set(
101126
Stat::Min,
102-
Precision::Exact(ScalarValue::from_protobytes(min.bytes())?),
127+
match fb.min_precision() {
128+
fba::Precision::Exact => Precision::Exact(value),
129+
fba::Precision::Inexact => Precision::Inexact(value),
130+
_ => vortex_bail!("Corrupted min_precision field"),
131+
},
103132
);
104133
}
105134
}

vortex-array/src/stats/mod.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::hash::Hash;
55

66
use arrow_buffer::bit_iterator::BitIterator;
77
use arrow_buffer::{BooleanBufferBuilder, MutableBuffer};
8-
use enum_iterator::{Sequence, last};
8+
use enum_iterator::{Sequence, all, last};
99
use log::debug;
1010
use num_enum::{IntoPrimitive, TryFromPrimitive};
1111
pub use stats_set::*;
@@ -37,19 +37,6 @@ pub const PRUNING_STATS: &[Stat] = &[
3737
Stat::NaNCount,
3838
];
3939

40-
/// Stats to keep when serializing arrays to layouts
41-
pub const STATS_TO_WRITE: &[Stat] = &[
42-
Stat::Min,
43-
Stat::Max,
44-
Stat::NullCount,
45-
Stat::NaNCount,
46-
Stat::Sum,
47-
Stat::IsConstant,
48-
Stat::IsSorted,
49-
Stat::IsStrictSorted,
50-
Stat::UncompressedSizeInBytes,
51-
];
52-
5340
#[derive(
5441
Debug,
5542
Clone,
@@ -236,6 +223,10 @@ impl Stat {
236223
Self::NaNCount => "nan_count",
237224
}
238225
}
226+
227+
pub fn all() -> impl Iterator<Item = Stat> {
228+
all::<Self>()
229+
}
239230
}
240231

241232
pub fn as_stat_bitset_bytes(stats: &[Stat]) -> Vec<u8> {

vortex-array/src/stats/precision.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ where
2525
{
2626
fn clone(&self) -> Self {
2727
match self {
28-
Self::Exact(e) => Self::Exact(e.clone()),
29-
Self::Inexact(ie) => Self::Inexact(ie.clone()),
28+
Exact(e) => Exact(e.clone()),
29+
Inexact(ie) => Inexact(ie.clone()),
3030
}
3131
}
3232
}
@@ -129,7 +129,8 @@ impl<T> Precision<T> {
129129
Ok(precision)
130130
}
131131

132-
pub(crate) fn into_inner(self) -> T {
132+
/// Unwrap the underlying value
133+
pub fn into_inner(self) -> T {
133134
match self {
134135
Exact(val) | Inexact(val) => val,
135136
}

vortex-buffer/src/string.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ impl BufferString {
2727
pub fn into_inner(self) -> ByteBuffer {
2828
self.0
2929
}
30+
31+
/// Returns reference to the inner [`ByteBuffer`].
32+
pub fn inner(&self) -> &ByteBuffer {
33+
&self.0
34+
}
3035
}
3136

3237
impl Debug for BufferString {

0 commit comments

Comments
 (0)