Skip to content

Commit d7904de

Browse files
authored
duckdb: file list via glob (#3532)
Signed-off-by: Alexander Droste <[email protected]>
1 parent 51786f1 commit d7904de

File tree

6 files changed

+154
-134
lines changed

6 files changed

+154
-134
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ cfg-if = "1"
8181
chrono = "0.4.40"
8282
clap = "4.5"
8383
compio = { version = "0.14", features = ["io-uring"], default-features = false }
84+
crossbeam-queue = "0.3"
8485
crossterm = "0.28"
8586
dashmap = "6.1.0"
8687
datafusion = { version = "47", default-features = false }
@@ -100,6 +101,7 @@ flume = "0.11"
100101
fsst-rs = "0.5.2"
101102
futures = { version = "0.3.31", default-features = false }
102103
futures-util = "0.3.31"
104+
glob = "0.3.2"
103105
goldenfile = "1"
104106
half = { version = "2.5", features = ["std", "num-traits"] }
105107
hashbrown = "0.15.1"

duckdb-vortex/src/vortex_scan.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,6 @@ void RegisterScanFunction(DatabaseInstance &instance) {
545545

546546
vortex_scan.init_global = [](ClientContext &context,
547547
TableFunctionInitInput &input) -> unique_ptr<GlobalTableFunctionState> {
548-
auto &nonbind = input.bind_data->Cast<ScanBindData>();
549548
auto &bind = input.bind_data->CastNoConst<ScanBindData>();
550549
auto global_state = make_uniq<ScanGlobalState>();
551550

vortex-duckdb-ext/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ crate-type = ["staticlib", "cdylib"]
2222
[dependencies]
2323
arrow-array = { workspace = true }
2424
bitvec = { workspace = true }
25+
crossbeam-queue = { workspace = true }
2526
duckdb = { version = "1.3.0", features = ["vtab-full"] }
27+
glob = { workspace = true }
2628
itertools = { workspace = true }
2729
num-traits = { workspace = true }
2830
tempfile = { workspace = true }

vortex-duckdb-ext/src/lib.rs

Lines changed: 0 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -55,110 +55,3 @@ pub extern "C" fn vortex_extension_version() -> *const c_char {
5555
}
5656
.as_ptr()
5757
}
58-
59-
#[cfg(test)]
60-
mod tests {
61-
use duckdb::Connection;
62-
use tempfile::NamedTempFile;
63-
use vortex::IntoArray;
64-
use vortex::arrays::{BoolArray, ConstantArray, PrimitiveArray, StructArray, VarBinArray};
65-
use vortex::file::VortexWriteOptions;
66-
use vortex::scalar::Scalar;
67-
use vortex::validity::Validity;
68-
69-
use crate::duckdb::Database;
70-
71-
fn database_connection() -> Connection {
72-
let db = Database::open_in_memory().unwrap();
73-
let connection = db.connect().unwrap();
74-
super::init(&connection).unwrap();
75-
unsafe { Connection::open_from_raw(db.as_ptr().cast()) }.unwrap()
76-
}
77-
78-
fn create_temp_file() -> NamedTempFile {
79-
NamedTempFile::new().unwrap()
80-
}
81-
82-
async fn write_vortex_file(field_name: &str, array: impl IntoArray) -> NamedTempFile {
83-
let temp_file_path = create_temp_file();
84-
85-
let struct_array = StructArray::from_fields(&[(field_name, array.into_array())]).unwrap();
86-
let file = tokio::fs::File::create(&temp_file_path).await.unwrap();
87-
VortexWriteOptions::default()
88-
.write(file, struct_array.to_array_stream())
89-
.await
90-
.unwrap();
91-
92-
temp_file_path
93-
}
94-
95-
fn scan_vortex_file<T>(tmp_file: NamedTempFile, query: &str) -> T
96-
where
97-
T: duckdb::types::FromSql,
98-
{
99-
let conn = database_connection();
100-
conn.prepare(query)
101-
.unwrap()
102-
.query_row([tmp_file.path().to_string_lossy()], |row| row.get(0))
103-
.unwrap()
104-
}
105-
106-
#[test]
107-
fn test_scan_function_registration() {
108-
let conn = database_connection();
109-
let result: String = conn
110-
.prepare(
111-
"SELECT function_name FROM duckdb_functions() WHERE function_name = 'vortex_scan'",
112-
)
113-
.unwrap()
114-
.query_row([], |row| row.get(0))
115-
.unwrap();
116-
assert_eq!(&result, "vortex_scan");
117-
}
118-
119-
#[tokio::test]
120-
async fn test_vortex_scan_strings() {
121-
let strings = VarBinArray::from(vec!["Hello", "Hi", "Hey"]);
122-
let file = write_vortex_file("strings", strings).await;
123-
let result: String =
124-
scan_vortex_file(file, "SELECT string_agg(strings, ',') FROM vortex_scan(?)");
125-
assert_eq!(result, "Hello,Hi,Hey");
126-
}
127-
128-
#[tokio::test]
129-
async fn test_vortex_scan_integers() {
130-
let numbers = PrimitiveArray::from_iter([1i32, 42, 100, -5, 0]);
131-
let file = write_vortex_file("number", numbers).await;
132-
let sum: i64 = scan_vortex_file(file, "SELECT SUM(number) FROM vortex_scan(?)");
133-
assert_eq!(sum, 138);
134-
}
135-
136-
#[tokio::test]
137-
async fn test_vortex_scan_floats() {
138-
let values = PrimitiveArray::from_iter([1.5f64, -2.5, 0.0, 42.42]);
139-
let file = write_vortex_file("value", values).await;
140-
let count: i64 =
141-
scan_vortex_file(file, "SELECT COUNT(*) FROM vortex_scan(?) WHERE value > 0");
142-
assert_eq!(count, 2);
143-
}
144-
145-
#[tokio::test]
146-
async fn test_vortex_scan_constant() {
147-
let constant = ConstantArray::new(Scalar::from(42i32), 100);
148-
let file = write_vortex_file("constant", constant).await;
149-
let value: i32 = scan_vortex_file(file, "SELECT constant FROM vortex_scan(?) LIMIT 1");
150-
assert_eq!(value, 42);
151-
}
152-
153-
#[tokio::test]
154-
async fn test_vortex_scan_booleans() {
155-
let flags = vec![true, false, true, true, false];
156-
let flags_array = BoolArray::new(flags.into(), Validity::NonNullable);
157-
let file = write_vortex_file("flag", flags_array).await;
158-
let true_count: i64 = scan_vortex_file(
159-
file,
160-
"SELECT COUNT(*) FROM vortex_scan(?) WHERE flag = true",
161-
);
162-
assert_eq!(true_count, 3);
163-
}
164-
}

vortex-duckdb-ext/src/scan.rs

Lines changed: 148 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::sync::atomic::AtomicBool;
1+
use std::path::PathBuf;
22

3+
use crossbeam_queue::SegQueue;
34
use vortex::error::{VortexResult, vortex_bail, vortex_err};
45
use vortex::file::{VortexFile, VortexOpenOptions};
56

@@ -9,14 +10,13 @@ use crate::duckdb::{
910
use crate::exporter::ArrayIteratorExporter;
1011

1112
pub struct VortexBindData {
12-
first_file: VortexFile,
13+
_first_file: VortexFile,
14+
file_list: SegQueue<PathBuf>,
1315
_column_names: Vec<String>,
1416
_column_types: Vec<LogicalType>,
1517
}
1618

17-
pub struct VortexGlobalData {
18-
done: AtomicBool,
19-
}
19+
pub struct VortexGlobalData {}
2020

2121
pub struct VortexLocalData {
2222
exporter: Option<ArrayIteratorExporter>,
@@ -63,7 +63,6 @@ impl TableFunction for VortexTableFunction {
6363
}
6464

6565
fn bind(input: &BindInput, result: &mut BindResult) -> VortexResult<Self::BindData> {
66-
// TODO: expand glob & assign to file list
6766
let file_glob_string = input
6867
.get_parameter(0)
6968
.ok_or_else(|| vortex_err!("Missing file glob parameter"))?;
@@ -80,8 +79,22 @@ impl TableFunction for VortexTableFunction {
8079
result.add_result_column(name, logical_type);
8180
}
8281

82+
let paths = match glob::glob(file_glob_string.as_string().to_str()?) {
83+
Ok(paths) => paths,
84+
Err(e) => vortex_bail!("Failed to glob files: {}", e),
85+
};
86+
87+
let file_list = SegQueue::new();
88+
for path in paths {
89+
match path {
90+
Ok(path) => file_list.push(path),
91+
Err(e) => vortex_bail!("Failed to glob files: {}", e),
92+
}
93+
}
94+
8395
Ok(VortexBindData {
84-
first_file,
96+
file_list,
97+
_first_file: first_file,
8598
_column_names: column_names,
8699
_column_types: column_types,
87100
})
@@ -90,23 +103,26 @@ impl TableFunction for VortexTableFunction {
90103
fn scan(
91104
bind_data: &Self::BindData,
92105
local_state: &mut Self::LocalState,
93-
global_state: &mut Self::GlobalState,
106+
_global_state: &mut Self::GlobalState,
94107
chunk: &mut DataChunk,
95108
) -> VortexResult<()> {
96-
if global_state.done.load(std::sync::atomic::Ordering::SeqCst) {
97-
// Signal to DuckDB that there's no work left by setting the chunk length to 0.
98-
chunk.set_len(0);
99-
return Ok(());
100-
}
101-
102109
if local_state.exporter.is_none() {
103-
let array_iter = bind_data
104-
.first_file
105-
.scan()?
106-
.into_array_iter()
107-
.map_err(|e| vortex_err!("Failed to create array iterator: {}", e))?;
110+
if let Some(file_path) = bind_data.file_list.pop() {
111+
let file = VortexOpenOptions::file()
112+
.open_blocking(&file_path)
113+
.map_err(|e| vortex_err!("Failed to open Vortex file: {}", e))?;
108114

109-
local_state.exporter = Some(ArrayIteratorExporter::new(Box::new(array_iter)));
115+
let array_iter = file
116+
.scan()?
117+
.into_array_iter()
118+
.map_err(|e| vortex_err!("Failed to create array iterator: {}", e))?;
119+
120+
local_state.exporter = Some(ArrayIteratorExporter::new(Box::new(array_iter)));
121+
} else {
122+
// No exporter is active and the file list is empty: signal to DuckDB that the scan is finished by setting the chunk length to 0.
123+
chunk.set_len(0);
124+
return Ok(());
125+
}
110126
}
111127

112128
let Some(ref mut exporter) = local_state.exporter else {
@@ -118,18 +134,14 @@ impl TableFunction for VortexTableFunction {
118134
.map_err(|e| vortex_err!("Failed to export data: {}", e))?;
119135

120136
if !is_data_left_to_scan {
121-
global_state
122-
.done
123-
.store(true, std::sync::atomic::Ordering::SeqCst);
137+
local_state.exporter = None;
124138
}
125139

126140
Ok(())
127141
}
128142

129143
fn init_global(_init: &TableInitInput<Self>) -> VortexResult<Self::GlobalState> {
130-
Ok(VortexGlobalData {
131-
done: AtomicBool::new(false),
132-
})
144+
Ok(VortexGlobalData {})
133145
}
134146

135147
fn init_local(
@@ -146,3 +158,113 @@ impl TableFunction for VortexTableFunction {
146158
Ok(false)
147159
}
148160
}
161+
162+
#[cfg(test)]
163+
mod tests {
164+
use duckdb::Connection;
165+
use tempfile::NamedTempFile;
166+
use vortex::IntoArray;
167+
use vortex::arrays::{BoolArray, ConstantArray, PrimitiveArray, StructArray, VarBinArray};
168+
use vortex::file::VortexWriteOptions;
169+
use vortex::scalar::Scalar;
170+
use vortex::validity::Validity;
171+
172+
use super::*;
173+
use crate::duckdb::Database;
174+
175+
fn database_connection() -> Connection {
176+
let db = Database::open_in_memory().unwrap();
177+
let connection = db.connect().unwrap();
178+
connection
179+
.register_table_function::<VortexTableFunction>(c"vortex_scan")
180+
.unwrap();
181+
unsafe { Connection::open_from_raw(db.as_ptr().cast()) }.unwrap()
182+
}
183+
184+
fn create_temp_file() -> NamedTempFile {
185+
NamedTempFile::new().unwrap()
186+
}
187+
188+
async fn write_vortex_file(field_name: &str, array: impl IntoArray) -> NamedTempFile {
189+
let temp_file_path = create_temp_file();
190+
191+
let struct_array = StructArray::from_fields(&[(field_name, array.into_array())]).unwrap();
192+
let file = tokio::fs::File::create(&temp_file_path).await.unwrap();
193+
VortexWriteOptions::default()
194+
.write(file, struct_array.to_array_stream())
195+
.await
196+
.unwrap();
197+
198+
temp_file_path
199+
}
200+
201+
fn scan_vortex_file<T>(tmp_file: NamedTempFile, query: &str) -> T
202+
where
203+
T: duckdb::types::FromSql,
204+
{
205+
let conn = database_connection();
206+
conn.prepare(query)
207+
.unwrap()
208+
.query_row([tmp_file.path().to_string_lossy()], |row| row.get(0))
209+
.unwrap()
210+
}
211+
212+
#[test]
213+
fn test_scan_function_registration() {
214+
let conn = database_connection();
215+
let result: String = conn
216+
.prepare(
217+
"SELECT function_name FROM duckdb_functions() WHERE function_name = 'vortex_scan'",
218+
)
219+
.unwrap()
220+
.query_row([], |row| row.get(0))
221+
.unwrap();
222+
assert_eq!(&result, "vortex_scan");
223+
}
224+
225+
#[tokio::test]
226+
async fn test_vortex_scan_strings() {
227+
let strings = VarBinArray::from(vec!["Hello", "Hi", "Hey"]);
228+
let file = write_vortex_file("strings", strings).await;
229+
let result: String =
230+
scan_vortex_file(file, "SELECT string_agg(strings, ',') FROM vortex_scan(?)");
231+
assert_eq!(result, "Hello,Hi,Hey");
232+
}
233+
234+
#[tokio::test]
235+
async fn test_vortex_scan_integers() {
236+
let numbers = PrimitiveArray::from_iter([1i32, 42, 100, -5, 0]);
237+
let file = write_vortex_file("number", numbers).await;
238+
let sum: i64 = scan_vortex_file(file, "SELECT SUM(number) FROM vortex_scan(?)");
239+
assert_eq!(sum, 138);
240+
}
241+
242+
#[tokio::test]
243+
async fn test_vortex_scan_floats() {
244+
let values = PrimitiveArray::from_iter([1.5f64, -2.5, 0.0, 42.42]);
245+
let file = write_vortex_file("value", values).await;
246+
let count: i64 =
247+
scan_vortex_file(file, "SELECT COUNT(*) FROM vortex_scan(?) WHERE value > 0");
248+
assert_eq!(count, 2);
249+
}
250+
251+
#[tokio::test]
252+
async fn test_vortex_scan_constant() {
253+
let constant = ConstantArray::new(Scalar::from(42i32), 100);
254+
let file = write_vortex_file("constant", constant).await;
255+
let value: i32 = scan_vortex_file(file, "SELECT constant FROM vortex_scan(?) LIMIT 1");
256+
assert_eq!(value, 42);
257+
}
258+
259+
#[tokio::test]
260+
async fn test_vortex_scan_booleans() {
261+
let flags = vec![true, false, true, true, false];
262+
let flags_array = BoolArray::new(flags.into(), Validity::NonNullable);
263+
let file = write_vortex_file("flag", flags_array).await;
264+
let true_count: i64 = scan_vortex_file(
265+
file,
266+
"SELECT COUNT(*) FROM vortex_scan(?) WHERE flag = true",
267+
);
268+
assert_eq!(true_count, 3);
269+
}
270+
}

0 commit comments

Comments
 (0)