Skip to content

Commit 066138f

Browse files
authored
feature: multi-file-scan thread-safe iterator (#3863)
This PR implements a multi-scan operator which can be driven from multiple threads through `MultiScanIterator`s. Work-stealing will be implemented in a follow-up. Further, the implementation will move to a global conversion cache shared across all threads. --------- Signed-off-by: Alexander Droste <[email protected]>
1 parent 51d026c commit 066138f

File tree

38 files changed

+401
-178
lines changed

38 files changed

+401
-178
lines changed

Cargo.lock

Lines changed: 25 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ members = [
2323
"vortex-metrics",
2424
"vortex-proto",
2525
"vortex-python",
26+
"vortex-scan",
2627
"vortex-scalar",
2728
"vortex-tui",
2829
"vortex-utils",
@@ -80,7 +81,7 @@ cfg-if = "1"
8081
chrono = "0.4.41"
8182
clap = "4.5"
8283
compio = { version = "0.15", features = ["io-uring"], default-features = false }
83-
crossbeam-queue = "0.3"
84+
crossbeam-queue = "0.3.12"
8485
crossterm = "0.28"
8586
dashmap = "6.1.0"
8687
datafusion = { version = "48", default-features = false }
@@ -209,6 +210,7 @@ vortex-pco = { version = "0.1.0", path = "./encodings/pco", default-features = f
209210
vortex-proto = { version = "0.1.0", path = "./vortex-proto", default-features = false }
210211
vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-features = false }
211212
vortex-scalar = { version = "0.1.0", path = "./vortex-scalar", default-features = false }
213+
vortex-scan = { version = "0.1.0", path = "./vortex-scan", default-features = false }
212214
vortex-sequence = { version = "0.1.0", path = "encodings/sequence", default-features = false }
213215
vortex-sparse = { version = "0.1.0", path = "./encodings/sparse", default-features = false }
214216
vortex-tui = { version = "0.1.0", path = "./vortex-tui", default-features = false }

vortex-datafusion/src/persistent/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ use tokio::runtime::Handle;
1616
use vortex::ArrayRef;
1717
use vortex::error::VortexError;
1818
use vortex::expr::{ExprRef, VortexExpr};
19-
use vortex::file::scan::ScanBuilder;
2019
use vortex::layout::LayoutReader;
2120
use vortex::metrics::VortexMetrics;
21+
use vortex::scan::ScanBuilder;
2222

2323
use super::cache::VortexFileCache;
2424

vortex-duckdb/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ src/cpp.rs
55

66
# Symlinked DuckDB source is ignored. It is downloaded, extracted and symlinked as part of build.rs
77
duckdb
8+
!src/duckdb

vortex-duckdb/src/exporter/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ mod sequence;
1212
mod temporal;
1313
mod varbinview;
1414

15-
use cache::*;
15+
pub use cache::ConversionCache;
1616
use itertools::Itertools;
1717
use vortex::arrays::{ConstantVTable, StructArray, TemporalArray};
1818
use vortex::dtype::datetime::is_temporal_ext_type;

vortex-duckdb/src/scan.rs

Lines changed: 68 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,23 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::path::PathBuf;
5+
use std::sync::Arc;
56
use std::sync::atomic::AtomicU64;
67
use std::sync::atomic::Ordering::SeqCst;
7-
use std::sync::*;
88

9-
use atomic::AtomicBool;
109
use bitvec::macros::internal::funty::Fundamental;
11-
use crossbeam_queue::SegQueue;
10+
use vortex::ToCanonical;
1211
use vortex::dtype::FieldNames;
1312
use vortex::error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
1413
use vortex::expr::{ExprRef, and, and_collect, lit, root, select};
1514
use vortex::file::{VortexFile, VortexOpenOptions};
15+
use vortex::scan::{MultiScan, MultiScanIterator};
1616

1717
use crate::convert::{try_from_bound_expression, try_from_table_filter};
1818
use crate::duckdb::{
1919
BindInput, BindResult, DataChunk, Expression, LogicalType, TableFunction, TableInitInput,
2020
};
21-
use crate::exporter::ArrayIteratorExporter;
21+
use crate::exporter::{ArrayExporter, ConversionCache};
2222

2323
pub struct VortexBindData {
2424
first_file: VortexFile,
@@ -54,17 +54,16 @@ impl std::fmt::Debug for VortexBindData {
5454
}
5555

5656
pub struct VortexGlobalData {
57-
file_paths: SegQueue<PathBuf>,
58-
is_first_file_processed: AtomicBool,
59-
/// We currently use a conversion cache to cache converted arrays, this id is used to
60-
/// ensure that each cache created has a unique id used to name those arrays
61-
conversion_cache_id: AtomicU64,
62-
filter_expr: Option<ExprRef>,
63-
projection_expr: ExprRef,
57+
multi_scan: MultiScan,
58+
cache_id: AtomicU64,
6459
}
6560

6661
pub struct VortexLocalData {
67-
exporter: Option<ArrayIteratorExporter>,
62+
multi_scan_iterator: MultiScanIterator,
63+
exporter: Option<ArrayExporter>,
64+
65+
// TODO(Alex): replace with global conversion cache
66+
conversion_cache: ConversionCache,
6867
}
6968

7069
#[derive(Debug)]
@@ -148,16 +147,6 @@ fn extract_table_filter_expr(
148147
Ok(Some(filter_expr))
149148
}
150149

151-
/// Creates a lock-free queue populated with file paths from bind data.
152-
fn create_file_paths_queue(bind_data: &VortexBindData) -> SegQueue<PathBuf> {
153-
let file_paths = SegQueue::new();
154-
// Skip the first file as it is opened during bind.
155-
for path in bind_data.file_paths.iter().skip(1) {
156-
file_paths.push(path.clone());
157-
}
158-
file_paths
159-
}
160-
161150
impl TableFunction for VortexTableFunction {
162151
type BindData = VortexBindData;
163152
type GlobalState = VortexGlobalData;
@@ -210,82 +199,92 @@ impl TableFunction for VortexTableFunction {
210199
}
211200

212201
fn scan(
213-
bind_data: &Self::BindData,
202+
_bind_data: &Self::BindData,
214203
local_state: &mut Self::LocalState,
215204
global_state: &mut Self::GlobalState,
216205
chunk: &mut DataChunk,
217206
) -> VortexResult<()> {
218-
let exporter_for_file =
219-
|file: &VortexFile, id: u64| -> VortexResult<ArrayIteratorExporter> {
220-
let array_iterator = file
221-
.scan()?
222-
.with_projection(global_state.projection_expr.clone())
223-
.with_some_filter(global_state.filter_expr.clone())
224-
.into_array_iter()
225-
.map_err(|e| vortex_err!("Failed to create array iterator: {}", e))?;
226-
227-
Ok(ArrayIteratorExporter::new(Box::new(array_iterator), id))
228-
};
229-
230207
loop {
231208
if local_state.exporter.is_none() {
232-
if !global_state.is_first_file_processed.swap(true, SeqCst) {
233-
let cache_id = global_state.conversion_cache_id.fetch_add(1, SeqCst);
234-
local_state.exporter =
235-
Some(exporter_for_file(&bind_data.first_file, cache_id)?);
236-
}
237-
// Retrieve a file path from the shared lock-free queue.
238-
else if let Some(file_path) = global_state.file_paths.pop() {
239-
let file = VortexOpenOptions::file()
240-
.open_blocking(&file_path)
241-
.map_err(|e| vortex_err!("Failed to open Vortex file: {}", e))?;
242-
243-
let cache_id = global_state.conversion_cache_id.fetch_add(1, SeqCst);
244-
local_state.exporter = Some(exporter_for_file(&file, cache_id)?);
245-
} else {
246-
// If the exporter is None and there are no more files to process, signal that the scan finished.
247-
chunk.set_len(0);
209+
let Some(array_result) = local_state.multi_scan_iterator.next() else {
248210
return Ok(());
249-
}
211+
};
212+
213+
// TODO(Alex): replace with global conversion cache
214+
local_state.conversion_cache =
215+
ConversionCache::new(global_state.cache_id.fetch_add(1, SeqCst));
216+
217+
local_state.exporter = Some(ArrayExporter::try_new(
218+
&array_result?.to_struct()?,
219+
&mut local_state.conversion_cache,
220+
)?);
250221
}
251222

252-
let Some(ref mut exporter) = local_state.exporter else {
253-
vortex_bail!("ArrayIteratorExporter is not set")
254-
};
223+
let exporter = local_state
224+
.exporter
225+
.as_mut()
226+
.vortex_expect("exporter should exist");
255227

256-
let is_data_left_to_scan = !exporter
257-
.export(chunk)
258-
.map_err(|e| vortex_err!("Failed to export data: {}", e))?;
228+
let has_more_data = exporter.export(chunk)?;
259229

260-
if is_data_left_to_scan {
230+
if !has_more_data {
231+
// This exporter is fully consumed.
261232
local_state.exporter = None;
262233
} else {
263-
assert!(!chunk.is_empty());
264-
return Ok(());
234+
break;
265235
}
266236
}
237+
238+
assert!(!chunk.is_empty());
239+
240+
Ok(())
267241
}
268242

269243
fn init_global(init_input: &TableInitInput<Self>) -> VortexResult<Self::GlobalState> {
270244
let bind_data = init_input.bind_data();
271-
let file_paths = create_file_paths_queue(bind_data);
272245
let projection_expr = extract_projection_expr(init_input);
273246
let filter_expr = extract_table_filter_expr(init_input, init_input.column_ids())?;
247+
let is_first_file_queued = Arc::new(std::sync::atomic::AtomicBool::new(false));
248+
249+
let closures = bind_data.file_paths.clone().into_iter().map(move |path| {
250+
let first_file = bind_data.first_file.clone();
251+
let filter_expr = filter_expr.clone();
252+
let projection_expr = projection_expr.clone();
253+
let is_first_file_queued = is_first_file_queued.clone();
254+
255+
move || {
256+
let file = if !is_first_file_queued.swap(true, SeqCst) {
257+
// The first path from `file_paths` is skipped as the first
258+
// file was already opened during bind.
259+
first_file
260+
} else {
261+
VortexOpenOptions::file()
262+
.open_blocking(&path)
263+
.vortex_expect("Failed to open Vortex file")
264+
};
265+
266+
file.scan()
267+
.vortex_expect("Failed to create scan builder")
268+
.with_some_filter(filter_expr)
269+
.with_projection(projection_expr)
270+
}
271+
});
274272

275273
Ok(VortexGlobalData {
276-
file_paths,
277-
is_first_file_processed: AtomicBool::new(false),
278-
conversion_cache_id: AtomicU64::new(0),
279-
filter_expr,
280-
projection_expr,
274+
multi_scan: MultiScan::new().with_scan_builders(closures),
275+
cache_id: AtomicU64::new(0),
281276
})
282277
}
283278

284279
fn init_local(
285280
_init: &TableInitInput<Self>,
286-
_global: &mut Self::GlobalState,
281+
global: &mut Self::GlobalState,
287282
) -> VortexResult<Self::LocalState> {
288-
Ok(VortexLocalData { exporter: None })
283+
Ok(VortexLocalData {
284+
multi_scan_iterator: global.multi_scan.new_scan_iterator(),
285+
exporter: None,
286+
conversion_cache: ConversionCache::new(global.cache_id.fetch_add(1, SeqCst)),
287+
})
289288
}
290289

291290
fn pushdown_complex_filter(

0 commit comments

Comments
 (0)