Skip to content

Commit 72e3cbd

Browse files
committed
wire duckdb
Signed-off-by: Onur Satici <[email protected]>
1 parent 6dafd16 commit 72e3cbd

File tree

3 files changed

+44
-18
lines changed

3 files changed

+44
-18
lines changed

vortex-duckdb/src/copy.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use vortex::io::runtime::Task;
2525
use vortex::io::runtime::current::CurrentThreadWorkerPool;
2626
use vortex::io::session::RuntimeSessionExt;
2727

28-
use crate::RUNTIME;
2928
use crate::SESSION;
3029
use crate::convert::data_chunk_to_vortex;
3130
use crate::convert::from_duckdb_table;
@@ -53,7 +52,7 @@ pub struct GlobalState {
5352
// Note that this is optional and without it, we would only drive the task when DuckDB calls
5453
// into us, and we call `RUNTIME.block_on`.
5554
#[allow(dead_code)]
56-
worker_pool: CurrentThreadWorkerPool,
55+
worker_pool: Option<CurrentThreadWorkerPool>,
5756
}
5857

5958
impl CopyFunction for VortexCopyFunction {
@@ -86,7 +85,7 @@ impl CopyFunction for VortexCopyFunction {
8685
chunk: &mut DataChunk,
8786
) -> VortexResult<()> {
8887
let chunk = data_chunk_to_vortex(bind_data.fields.names(), chunk);
89-
RUNTIME.block_on(async {
88+
crate::blocking_runtime().block_on(async {
9089
init_global
9190
.sink
9291
.as_mut()
@@ -103,7 +102,7 @@ impl CopyFunction for VortexCopyFunction {
103102
_bind_data: &Self::BindData,
104103
init_global: &mut Self::GlobalState,
105104
) -> VortexResult<()> {
106-
RUNTIME.block_on(async {
105+
crate::blocking_runtime().block_on(async {
107106
if let Some(sink) = init_global.sink.take() {
108107
drop(sink)
109108
}
@@ -131,11 +130,8 @@ impl CopyFunction for VortexCopyFunction {
131130
SESSION.write_options().write(&mut file, array_stream).await
132131
});
133132

134-
let worker_pool = RUNTIME.new_pool();
135-
worker_pool.set_workers_to_available_parallelism();
136-
137133
Ok(GlobalState {
138-
worker_pool,
134+
worker_pool: None,
139135
write_task: Mutex::new(Some(writer)),
140136
sink: Some(sink),
141137
})

vortex-duckdb/src/lib.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
use std::ffi::CStr;
77
use std::ffi::c_char;
88
use std::sync::LazyLock;
9+
use std::sync::OnceLock;
910

1011
use vortex::VortexSessionDefault;
1112
use vortex::error::VortexExpect;
1213
use vortex::error::VortexResult;
1314
use vortex::io::runtime::BlockingRuntime;
15+
use vortex::io::runtime::Handle;
1416
use vortex::io::runtime::current::CurrentThreadRuntime;
17+
pub use vortex::io::runtime::current::CurrentThreadRuntime as DuckdbDefaultRuntime;
1518
use vortex::io::session::RuntimeSessionExt;
1619
use vortex::session::VortexSession;
1720

@@ -40,9 +43,35 @@ mod copy;
4043
mod e2e_test;
4144

4245
// A global runtime for Vortex operations within DuckDB.
43-
static RUNTIME: LazyLock<CurrentThreadRuntime> = LazyLock::new(CurrentThreadRuntime::new);
44-
static SESSION: LazyLock<VortexSession> =
45-
LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle()));
46+
static DEFAULT_RUNTIME: LazyLock<CurrentThreadRuntime> = LazyLock::new(CurrentThreadRuntime::new);
47+
static RUNTIME_OVERRIDE: OnceLock<Handle> = OnceLock::new();
48+
static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
49+
let handle = RUNTIME_OVERRIDE
50+
.get()
51+
.cloned()
52+
.unwrap_or_else(|| DEFAULT_RUNTIME.handle());
53+
VortexSession::default().with_handle(handle)
54+
});
55+
56+
/// Configure the runtime handle used by the DuckDB extension.
57+
///
58+
/// Must be called before the session is first accessed (e.g., before registering table functions).
59+
pub fn configure_runtime(handle: Handle) {
60+
drop(RUNTIME_OVERRIDE.set(handle));
61+
}
62+
63+
/// Returns the effective runtime handle used by the DuckDB extension.
64+
pub fn runtime_handle() -> Handle {
65+
RUNTIME_OVERRIDE
66+
.get()
67+
.cloned()
68+
.unwrap_or_else(|| DEFAULT_RUNTIME.handle())
69+
}
70+
71+
/// Returns the blocking runtime used for synchronous work.
72+
pub fn blocking_runtime() -> &'static CurrentThreadRuntime {
73+
&DEFAULT_RUNTIME
74+
}
4675

4776
/// Register Vortex extension configuration options with DuckDB.
4877
/// This must be called before `register_table_functions` to take effect.

vortex-duckdb/src/scan.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ use vortex::file::VortexOpenOptions;
4343
use vortex::io::runtime::BlockingRuntime;
4444
use vortex::io::runtime::current::ThreadSafeIterator;
4545

46-
use crate::RUNTIME;
4746
use crate::SESSION;
47+
use crate::blocking_runtime;
48+
use crate::runtime_handle;
4849
use crate::convert::try_from_bound_expression;
4950
use crate::convert::try_from_table_filter;
5051
use crate::duckdb;
@@ -274,9 +275,8 @@ impl TableFunction for VortexTableFunction {
274275

275276
log::trace!("running scan with max_threads {max_threads}");
276277

277-
let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob(
278-
file_glob_string.as_ref().as_string(),
279-
)))?;
278+
let (file_urls, _metadata) = blocking_runtime()
279+
.block_on(Compat::new(expand_glob(file_glob_string.as_ref().as_string())))?;
280280

281281
// The first file is skipped in `create_file_paths_queue`.
282282
let Some(first_file_url) = file_urls.first() else {
@@ -285,7 +285,7 @@ impl TableFunction for VortexTableFunction {
285285

286286
let footer_cache = FooterCache::new(ctx.object_cache());
287287
let entry = footer_cache.entry(first_file_url.as_ref());
288-
let first_file = RUNTIME.block_on(async move {
288+
let first_file = blocking_runtime().block_on(async move {
289289
let options = entry.apply_to_file(SESSION.open_options());
290290
let file = open_file(first_file_url.clone(), options).await?;
291291
entry.put_if_absent(|| file.footer().clone());
@@ -373,7 +373,8 @@ impl TableFunction for VortexTableFunction {
373373
let client_context = init_input.client_context()?;
374374
let object_cache = client_context.object_cache();
375375

376-
let handle = RUNTIME.handle();
376+
let scan_handle = runtime_handle();
377+
let handle = scan_handle.clone();
377378
let first_file = bind_data.first_file.clone();
378379
let scan_streams = stream::iter(bind_data.file_urls.clone())
379380
.enumerate()
@@ -423,7 +424,7 @@ impl TableFunction for VortexTableFunction {
423424
.filter_map(|result| async move { result.transpose() });
424425

425426
Ok(VortexGlobalData {
426-
iterator: RUNTIME.block_on_stream_thread_safe(move |_| MultiScan {
427+
iterator: blocking_runtime().block_on_stream_thread_safe(move |_| MultiScan {
427428
streams: scan_streams.boxed(),
428429
streams_finished: false,
429430
select_all: Default::default(),

0 commit comments

Comments
 (0)