Skip to content

Commit 630835b

Browse files
AdamGSlwwmanningdanking
authored
feat: Layout metadata reader and column statistics (#1455)
Adds a dedicated file metadata reader and uses it to provide DataFusion with file-level statistics. --------- Co-authored-by: Will Manning <[email protected]> Co-authored-by: Daniel King <[email protected]>
1 parent 866d892 commit 630835b

File tree

16 files changed

+486
-40
lines changed

16 files changed

+486
-40
lines changed

bench-vortex/src/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
3333
use vortex::{ArrayData, IntoArrayData, IntoCanonical};
3434

3535
static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
36-
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
36+
LazyLock::new(|| Arc::new(IoDispatcher::default()));
3737

3838
pub const BATCH_SIZE: usize = 65_536;
3939

vortex-buffer/src/string.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ impl From<String> for BufferString {
4444
}
4545
}
4646

47+
impl From<&str> for BufferString {
48+
fn from(value: &str) -> Self {
49+
Self(Buffer::from(String::from(value).into_bytes()))
50+
}
51+
}
52+
4753
impl TryFrom<Buffer> for BufferString {
4854
type Error = Utf8Error;
4955

vortex-datafusion/src/persistent/format.rs

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::any::Any;
2-
use std::sync::Arc;
2+
use std::sync::{Arc, RwLock};
33

44
use arrow_schema::{Schema, SchemaRef};
55
use async_trait::async_trait;
@@ -9,18 +9,25 @@ use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
99
use datafusion::execution::SessionState;
1010
use datafusion_common::parsers::CompressionTypeVariant;
1111
use datafusion_common::stats::Precision;
12-
use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics};
12+
use datafusion_common::{
13+
not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, Statistics,
14+
};
1315
use datafusion_expr::Expr;
1416
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
1517
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
1618
use datafusion_physical_plan::ExecutionPlan;
1719
use object_store::{ObjectMeta, ObjectStore};
1820
use vortex_array::arrow::infer_schema;
1921
use vortex_array::Context;
20-
use vortex_file::{read_initial_bytes, VORTEX_FILE_EXTENSION};
21-
use vortex_io::ObjectStoreReadAt;
22+
use vortex_file::metadata::MetadataFetcher;
23+
use vortex_file::{
24+
read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache,
25+
Scan, VORTEX_FILE_EXTENSION,
26+
};
27+
use vortex_io::{IoDispatcher, ObjectStoreReadAt};
2228

2329
use super::execution::VortexExec;
30+
use super::statistics::array_to_col_statistics;
2431
use crate::can_be_pushed_down;
2532

2633
#[derive(Debug, Default)]
@@ -86,13 +93,48 @@ impl FileFormat for VortexFormat {
8693
let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
8794
let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
8895
let layout = initial_read.fb_layout()?;
96+
let dtype = initial_read.lazy_dtype().map_err(|e| {
97+
DataFusionError::External(Box::new(
98+
e.with_context("Failed to fetch dtype from initial read"),
99+
))
100+
})?;
89101
let row_count = layout.row_count();
90102

91-
let stats = Statistics {
92-
num_rows: Precision::Exact(row_count as usize),
93-
total_byte_size: Precision::Absent,
94-
column_statistics: Statistics::unknown_column(&table_schema),
95-
};
103+
let layout_deserializer =
104+
LayoutDeserializer::new(Context::default().into(), LayoutContext::default().into());
105+
let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::new()));
106+
let relative_message_cache =
107+
RelativeLayoutCache::new(layout_message_cache.clone(), dtype.into());
108+
109+
let root_layout = vortex_file::read_layout_from_initial(
110+
&initial_read,
111+
&layout_deserializer,
112+
Scan::empty(),
113+
relative_message_cache,
114+
)?;
115+
116+
let io = IoDispatcher::default();
117+
let mut stats = Statistics::new_unknown(&table_schema);
118+
stats.num_rows = Precision::Exact(row_count as usize);
119+
120+
let metadata_table =
121+
MetadataFetcher::fetch(os_read_at, io.into(), root_layout, layout_message_cache)
122+
.await?;
123+
124+
if let Some(metadata) = metadata_table {
125+
let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
126+
127+
for col_stats in metadata.into_iter() {
128+
let col_stats = match col_stats {
129+
Some(array) => array_to_col_statistics(array.try_into()?)?,
130+
None => ColumnStatistics::new_unknown(),
131+
};
132+
133+
column_statistics.push(col_stats);
134+
}
135+
136+
stats.column_statistics = column_statistics;
137+
}
96138

97139
Ok(stats)
98140
}

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ pub mod config;
22
pub mod execution;
33
pub mod format;
44
pub mod opener;
5+
pub mod statistics;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use vortex_io::{IoDispatcher, ObjectStoreReadAt};
1414

1515
/// Share an IO dispatcher across all DataFusion instances.
1616
static IO_DISPATCHER: LazyLock<Arc<IoDispatcher>> =
17-
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
17+
LazyLock::new(|| Arc::new(IoDispatcher::default()));
1818

