Skip to content

Commit 35f0667

Browse files
Support varbinview buffer passing to_duckdb (#2791)
1 parent 44da00f commit 35f0667

File tree

10 files changed

+249
-16
lines changed

10 files changed

+249
-16
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

duckdb-vortex/duckdb

duckdb-vortex/duckdb-rs

duckdb-vortex/src/include/vortex_extension.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class VortexExtension : public Extension {
88
public:
99
void Load(DuckDB &db) override;
1010
std::string Name() override;
11-
std::string Version() const override;
11+
std::string Version() const override;
1212
};
1313

1414
} // namespace duckdb

duckdb-vortex/src/vortex_extension.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@
1111

1212
#include "vortex.h"
1313

14-
#include <expr/expr.hpp>
15-
16-
extern "C" {
17-
const char *vortex_duckdb_hello();
18-
}
14+
#include "expr/expr.hpp"
15+
#include "rust_vector_buffer.hpp"
1916

2017
#ifndef DUCKDB_EXTENSION_MAIN
2118
#error DUCKDB_EXTENSION_MAIN not defined

vortex-duckdb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ crate-type = ["staticlib", "cdylib", "rlib"]
2424
arrow-array = { workspace = true }
2525
itertools = { workspace = true }
2626
vortex-array = { workspace = true }
27+
vortex-buffer = { workspace = true }
2728
vortex-dict = { workspace = true }
2829
vortex-dtype = { workspace = true }
2930
vortex-error = { workspace = true }

vortex-duckdb/src/buffer/mod.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::os::raw::c_void;
2+
3+
use duckdb::ffi::duckdb_vector;
4+
use vortex_buffer::ByteBuffer;
5+
6+
/// Rust-side owner of a heap-allocated `ByteBuffer`.
///
/// Converted to and from `ExternalBuffer` by the `From` impls in this module: going to
/// `ExternalBuffer` turns the `Box` into a raw pointer, going back rebuilds the `Box`.
#[derive(Clone)]
7+
#[repr(C)]
8+
pub struct FFIDuckDBBufferInternal {
9+
pub inner: Box<ByteBuffer>,
10+
}
11+
12+
// This CANNOT be copied or cloned: `inner` is an owning raw pointer to a boxed `ByteBuffer`
13+
// (see `From<FFIDuckDBBufferInternal>`), so a duplicated handle would be released twice.
14+
#[repr(C)]
15+
pub struct ExternalBuffer {
16+
pub inner: *mut c_void,
17+
}
18+
19+
impl From<FFIDuckDBBufferInternal> for ExternalBuffer {
20+
fn from(buffer: FFIDuckDBBufferInternal) -> Self {
21+
let ptr = Box::into_raw(buffer.inner) as *mut c_void;
22+
ExternalBuffer { inner: ptr }
23+
}
24+
}
25+
26+
impl From<ExternalBuffer> for FFIDuckDBBufferInternal {
27+
fn from(buffer: ExternalBuffer) -> Self {
28+
let inner: Box<ByteBuffer> = unsafe { Box::from_raw(buffer.inner.cast()) };
29+
FFIDuckDBBufferInternal { inner }
30+
}
31+
}
32+
33+
// This will free a single FFIDuckDBBuffer, however due to cloning there might be more
34+
// references to the underlying ByteBuffer that will not be freed in this call.
35+
#[unsafe(no_mangle)]
36+
pub unsafe extern "C" fn ExternalBuffer_free(buffer: *mut ExternalBuffer) {
37+
let internal: Box<FFIDuckDBBufferInternal> = unsafe { Box::from_raw(buffer.cast()) };
38+
drop(internal)
39+
}
40+
41+
/// Handle to a C++-side vector buffer: the type returned by `NewCppVectorBuffer` and
/// accepted by `AssignBufferToVec`.
///
/// NOTE(review): `ptr` is never read from Rust in this module (hence `dead_code`);
/// presumably it is only meaningful to the C++ side — confirm.
#[repr(C)]
42+
#[allow(dead_code)]
43+
pub struct CppVectorBuffer {
44+
pub ptr: *mut c_void,
45+
}
46+
47+
// Foreign functions declared here and defined outside this crate
// (presumably by the C++ DuckDB extension — TODO confirm).
#[allow(dead_code)]
48+
unsafe extern "C" {
49+
/// Builds a `CppVectorBuffer` from `buffer`, handing over `free` as the callback that
/// takes a `*mut ExternalBuffer`.
/// NOTE(review): presumably `free` is invoked when the C++ buffer is destroyed — confirm.
pub fn NewCppVectorBuffer(
50+
buffer: *mut ExternalBuffer,
51+
free: unsafe extern "C" fn(*mut ExternalBuffer),
52+
) -> *mut CppVectorBuffer;
53+
54+
/// Attaches `buffer` to the DuckDB `vector`.
/// NOTE(review): ownership and lifetime of `buffer` after this call are defined by the
/// foreign implementation — confirm.
pub fn AssignBufferToVec(vector: duckdb_vector, buffer: *mut CppVectorBuffer);
55+
}
56+
57+
pub unsafe fn new_cpp_vector_buffer(buffer: *mut ExternalBuffer) -> *mut CppVectorBuffer {
58+
unsafe { NewCppVectorBuffer(buffer, ExternalBuffer_free) }
59+
}
60+
61+
#[cfg(test)]
mod tests {
    use vortex_buffer::ByteBuffer;

