Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ jobs:

- name: just check
run: just check

- name: upload docs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: docs
path: target/doc
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ wit-bindgen = "0.44"
[workspace.lints.rust]
missing_copy_implementations = "deny"
missing_debug_implementations = "deny"
missing_docs = "deny"
rust_2018_idioms = { level = "deny", priority = -1 }
unexpected_cfgs = { level = "deny", check-cfg = ['cfg(tokio_unstable)'] }
unexpected_cfgs = "deny"
unreachable_pub = "deny"
unused_crate_dependencies = "deny"

Expand All @@ -48,7 +49,7 @@ undocumented_unsafe_blocks = "deny"
[workspace.lints.rustdoc]
bare_urls = "deny"
broken_intra_doc_links = "deny"
private_intra_doc_links = "allow"
private_intra_doc_links = "deny"

[patch.crates-io]
# use same DataFusion fork as InfluxDB
Expand Down
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ check-rust-test $RUST_BACKTRACE="1": check-rust-build
# build Rust docs
check-rust-doc:
@echo ::group::check-rust-doc
cargo doc --document-private-items --no-deps --all-features --workspace
cargo doc --document-private-items --all-features --workspace
@echo ::endgroup::

# dry-run Rust benchmarks
Expand Down
22 changes: 22 additions & 0 deletions arrow2bytes/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
//! Convert [`arrow`] types to/from bytes.
//!
//! This uses the [Arrow IPC] schema.
//!
//!
//! [Arrow IPC]: https://arrow.apache.org/docs/format/IPC.html
use std::{io::Cursor, sync::Arc};

use arrow::{
Expand All @@ -12,6 +18,11 @@ use arrow::{
},
};