1919
pub struct VortexFileOpener {
2020
pub ctx: Arc<Context>,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use arrow_array::cast::AsArray;
2+
use arrow_array::types::UInt64Type;
3+
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
4+
use datafusion_common::stats::Precision;
5+
use datafusion_common::ColumnStatistics;
6+
use datafusion_expr::Accumulator;
7+
use vortex_array::array::StructArray;
8+
use vortex_array::variants::StructArrayTrait as _;
9+
use vortex_array::IntoCanonical;
10+
use vortex_error::VortexResult;
11+
12+
pub fn array_to_col_statistics(array: StructArray) -> VortexResult<ColumnStatistics> {
13+
let mut stats = ColumnStatistics::new_unknown();
14+
15+
if let Some(null_count_array) = array.field_by_name("null_count") {
16+
let array = null_count_array.into_canonical()?.into_arrow()?;
17+
let array = array.as_primitive::<UInt64Type>();
18+
19+
let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
20+
stats.null_count = Precision::Exact(null_count as usize);
21+
}
22+
23+
if let Some(max_value_array) = array.field_by_name("max") {
24+
let array = max_value_array.into_canonical()?.into_arrow()?;
25+
let mut acc = MaxAccumulator::try_new(array.data_type())?;
26+
acc.update_batch(&[array])?;
27+
28+
let max_val = acc.evaluate()?;
29+
stats.max_value = Precision::Exact(max_val)
30+
}
31+
32+
if let Some(min_value_array) = array.field_by_name("min") {
33+
let array = min_value_array.into_canonical()?.into_arrow()?;
34+
let mut acc = MinAccumulator::try_new(array.data_type())?;
35+
acc.update_batch(&[array])?;
36+
37+
let max_val = acc.evaluate()?;
38+
stats.min_value = Precision::Exact(max_val)
39+
}
40+
41+
Ok(stats)
42+
}

vortex-file/Cargo.toml

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@ futures-executor = { workspace = true }
2424
futures-util = { workspace = true }
2525
itertools = { workspace = true }
2626
once_cell = { workspace = true }
27-
tokio = { workspace = true, features = [
28-
"io-util",
29-
"fs",
30-
"rt-multi-thread",
31-
] }
27+
tokio = { workspace = true, features = ["io-util", "fs", "rt-multi-thread"] }
3228
tracing = { workspace = true, optional = true }
3329
vortex-array = { workspace = true }
3430
vortex-buffer = { workspace = true }
@@ -50,8 +46,5 @@ workspace = true
5046

5147
[features]
5248
futures = ["futures-util/io", "vortex-io/futures"]
53-
object_store = [
54-
"vortex-error/object_store",
55-
"vortex-io/object_store",
56-
]
49+
object_store = ["vortex-error/object_store", "vortex-io/object_store"]
5750
tracing = ["dep:tracing", "vortex-io/tracing"]

vortex-file/src/read/builder/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
165165
.transpose()?;
166166

167167
// Default: fall back to `IoDispatcher::default()` when no dispatcher was supplied.
168-
let io_dispatcher = self
169-
.io_dispatcher
170-
.unwrap_or_else(|| Arc::new(IoDispatcher::new_tokio(1)));
168+
let io_dispatcher = self.io_dispatcher.unwrap_or_default();
171169

172170
Ok(VortexFileArrayStream::new(
173171
self.read_at,

vortex-file/src/read/layouts/chunked.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::BTreeSet;
2-
use std::sync::RwLock;
2+
use std::sync::{OnceLock, RwLock};
33

44
use bytes::Bytes;
55
use itertools::Itertools;
@@ -14,7 +14,7 @@ use crate::read::cache::RelativeLayoutCache;
1414
use crate::read::mask::RowMask;
1515
use crate::{
1616
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator,
17-
Scan, CHUNKED_LAYOUT_ID,
17+
MetadataRead, Scan, CHUNKED_LAYOUT_ID,
1818
};
1919

2020
#[derive(Default, Debug)]
@@ -84,7 +84,7 @@ impl ChunkedLayoutBuilder {
8484
self.fb_bytes.clone(),
8585
metadata_fb._tab.loc(),
8686
// TODO(robert): Create stats projection
87-
Scan::new(None),
87+
Scan::empty(),
8888
self.message_cache.unknown_dtype(METADATA_LAYOUT_PART_ID),
8989
)
9090
})
@@ -170,6 +170,7 @@ pub struct ChunkedLayoutReader {
170170
layouts: Vec<RangedLayoutReader>,
171171
metadata_layout: Option<Box<dyn LayoutReader>>,
172172
in_progress_ranges: InProgressLayoutRanges,
173+
cached_metadata: OnceLock<ArrayData>,
173174
}
174175

175176
impl ChunkedLayoutReader {
@@ -181,6 +182,7 @@ impl ChunkedLayoutReader {
181182
layouts,
182183
metadata_layout,
183184
in_progress_ranges: RwLock::new(HashMap::new()),
185+
cached_metadata: OnceLock::new(),
184186
}
185187
}
186188

@@ -234,7 +236,6 @@ impl ChunkedLayoutReader {
234236
self.layouts.len()
235237
}
236238

237-
#[allow(dead_code)]
238239
pub fn metadata_layout(&self) -> Option<&dyn LayoutReader> {
239240
self.metadata_layout.as_deref()
240241
}
@@ -277,6 +278,29 @@ impl LayoutReader for ChunkedLayoutReader {
277278
Ok(None)
278279
}
279280
}
281+
282+
fn read_metadata(&self) -> VortexResult<MetadataRead> {
283+
match self.metadata_layout() {
284+
None => Ok(MetadataRead::None),
285+
Some(metadata_layout) => {
286+
if let Some(md) = self.cached_metadata.get() {
287+
return Ok(MetadataRead::Batches(vec![Some(md.clone())]));
288+
}
289+
290+
match metadata_layout
291+
.read_selection(&RowMask::new_valid_between(0, self.n_chunks()))?
292+
{
293+
Some(BatchRead::Batch(array)) => {
294+
// We don't care if the write failed
295+
_ = self.cached_metadata.set(array.clone());
296+
Ok(MetadataRead::Batches(vec![Some(array)]))
297+
}
298+
Some(BatchRead::ReadMore(messages)) => Ok(MetadataRead::ReadMore(messages)),
299+
None => Ok(MetadataRead::None),
300+
}
301+
}
302+
}
303+
}
280304
}
281305