    use crate::buffer::{ExternalBuffer, FFIDuckDBBufferInternal};

    /// Round-trips a clone through `ExternalBuffer` and checks that the original and the
    /// round-tripped wrapper end up sharing the same underlying bytes.
    #[test]
    fn test_buff_drop() {
        let original = FFIDuckDBBufferInternal {
            inner: Box::new(ByteBuffer::from(vec![1, 2, 3])),
        };

        // Nothing else references the bytes yet.
        assert!(original.inner.inner().is_unique());

        let external = ExternalBuffer::from(original.clone());
        let round_tripped = FFIDuckDBBufferInternal::from(external);

        // The clone that went across the FFI representation still shares the bytes.
        assert!(!round_tripped.inner.inner().is_unique());
        assert!(!original.inner.inner().is_unique());
    }
}

vortex-duckdb/src/convert/array/mod.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
mod data_chunk_adaptor;
2+
mod varbinview;
23

34
use arrow_array::ArrayRef as ArrowArrayRef;
45
use duckdb::core::{DataChunkHandle, SelectionVector};
56
use duckdb::vtab::arrow::{
67
WritableVector, flat_vector_to_arrow_array, write_arrow_array_to_vector,
78
};
8-
use vortex_array::arrays::StructArray;
9+
use vortex_array::arrays::{
10+
ChunkedArray, ChunkedEncoding, StructArray, VarBinViewArray, VarBinViewEncoding,
11+
};
912
use vortex_array::arrow::FromArrowArray;
1013
use vortex_array::compute::{take, to_arrow_preferred, try_cast};
1114
use vortex_array::validity::Validity;
@@ -32,6 +35,18 @@ pub fn to_duckdb(array: ArrayRef, chunk: &mut dyn WritableVector) -> VortexResul
3235
let value = constant.to_duckdb_scalar();
3336
chunk.flat_vector().assign_to_constant(&value);
3437
Ok(())
38+
} else if array.is_encoding(ChunkedEncoding.id()) {
39+
array
40+
.as_any()
41+
.downcast_ref::<ChunkedArray>()
42+
.vortex_expect("chunk checked")
43+
.to_duckdb(chunk)
44+
} else if array.is_encoding(VarBinViewEncoding.id()) {
45+
array
46+
.as_any()
47+
.downcast_ref::<VarBinViewArray>()
48+
.vortex_expect("varbinview id checked")
49+
.to_duckdb(chunk)
3550
} else if array.is_encoding(DictEncoding.id()) {
3651
array
3752
.as_any()
@@ -43,6 +58,17 @@ pub fn to_duckdb(array: ArrayRef, chunk: &mut dyn WritableVector) -> VortexResul
4358
}
4459
}
4560