/// Convert an [`Array`] to bytes.
///
/// This is done by encoding it as a [`RecordBatch`] with a single [`Field`].
///
/// See [`bytes2array`] for the reverse method.
pub fn array2bytes(array: ArrayRef) -> Vec<u8> {
let buffer = Vec::new();

Expand All @@ -28,6 +39,9 @@ pub fn array2bytes(array: ArrayRef) -> Vec<u8> {
writer.into_inner().expect("writing to buffer never fails")
}

/// Decodes [`Array`] from bytes.
///
/// See [`array2bytes`] for the reverse method and the format description.
pub fn bytes2array(bytes: &[u8]) -> Result<ArrayRef, ArrowError> {
let bytes = Cursor::new(bytes);
let mut reader = StreamReader::try_new(bytes, None)?;
Expand All @@ -48,12 +62,20 @@ pub fn bytes2array(bytes: &[u8]) -> Result<ArrayRef, ArrowError> {
Ok(array)
}

/// Serializes a [`DataType`] into bytes.
///
/// The [`DataType`] is placed into a single non-nullable [`Field`] named `"a"`, which in turn forms a one-column
/// [`Schema`]. That schema is encoded via `IpcSchemaEncoder::schema_to_fb` and the finished buffer is copied into
/// an owned byte vector.
///
/// See [`bytes2datatype`] for the reverse method.
pub fn datatype2bytes(dt: DataType) -> Vec<u8> {
    // The field name and nullability are fixed placeholders; only the data type varies.
    let field = Field::new("a", dt, false);
    let schema = Schema::new(vec![field]);

    let mut encoder = IpcSchemaEncoder::new();
    let builder = encoder.schema_to_fb(&schema);
    builder.finished_data().to_vec()
}

/// Decodes [`DataType`] from bytes.
///
/// See [`datatype2bytes`] for the reverse method and format description.
pub fn bytes2datatype(bytes: &[u8]) -> Result<DataType, ArrowError> {
let ipc_schema =
root_as_schema(bytes).map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
Expand Down
3 changes: 3 additions & 0 deletions arrow2bytes/tests/array.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Docs are not strictly required for tests.
#![expect(missing_docs)]

use std::sync::Arc;

use arrow::{
Expand Down
3 changes: 3 additions & 0 deletions arrow2bytes/tests/datatype.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Docs are not strictly required for tests.
#![expect(missing_docs)]

use std::sync::Arc;

use arrow::{
Expand Down
13 changes: 13 additions & 0 deletions guests/python/build.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//! Build script for [CPython]+[`pyo3`]-based UDFs.
//!
//! This ensures two things:
//! - **linking:** Set up correct linker arguments. This only happens when the `WASI_SDK_LINK_PATH` environment
//! variable is set. Otherwise we assume that this is NOT a WASM build (e.g. during an ordinary `cargo check`) and
//! that [`pyo3`] manages the linking itself.
//! - **root file system:** If the `PYO3_CROSS_LIB_DIR` environment variable is set, we assume that we must package
//! the [Python Standard Library].
//!
//!
//! [CPython]: https://www.python.org/
//! [Python Standard Library]: https://docs.python.org/3/library/index.html
//! [`pyo3`]: https://pyo3.rs/
use std::{fs::File, path::PathBuf};

fn main() {
Expand Down
10 changes: 9 additions & 1 deletion guests/python/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! [CPython]+[`pyo3`]-based UDFs.
//!
//!
//! [CPython]: https://www.python.org/
//! [`pyo3`]: https://pyo3.rs/
use std::sync::Arc;

use arrow::datatypes::DataType;
Expand Down Expand Up @@ -72,9 +77,12 @@ impl ScalarUDFImpl for Test {
}
}

#[allow(clippy::allow_attributes, clippy::const_is_empty)]
fn root() -> Option<Vec<u8>> {
// The build script will ALWAYS set this environment variable, but if we don't bundle the standard lib the file
// will simply be empty.
const ROOT_TAR: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/python-lib.tar"));
Some(ROOT_TAR.to_vec())
(!ROOT_TAR.is_empty()).then(|| ROOT_TAR.to_vec())
}

fn udfs(_source: String) -> DataFusionResult<Vec<Arc<dyn ScalarUDFImpl>>> {
Expand Down
2 changes: 2 additions & 0 deletions guests/rust/examples/add_one.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Example Scalar UDF that just implements "add one".

// unused-crate-dependencies false positives
#![expect(unused_crate_dependencies)]

Expand Down
5 changes: 5 additions & 0 deletions guests/rust/src/bindings.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! Auto-generated bindings based on WIT.

// bindgen always generates a few undocumented items
#![expect(missing_docs)]

use wit_bindgen::generate;

generate!({
Expand Down
1 change: 1 addition & 0 deletions guests/rust/src/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Conversion routines from/to [WIT types](crate::bindings).
use arrow::{
array::ArrayRef,
datatypes::{DataType, Field, FieldRef},
Expand Down
50 changes: 49 additions & 1 deletion guests/rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,59 @@
//! Implements the Rust guest glue code for [DataFusion] UDFs.
//!
//!
//! [DataFusion]: https://datafusion.apache.org/
pub mod bindings;
pub mod conversion;
pub mod wrapper;

/// Export UDFs to WebAssembly.
///
/// # Example
/// Make sure your crate has the correct type:
///
/// ```toml
/// [lib]
/// crate-type = ["cdylib"]
/// ```
///
/// Then implement two functions:
/// - **root file system:** Return the bytes of a [tar] file that contains the root filesystem for your WebAssembly
/// payload. The file-system will be provided as read-only.
/// - **scalar UDFs:** A method that takes a string -- which it may use or not -- and returns a list of
/// [`ScalarUDFImpl`]s.
///
/// ```rust
/// # use std::sync::Arc;
/// #
/// # use datafusion_common::error::DataFusionError;
/// # use datafusion_expr::ScalarUDFImpl;
/// #
/// # use datafusion_udf_wasm_guest::export;
/// #
/// fn root() -> Option<Vec<u8>> {
/// // root file system is optional
/// None
/// }
///
/// fn udfs(source: String) -> Result<Vec<Arc<dyn ScalarUDFImpl>>, DataFusionError> {
/// // You may use the provided source code to generate UDFs on-the-fly.
/// todo!()
/// }
///
/// export! {
/// root_fs_tar: root,
/// scalar_udfs: udfs,
/// }
/// ```
///
///
/// [`ScalarUDFImpl`]: datafusion_expr::ScalarUDFImpl
/// [tar]: https://en.wikipedia.org/wiki/Tar_(computing)
#[macro_export]
macro_rules! export {
{
root_fs_tar: $root_fs_tar:ident,
scalar_udfs: $scalar_udfs:ident,
scalar_udfs: $scalar_udfs:ident$(,)?
} => {

#[derive(Debug)]
Expand Down Expand Up @@ -43,6 +90,7 @@ macro_rules! export {

// create dummy function for other targets to suppress "unused" warnings
#[cfg(not(target_os = "wasi"))]
#[doc(hidden)]
pub fn _export_stuff() -> impl $crate::bindings::exports::datafusion_udf_wasm::udf::types::Guest {
Implementation
}
Expand Down
9 changes: 9 additions & 0 deletions guests/rust/src/wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
//! Wrapper for [DataFusion] types so that they implement the respective [WIT interfaces](crate::bindings).
//!
//!
//! [DataFusion]: https://datafusion.apache.org/
use std::sync::Arc;

use crate::bindings::exports::datafusion_udf_wasm::udf::types as wit_types;
use arrow::datatypes::DataType;
use datafusion_expr::ScalarUDFImpl;

/// Wraps a [`ScalarUDFImpl`] so that it implements the [WIT definition].
///
///
/// [WIT definition]: wit_types::GuestScalarUdf
#[derive(Debug)]
pub struct ScalarUdfWrapper(Arc<dyn ScalarUDFImpl>);

impl ScalarUdfWrapper {
/// Create new wrapper from [`ScalarUDFImpl`].
///
/// The given `udf` is stored as-is inside the wrapper; no copy of the underlying implementation is made beyond
/// taking ownership of the passed [`Arc`].
pub fn new(udf: Arc<dyn ScalarUDFImpl>) -> Self {
Self(udf)
}
Expand Down
11 changes: 11 additions & 0 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Host-code for WebAssembly-based [DataFusion] UDFs.
//!
//!
//! [DataFusion]: https://datafusion.apache.org/
use std::{any::Any, io::Cursor, ops::DerefMut, sync::Arc};

use arrow::datatypes::DataType;
Expand Down Expand Up @@ -71,6 +75,12 @@ pub struct WasmComponentPrecompiled {
}

impl WasmComponentPrecompiled {
/// Pre-compile WASM payload.
///
/// Accepts a WASM payload in [binary format].
///
///
/// [binary format]: https://webassembly.github.io/spec/core/binary/index.html
pub async fn new(wasm_binary: Arc<[u8]>) -> DataFusionResult<Self> {
tokio::task::spawn_blocking(move || {
let engine = Engine::new(
Expand Down Expand Up @@ -109,6 +119,7 @@ impl std::fmt::Debug for WasmComponentPrecompiled {
}
}

/// A [`ScalarUDFImpl`] that wraps a WebAssembly payload.
pub struct WasmScalarUdf {
store: Arc<Mutex<Store<WasmStateImpl>>>,
bindings: Arc<bindings::Datafusion>,
Expand Down
8 changes: 6 additions & 2 deletions host/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
// unused-crate-dependencies false positives
#![expect(unused_crate_dependencies)]
#![expect(
// Docs are not strictly required for tests.
missing_docs,
// unused-crate-dependencies false positives
unused_crate_dependencies,
)]

mod integration_tests;