Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ typedef struct {

const idx_t INVALID_IDX = UINT64_MAX;

extern void vortex_wait_and_fetch(void *task_ptr);

typedef struct {
idx_t partition_index;
// Either INVALID_IDX or position of column in output for file_index column
Expand All @@ -116,7 +118,10 @@ typedef struct {

duckdb_vx_data (*init_local)(void *init_global_data);

void (*function)(void *init_global_data,
// If chunk was exported, return nullptr.
// On error set error_out and return nullptr.
// If we're blocked on input, return task ptr.
void* (*function)(void *init_global_data,
void *init_local_data,
duckdb_data_chunk data_chunk_out,
duckdb_vx_error *error_out);
Expand Down
22 changes: 21 additions & 1 deletion vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ DUCKDB_INCLUDES_BEGIN
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parallel/async_result.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
DUCKDB_INCLUDES_END

Expand Down Expand Up @@ -249,6 +250,16 @@ init_local(ExecutionContext &, TableFunctionInitInput &input, GlobalTableFunctio
return make_uniq<CTableLocalData>(std::move(cdata));
}

struct VortexWaitTask final : AsyncTask {
explicit VortexWaitTask(void *task_ptr) : task_ptr(task_ptr) {
}
void Execute() override {
vortex_wait_and_fetch(task_ptr);
}

void *task_ptr;
};

void function(ClientContext &, TableFunctionInput &input, DataChunk &output) {
const auto &bind = input.bind_data->Cast<CTableBindData>();

Expand All @@ -257,10 +268,19 @@ void function(ClientContext &, TableFunctionInput &input, DataChunk &output) {

duckdb_data_chunk chunk = reinterpret_cast<duckdb_data_chunk>(&output);
duckdb_vx_error error_out = nullptr;
bind.info.vtab.function(ffi_global, ffi_local, chunk, &error_out);

void *const task_ptr = bind.info.vtab.function(ffi_global, ffi_local, chunk, &error_out);
if (error_out) {
throw InvalidInputException(IntoErrString(error_out));
}
if (!task_ptr) { // Chunk was exported
return;
}

// We're blocked on IO
vector<unique_ptr<AsyncTask>> tasks(1);
tasks[0] = make_uniq<VortexWaitTask>(task_ptr);
input.async_result = AsyncResult(std::move(tasks));
}

void c_pushdown_complex_filter(ClientContext &,
Expand Down
2 changes: 2 additions & 0 deletions vortex-duckdb/include/vortex.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const char *vortex_version_rust(void);
*/
const char *vortex_extension_version_rust(void);

void vortex_wait_and_fetch(void *self_ptr);

#ifdef __cplusplus
}
#endif
Expand Down
44 changes: 39 additions & 5 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
//! pushdown, cardinality, and partitioning.

use std::cmp::max;
use std::ffi::c_void;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -47,6 +49,7 @@ use vortex::file::v2::FileStatsLayoutReader;
use vortex::io::kanal_ext::KanalExt;
use vortex::io::runtime::BlockingRuntime;
use vortex::io::runtime::current::ThreadSafeIterator;
use vortex::io::runtime::current::TryRecv;
use vortex::layout::layouts::row_idx::row_idx;
use vortex::layout::scan::multi::MultiLayoutChild;
use vortex::layout::scan::multi::MultiLayoutDataSource;
Expand Down Expand Up @@ -78,6 +81,7 @@ use crate::duckdb::DuckdbStringMapRef;
use crate::duckdb::ExpressionRef;
use crate::duckdb::LogicalType;
use crate::duckdb::PartitionData;
use crate::duckdb::ScanResult;
use crate::duckdb::TableFilterClass;
use crate::duckdb::TableFilterSetRef;
use crate::duckdb::TableFunction;
Expand Down Expand Up @@ -158,7 +162,8 @@ impl Debug for DataSourceBindData {
}
}

type DataSourceIterator = ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>;
type DataSourceItem = VortexResult<(ArrayRef, Arc<ConversionCache>)>;
type DataSourceIterator = ThreadSafeIterator<DataSourceItem>;

/// Global scan state for driving a `DataSource` scan through DuckDB.
pub struct DataSourceGlobal {
Expand All @@ -170,12 +175,28 @@ pub struct DataSourceGlobal {
file_row_number_column_pos: Option<usize>,
}

struct WaitCtx {
results_rx: kanal::AsyncReceiver<DataSourceItem>,
pending: Arc<Mutex<Option<DataSourceItem>>>,
}

#[unsafe(no_mangle)]
pub unsafe extern "C-unwind" fn vortex_wait_and_fetch(self_ptr: *mut c_void) {
let ctx = unsafe { Box::from_raw(self_ptr.cast::<WaitCtx>()) };
RUNTIME.block_on(async {
if let Ok(item) = ctx.results_rx.recv().await {
*ctx.pending.lock().unwrap() = Some(item);
}
});
}

/// Per-thread local scan state.
pub struct DataSourceLocal {
iterator: DataSourceIterator,
exporter: Option<ArrayExporter>,
partition_index: u64,
file_index: usize,
pending: Arc<Mutex<Option<DataSourceItem>>>,
}

/// Returns scan progress as a percentage (0.0–100.0).
Expand Down Expand Up @@ -456,19 +477,32 @@ impl<T: DataSourceTableFunction> TableFunction for T {
exporter: None,
partition_index: 0,
file_index: 0,
pending: Arc::new(Mutex::new(None)),
}
}

fn scan(
local_state: &mut Self::LocalState,
global_state: &Self::GlobalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()> {
) -> VortexResult<ScanResult> {
loop {
if local_state.exporter.is_none() {
let mut ctx = SESSION.create_execution_ctx();
let Some(result) = local_state.iterator.next() else {
return Ok(());
let result = if let Some(item) = local_state.pending.lock().unwrap().take() {
item
} else {
match local_state.iterator.try_recv() {
TryRecv::Item(item) => item,
TryRecv::Empty => {
let wait = Box::new(WaitCtx {
results_rx: local_state.iterator.receiver(),
pending: Arc::clone(&local_state.pending),
});
return Ok(ScanResult::Blocked(Box::into_raw(wait).cast()));
}
TryRecv::Closed => return Ok(ScanResult::Exported),
}
};
let (array_result, conversion_cache) = result?;
let array_result = array_result.optimize_recursive(ctx.session())?;
Expand Down Expand Up @@ -530,7 +564,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
.reference_value(&Value::from(local_state.file_index as u64));
}

Ok(())
Ok(ScanResult::Exported)
}

fn table_scan_progress(global_state: &Self::GlobalState) -> f64 {
Expand Down
29 changes: 17 additions & 12 deletions vortex-duckdb/src/duckdb/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub struct PartitionData {
pub file_index: usize,
}

pub enum ScanResult {
Exported,
Blocked(*mut c_void),
}
unsafe impl Send for ScanResult {}

#[derive(Debug, Default)]
pub struct ColumnStatistics {
pub min: Option<Value>,
Expand Down Expand Up @@ -81,12 +87,12 @@ pub trait TableFunction: Sized + Debug {
/// registered as a VIEW.
fn statistics(bind_data: &Self::BindData, column_index: usize) -> Option<ColumnStatistics>;

/// The function is called during query execution and is responsible for producing the output
/// The function is called during query execution and is responsible for producing the output.
fn scan(
init_local: &mut Self::LocalState,
init_global: &Self::GlobalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()>;
) -> VortexResult<ScanResult>;

/// Initialize the global operator state of the function.
///
Expand Down Expand Up @@ -241,23 +247,22 @@ unsafe extern "C-unwind" fn function<T: TableFunction>(
local_init_data: *mut c_void,
output: cpp::duckdb_data_chunk,
error_out: *mut cpp::duckdb_vx_error,
) {
) -> *mut c_void {
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_ref() }
.vortex_expect("global_init_data null pointer");
let local_init_data = unsafe { local_init_data.cast::<T::LocalState>().as_mut() }
.vortex_expect("local_init_data null pointer");
let data_chunk = unsafe { DataChunk::borrow_mut(output) };

match T::scan(local_init_data, global_init_data, data_chunk) {
Ok(()) => {
// The data chunk is already filled by the function.
// No need to do anything here.
Ok(ScanResult::Exported) => ptr::null_mut(),
Ok(ScanResult::Blocked(task_ptr)) => task_ptr,
Err(e) => {
let msg = e.to_string();
unsafe {
error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len()));
}
ptr::null_mut()
}
Err(e) => unsafe {
error_out.write(cpp::duckdb_vx_error_create(
e.to_string().as_ptr().cast(),
e.to_string().len(),
));
},
}
}
20 changes: 20 additions & 0 deletions vortex-io/src/runtime/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,26 @@ pub struct ThreadSafeIterator<T> {
results: kanal::AsyncReceiver<T>,
}

pub enum TryRecv<T> {
Item(T),
Empty,
Closed,
}

impl<T> ThreadSafeIterator<T> {
pub fn receiver(&self) -> kanal::AsyncReceiver<T> {
self.results.clone()
}

pub fn try_recv(&self) -> TryRecv<T> {
match self.results.try_recv() {
Ok(Some(v)) => TryRecv::Item(v),
Ok(None) => TryRecv::Empty,
Err(_) => TryRecv::Closed,
}
}
}

// Manual clone implementation since `T` does not need to be `Clone`.
impl<T> Clone for ThreadSafeIterator<T> {
fn clone(&self) -> Self {
Expand Down
Loading