61+
impl ToDuckDB for ChunkedArray {
62+
fn to_duckdb(&self, chunk: &mut dyn WritableVector) -> VortexResult<()> {
63+
// TODO(joe): support multi-chunk arrays without canonical.
64+
if self.chunks().len() > 1 {
65+
to_arrow_preferred(self)?.to_duckdb(chunk)
66+
} else {
67+
to_duckdb(self.chunks()[0].clone(), chunk)
68+
}
69+
}
70+
}
71+
4672
impl ToDuckDB for DictArray {
4773
fn to_duckdb(&self, chunk: &mut dyn WritableVector) -> VortexResult<()> {
4874
// If the values fit into a single vector, we can efficiently delay the take operation.
@@ -133,7 +159,6 @@ impl FromDuckDB<SizedFlatVector> for ArrayRef {
133159

134160
#[cfg(test)]
135161
mod tests {
136-
137162
use duckdb::core::{DataChunkHandle, LogicalTypeHandle, LogicalTypeId};
138163
use vortex_array::arrays::{
139164
BoolArray, ConstantArray, PrimitiveArray, StructArray, VarBinArray,
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use std::ffi::c_char;
2+
3+
use duckdb::vtab::arrow::WritableVector;
4+
use itertools::Itertools;
5+
use vortex_array::arrays::{BinaryView, Inlined, VarBinViewArray};
6+
use vortex_buffer::ByteBuffer;
7+
use vortex_error::VortexResult;
8+
9+
use crate::ToDuckDB;
10+
use crate::buffer::{
11+
AssignBufferToVec, ExternalBuffer, FFIDuckDBBufferInternal, new_cpp_vector_buffer,
12+
};
13+
14+
// This is the C++ string view struct
15+
// private:
16+
// union {
17+
// struct {
18+
// uint32_t length;
19+
// char prefix[4];
20+
// char *ptr;
21+
// } pointer;
22+
// struct {
23+
// uint32_t length;
24+
// char inlined[12];
25+
// } inlined;
26+
// } value;
27+
// };
28+
29+
#[derive(Clone, Copy)]
30+
#[repr(C, align(16))]
31+
// See `BinaryView`
32+
pub union PtrBinaryView {
33+
// Raw byte representation. This is logically a `u128`, but it is stored as 16 bytes;
34+
// the 16-byte alignment comes from the `align(16)` attribute on the union.
35+
le_bytes: [u8; 16],
36+
37+
// Inlined representation: strings <= 12 bytes
38+
inlined: Inlined,
39+
40+
// Reference type: strings > 12 bytes. Holds a raw pointer to the bytes (see `PtrRef`).
41+
_ref: PtrRef,
42+
}
43+
44+
/// Pointer-based view used for non-inlined strings: a length, a 4-byte prefix and a raw
/// pointer to the string bytes.
///
/// The field order mirrors the `pointer` arm of the C++ string view struct quoted in the
/// comment earlier in this file (`length`, `prefix[4]`, `ptr`).
#[derive(Clone, Copy, Debug)]
45+
#[repr(C, align(8))]
46+
pub struct PtrRef {
47+
size: u32,
48+
prefix: [u8; 4],
49+
ptr: *const c_char,
50+
}
51+
52+
fn binary_view_to_ptr_binary_view<'a>(
53+
view: impl Iterator<Item = &'a BinaryView>,
54+
buffers: &[ByteBuffer],
55+
used_buffers: &mut [bool],
56+
) -> Vec<PtrBinaryView> {
57+
view.map(|v| {
58+
if v.is_inlined() {
59+
PtrBinaryView {
60+
inlined: *v.as_inlined(),
61+
}
62+
} else {
63+
let view = v.as_view();
64+
used_buffers[view.buffer_index() as usize] = true;
65+
PtrBinaryView {
66+
_ref: PtrRef {
67+
size: v.len(),
68+
prefix: *view.prefix(),
69+
// TODO(joe) verify this.
70+
ptr: unsafe {
71+
buffers[view.buffer_index() as usize]
72+
.as_ptr()
73+
.add(view.offset() as usize)
74+
.cast()
75+
},
76+
},
77+
}
78+
}
79+
})
80+
.collect_vec()
81+
}
82+
83+
impl ToDuckDB for VarBinViewArray {
84+
fn to_duckdb(&self, chunk: &mut dyn WritableVector) -> VortexResult<()> {
85+
let buffers = self.buffers();
86+
let mut buffer_used = vec![false; buffers.len()];
87+
88+
let views: Vec<PtrBinaryView> = binary_view_to_ptr_binary_view(
89+
self.views().iter(),
90+
buffers,
91+
buffer_used.as_mut_slice(),
92+
);
93+
94+
let vec = chunk.flat_vector();
95+
buffers
96+
.iter()
97+
.enumerate()
98+
.filter(|&(idx, _buf)| buffer_used[idx])
99+
.map(|(_idx, buf)| buf.clone())
100+
.for_each(|b| {
101+
// Each buffer is wrapped with a C++ VectorBuffer wrapper which will
102+
// in turn call `FFIDuckDBBuffer_free` when it is cleaned up in C++ land.
103+
// Once all ptrs to the bytes are free the bytes can be freed.
104+
let buffer: *mut ExternalBuffer = Box::into_raw(Box::new(
105+
FFIDuckDBBufferInternal { inner: Box::new(b) }.into(),
106+
));
107+
let extern_buf = unsafe { new_cpp_vector_buffer(buffer) };
108+
// Adds an extra ref to the buffer which will outlive the `views`
109+
unsafe { AssignBufferToVec(vec.unowned_ptr(), extern_buf) };
110+
});
111+
112+
chunk.flat_vector().copy(views.as_slice());
113+
114+
Ok(())
115+
}
116+
}

vortex-duckdb/src/lib.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,28 @@
33

44
/// This is the default chunk size for duckdb.
55
/// It is best to return data chunks of this size to duckdb.
6+
/// The value (2048) is checked against `duckdb_vector_size()` by this crate's tests.
67
pub const DUCKDB_STANDARD_VECTOR_SIZE: usize = 2048;
78

9+
mod buffer;
810
mod convert;
911

10-
use std::ffi::c_char;
11-
1212
pub use convert::{FromDuckDB, FromDuckDBType, ToDuckDB, ToDuckDBType, to_duckdb_chunk};
1313

14-
// To generate C decls to include in vortex_duckdb_extension.cpp,
14+
// Note: To generate C decls to include in vortex_duckdb_extension.cpp,
1515
// call `cbindgen` from `vortex/vortex-duckdb`.
16-
#[unsafe(no_mangle)]
17-
pub unsafe extern "C" fn vortex_duckdb_hello() -> *const c_char {
18-
c"Hello, world from Rust! ".as_ptr()
16+
17+
#[cfg(test)]
mod tests {
    use duckdb::ffi::duckdb_vector_size;

    use crate::DUCKDB_STANDARD_VECTOR_SIZE;

    /// Fails if the hard-coded constant differs from the vector size reported by the
    /// linked DuckDB library.
    #[test]
    fn assert_duckdb_vector_size_matches() {
        let reported = usize::try_from(unsafe { duckdb_vector_size() });
        assert_eq!(Ok(DUCKDB_STANDARD_VECTOR_SIZE), reported);
    }
}

0 commit comments

Comments
 (0)