282306
#[cfg(test)]

vortex-file/src/read/layouts/columnar.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ use vortex_array::stats::ArrayStatistics;
99
use vortex_array::validity::Validity;
1010
use vortex_array::{ArrayData, IntoArrayData};
1111
use vortex_dtype::field::Field;
12-
use vortex_dtype::FieldNames;
13-
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
12+
use vortex_dtype::{FieldName, FieldNames};
13+
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult};
1414
use vortex_expr::{Column, Select, VortexExpr};
1515
use vortex_flatbuffers::footer;
1616

1717
use crate::read::cache::{LazyDType, RelativeLayoutCache};
1818
use crate::read::expr_project::expr_project;
1919
use crate::read::mask::RowMask;
2020
use crate::{
21-
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, RowFilter, Scan,
21+
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MetadataRead, RowFilter, Scan,
2222
COLUMNAR_LAYOUT_ID,
2323
};
2424

@@ -203,6 +203,7 @@ pub struct ColumnarLayoutReader {
203203
expr: Option<Arc<dyn VortexExpr>>,
204204
// TODO(robert): This is a hack/optimization that tells us if we're reducing results with AND or not
205205
shortcircuit_siblings: bool,
206+
in_progress_metadata: RwLock<HashMap<FieldName, Option<ArrayData>>>,
206207
}
207208

208209
impl ColumnarLayoutReader {
@@ -220,9 +221,10 @@ impl ColumnarLayoutReader {
220221
Self {
221222
names,
222223
children,
223-
in_progress_ranges: RwLock::new(HashMap::new()),
224224
expr,
225225
shortcircuit_siblings,
226+
in_progress_ranges: RwLock::new(HashMap::new()),
227+
in_progress_metadata: RwLock::new(HashMap::new()),
226228
}
227229
}
228230
}
@@ -303,6 +305,44 @@ impl LayoutReader for ColumnarLayoutReader {
303305
Ok(Some(BatchRead::ReadMore(messages)))
304306
}
305307
}
308+
309+
fn read_metadata(&self) -> VortexResult<MetadataRead> {
310+
let mut in_progress_metadata = self
311+
.in_progress_metadata
312+
.write()
313+
.unwrap_or_else(|e| vortex_panic!("lock is poisoned: {e}"));
314+
let mut messages = Vec::default();
315+
316+
for (name, child_reader) in self.names.iter().zip(self.children.iter()) {
317+
match child_reader.read_metadata()? {
318+
MetadataRead::Batches(data) => {
319+
if data.len() != 1 {
320+
vortex_bail!("expected exactly one metadata array per-child");
321+
}
322+
in_progress_metadata.insert(name.clone(), data[0].clone());
323+
}
324+
MetadataRead::ReadMore(rm) => {
325+
messages.extend(rm);
326+
}
327+
MetadataRead::None => {
328+
in_progress_metadata.insert(name.clone(), None);
329+
}
330+
}
331+
}
332+
333+
// We're done reading
334+
if messages.is_empty() {
335+
let child_arrays = self
336+
.names
337+
.iter()
338+
.map(|name| in_progress_metadata[name].clone()) // TODO(Adam): Some columns might not have statistics
339+
.collect::<Vec<_>>();
340+
341+
Ok(MetadataRead::Batches(child_arrays))
342+
} else {
343+
Ok(MetadataRead::ReadMore(messages))
344+
}
345+
}
306346
}
307347

308348
#[cfg(test)]

0 commit comments

Comments
 (0)