diff --git a/Project.toml b/Project.toml index ae30467..f6eb74d 100644 --- a/Project.toml +++ b/Project.toml @@ -4,11 +4,13 @@ version = "0.5.3" [deps] Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" +FunctionWrappers = "069b7b12-0de2-55c6-9aab-29f3d0a68a2e" Preferences = "21216c6a-2e73-6563-6e65-726566657250" iceberg_rust_ffi_jll = "6bd5c94f-693c-53e3-983d-a09fad412f22" [compat] Arrow = "2.0" +FunctionWrappers = "1" Preferences = "1.3" iceberg_rust_ffi_jll = "0.7" julia = "1.10" diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 4cd95e9..5fbd003 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1523,7 +1523,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=5210ce1d93ff6cb70ef382b7573a9b9331ee4e52#5210ce1d93ff6cb70ef382b7573a9b9331ee4e52" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=34829106fbb6ef94785edd99f784ea767a4aae44#34829106fbb6ef94785edd99f784ea767a4aae44" dependencies = [ "anyhow", "apache-avro", @@ -1578,7 +1578,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=5210ce1d93ff6cb70ef382b7573a9b9331ee4e52#5210ce1d93ff6cb70ef382b7573a9b9331ee4e52" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=34829106fbb6ef94785edd99f784ea767a4aae44#34829106fbb6ef94785edd99f784ea767a4aae44" dependencies = [ "async-trait", "chrono", @@ -1597,11 +1597,12 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.7.0" +version = "0.7.1" dependencies = [ "anyhow", "arrow-array", "arrow-ipc", + "async-trait", "futures", "home", "iceberg", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index b6f9aa5..37776ea 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.7.0" +version = "0.7.1" edition = "2021" [lib] @@ -12,8 +12,8 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "5210ce1d93ff6cb70ef382b7573a9b9331ee4e52", features = ["storage-azdls"] } -iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "5210ce1d93ff6cb70ef382b7573a9b9331ee4e52" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44", features = ["storage-azdls"] } +iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" @@ -24,6 +24,7 @@ arrow-ipc = "57.1" tracing-subscriber = "0.3" tracing = "0.1" once_cell = "1.19" +async-trait = "0.1" # Pin home to 0.5.9 to support rustc < 1.88 # (0.5.11+ requires rustc 1.88+) home = "=0.5.9" diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 1d553e0..25ba077 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -1,10 +1,12 @@ /// Catalog support for iceberg_rust_ffi use crate::IcebergTable; use anyhow::Result; -use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; -use iceberg_catalog_rest::RestCatalogBuilder; +use async_trait::async_trait; +use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableIdent}; +use iceberg_catalog_rest::{CustomAuthenticator, RestCatalog, RestCatalogBuilder}; use std::collections::HashMap; use std::ffi::{c_char, c_void}; +use std::sync::{Arc, Mutex}; // FFI exports use object_store_ffi::{ @@ -15,25 +17,189 @@ use object_store_ffi::{ use crate::util::{parse_c_string, parse_properties, parse_string_array}; use crate::PropertyEntry; +/// Callback function type for custom token authentication from FFI with token caching support +/// +/// The callback receives: +/// - auth_fn: opaque pointer to user context (e.g., Julia Ref to authenticator) +/// - token_data_ptr: output pointer where token string bytes pointer should be written +/// - token_len_ptr: output pointer where token string length should be written +/// - reuse_token_ptr: output pointer for reuse flag (1 = reuse previous token, 0 = new token) +/// +/// The callback is responsible for: +/// 1. Extracting the authenticator from auth_fn pointer +/// 2. Invoking the Julia authenticator function which returns Union{String, Nothing}: +/// - String: new token to cache and use +/// - nothing: signal to reuse the previously cached token +/// 3. Setting reuse_token_ptr to indicate the result: +/// - Write 1 if Julia returned nothing (reuse the previous token) +/// - Write 0 if Julia returned a String (new token data provided) +/// 4. If reuse flag is 0: write token_data_ptr (pointer to bytes) and token_len_ptr (length) +/// Note: Julia is responsible for keeping the data valid for the duration of the call. +/// Rust will copy the data immediately. +/// +/// Returns: +/// - 0 for success +/// - non-zero for error +pub type CustomAuthenticatorCallback = unsafe extern "C" fn( + auth_fn: *mut c_void, + token_data_ptr: *mut *mut c_char, + token_len_ptr: *mut usize, + reuse_token_ptr: *mut i32, +) -> i32; + +/// Rust implementation of CustomAuthenticator that calls a C callback with auth_fn pointer +/// Supports token caching to avoid unnecessary copying when Julia returns the same token +#[derive(Clone, Debug)] +struct FFITokenAuthenticator { + callback: CustomAuthenticatorCallback, + auth_fn: *mut c_void, + // Cached token to avoid copying when Julia signals to reuse + cached_token: Arc>>, +} + +// SAFETY: We trust that the Julia callback is thread-safe. +// The auth_fn pointer is opaque and its thread-safety is the caller's responsibility. +unsafe impl Send for FFITokenAuthenticator {} +unsafe impl Sync for FFITokenAuthenticator {} + +#[async_trait] +impl CustomAuthenticator for FFITokenAuthenticator { + async fn get_token(&self) -> iceberg::Result { + let mut token_data: *mut c_char = std::ptr::null_mut(); + let mut token_len: usize = 0; + let mut reuse_token_box = Box::new(0i32); + let reuse_token_ptr = reuse_token_box.as_mut() as *mut i32; + + let result = unsafe { + (self.callback)( + self.auth_fn, + &mut token_data, + &mut token_len, + reuse_token_ptr, + ) + }; + + if result != 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Token authenticator callback failed", + )); + } + + // If Julia signals to reuse the previous token, return it + if *reuse_token_box != 0 { + let cached = self.cached_token.lock().unwrap(); + if let Some(token) = cached.as_ref() { + return Ok(token.clone()); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Token authenticator requested to reuse previous token, but no cached token exists", + )); + } + } + + // Otherwise, handle the new token from Julia + if token_data.is_null() || token_len == 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Token authenticator returned null pointer or zero length", + )); + } + + // SAFETY: The callback is responsible for ensuring token_data points to valid + // UTF-8 bytes with length token_len. Rust will copy the data immediately. + let token = unsafe { + let slice = std::slice::from_raw_parts(token_data as *const u8, token_len); + String::from_utf8(slice.to_vec()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid UTF-8 in token: {}", e), + ) + })? + }; + + // Cache the new token for potential reuse + *self.cached_token.lock().unwrap() = Some(token.clone()); + + Ok(token) + } +} + /// Opaque catalog handle for FFI -#[repr(C)] +/// Stores a RestCatalog instance wrapped in a Box for safe memory management. +/// Also stores the authenticator to allow setting it before catalog creation. pub struct IcebergCatalog { - catalog: Box, + catalog: Option>, + /// Stores a pending authenticator to be applied before first use + authenticator: Option>, +} + +// SAFETY: Send and Sync are safe because: +// 1. The RestCatalog is accessed only through this struct via exclusive ownership (Box) +// 2. We enforce exclusive mutable access for operations that mutate (set_token_authenticator) +// 3. The struct represents unshared ownership across FFI boundary +// 4. The struct is manually freed via iceberg_catalog_free(), not via Drop +unsafe impl Send for IcebergCatalog {} +unsafe impl Sync for IcebergCatalog {} + +impl Default for IcebergCatalog { + fn default() -> Self { + IcebergCatalog { + catalog: None, + authenticator: None, + } + } } impl IcebergCatalog { - /// Create a new REST catalog - pub async fn create_rest(uri: String, props: HashMap) -> Result { + /// Create and initialize a REST catalog with optional authenticator + pub async fn create_rest( + mut self, + uri: String, + props: HashMap, + ) -> Result { let mut catalog_props = props; catalog_props.insert("uri".to_string(), uri); - let catalog = RestCatalogBuilder::default() - .load("rest", catalog_props) - .await?; + // Apply authenticator to builder if set + let mut builder = RestCatalogBuilder::default(); + if let Some(ref authenticator) = self.authenticator { + builder = builder.with_token_authenticator(authenticator.clone()); + } + + let catalog = builder.load("rest", catalog_props).await?; + self.catalog = Some(Box::new(catalog)); + + Ok(self) + } + + /// Set a custom token authenticator before catalog creation + pub fn set_token_authenticator( + &mut self, + callback: CustomAuthenticatorCallback, + auth_fn: *mut c_void, + ) -> Result<()> { + let authenticator = Arc::new(FFITokenAuthenticator { + callback, + auth_fn, + cached_token: Arc::new(Mutex::new(None)), + }); + + // Store the authenticator to be used when building the catalog + self.authenticator = Some(authenticator); + + Ok(()) + } - Ok(IcebergCatalog { - catalog: Box::new(catalog), - }) + /// Get a reference to the underlying RestCatalog. + /// + /// Returns a reference to the catalog. + /// Panics if the catalog has not been initialized via create_rest. + fn as_ref(&self) -> &RestCatalog { + self.catalog + .as_ref() + .expect("catalog should be initialized") } /// Load a table by namespace and name @@ -44,7 +210,7 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - let table = self.catalog.load_table(&table_ident).await?; + let table = self.as_ref().load_table(&table_ident).await?; Ok(IcebergTable { table }) } @@ -52,7 +218,7 @@ impl IcebergCatalog { /// List tables in a namespace pub async fn list_tables(&self, namespace_parts: Vec) -> Result> { let namespace = NamespaceIdent::from_vec(namespace_parts)?; - let tables = self.catalog.list_tables(&namespace).await?; + let tables = self.as_ref().list_tables(&namespace).await?; Ok(tables.into_iter().map(|t| t.name().to_string()).collect()) } @@ -68,7 +234,7 @@ impl IcebergCatalog { None }; - let namespaces = self.catalog.list_namespaces(parent.as_ref()).await?; + let namespaces = self.as_ref().list_namespaces(parent.as_ref()).await?; Ok(namespaces .into_iter() @@ -84,7 +250,7 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - self.catalog + self.as_ref() .table_exists(&table_ident) .await .map_err(|e| anyhow::anyhow!(e)) @@ -94,7 +260,7 @@ impl IcebergCatalog { /// Response type for catalog operations that return a catalog #[repr(C)] pub struct IcebergCatalogResponse { - pub result: crate::CResult, + pub result: CResult, pub catalog: *mut IcebergCatalog, pub error_message: *mut c_char, pub context: *const crate::Context, @@ -105,7 +271,7 @@ unsafe impl Send for IcebergCatalogResponse {} impl crate::RawResponse for IcebergCatalogResponse { type Payload = IcebergCatalog; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -131,7 +297,7 @@ impl crate::RawResponse for IcebergCatalogResponse { /// Response type for string list operations #[repr(C)] pub struct IcebergStringListResponse { - pub result: crate::CResult, + pub result: CResult, pub items: *mut *mut c_char, pub count: usize, pub error_message: *mut c_char, @@ -143,7 +309,7 @@ unsafe impl Send for IcebergStringListResponse {} impl crate::RawResponse for IcebergStringListResponse { type Payload = Vec; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -181,7 +347,7 @@ impl crate::RawResponse for IcebergStringListResponse { /// Response type for boolean operations #[repr(C)] pub struct IcebergBoolResponse { - pub result: crate::CResult, + pub result: CResult, pub value: bool, pub error_message: *mut c_char, pub context: *const crate::Context, @@ -192,7 +358,7 @@ unsafe impl Send for IcebergBoolResponse {} impl crate::RawResponse for IcebergBoolResponse { type Payload = bool; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -215,7 +381,7 @@ impl crate::RawResponse for IcebergBoolResponse { /// Response type for nested string list operations (for namespace lists) #[repr(C)] pub struct IcebergNestedStringListResponse { - pub result: crate::CResult, + pub result: CResult, pub outer_items: *mut *mut *mut c_char, pub outer_count: usize, pub inner_counts: *mut usize, @@ -228,7 +394,7 @@ unsafe impl Send for IcebergNestedStringListResponse {} impl crate::RawResponse for IcebergNestedStringListResponse { type Payload = Vec>; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -295,20 +461,34 @@ pub extern "C" fn iceberg_catalog_free(catalog: *mut IcebergCatalog) { // FFI Export functions for catalog operations // These functions are exported to be called from Julia via the FFI -// Create a REST catalog +// Create an empty catalog (default constructor exposed as sync FFI function) +#[no_mangle] +pub extern "C" fn iceberg_catalog_init() -> *mut IcebergCatalog { + Box::into_raw(Box::new(IcebergCatalog::default())) +} + +// Create a REST catalog from an existing catalog pointer (which may have an authenticator already set) +// This function takes ownership of the catalog pointer and returns it in the response export_runtime_op!( iceberg_rest_catalog_create, IcebergCatalogResponse, || { + if catalog.is_null() { + return Err(anyhow::anyhow!("Null catalog pointer provided")); + } + // SAFETY: catalog was checked to be non-null above and came from FFI + let catalog = unsafe { Box::from_raw(catalog) }; let uri_str = parse_c_string(uri, "uri")?; let props = parse_properties(properties, properties_len)?; - Ok((uri_str, props)) + Ok::<(Box, String, HashMap), anyhow::Error>((catalog, uri_str, props)) }, result_tuple, async { - let (uri, props) = result_tuple; - IcebergCatalog::create_rest(uri, props).await + let (catalog, uri, props) = result_tuple; + // create_rest takes ownership and returns the catalog + catalog.create_rest(uri, props).await }, + catalog: *mut IcebergCatalog, uri: *const c_char, properties: *const PropertyEntry, properties_len: usize @@ -417,3 +597,28 @@ export_runtime_op!( namespace_parts_len: usize, table_name: *const c_char ); + +/// Set a custom token authenticator for the catalog +#[no_mangle] +pub extern "C" fn iceberg_catalog_set_token_authenticator( + catalog: *mut IcebergCatalog, + callback: CustomAuthenticatorCallback, + auth_fn: *mut c_void, +) -> CResult { + // Check for null catalog pointer + if catalog.is_null() { + return CResult::Error; + } + + // SAFETY: catalog was checked to be non-null above. + // The caller must ensure the catalog pointer remains valid for the duration of this call. + let result = unsafe { &mut *catalog }.set_token_authenticator(callback, auth_fn); + + match result { + Ok(()) => CResult::Ok, + Err(e) => { + eprintln!("Error setting token authenticator: {}", e); + CResult::Error + } + } +} diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index 77bd0fd..a9d9bf0 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -3,6 +3,7 @@ module RustyIceberg using Base: @kwdef, @lock using Base.Threads: Atomic using Arrow +using FunctionWrappers: FunctionWrapper using iceberg_rust_ffi_jll export Table, Scan, IncrementalScan, ArrowBatch, StaticConfig, ArrowStream diff --git a/src/catalog.jl b/src/catalog.jl index eb984ee..60d9be6 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -4,14 +4,79 @@ Catalog support for RustyIceberg.jl This module provides Julia wrappers for the REST catalog FFI functions. """ +# Token callback function that can be exported as C-callable with token caching support +# This is a static function (no closure) that takes auth_fn and extracts the authenticator +Base.@ccallable function iceberg_token_callback_impl( + auth_fn::Ptr{Cvoid}, + token_data_ptr::Ptr{Ptr{Cchar}}, + token_len_ptr::Ptr{Csize_t}, + reuse_token_ptr::Ptr{Cint} +)::Cint + try + # Check that output pointers are valid + if token_data_ptr == C_NULL || token_len_ptr == C_NULL || reuse_token_ptr == C_NULL + return Cint(1) # Error: invalid output pointers + end + + # Extract authenticator function from auth_fn pointer + auth_ref = unsafe_pointer_to_objref(auth_fn) + auth_fn_call = auth_ref[] + + # Call the authenticator to get the token + # The authenticator should return either: + # - A String with the token to use and cache + # - nothing to signal reuse of the previously cached token + token_result = auth_fn_call() + + # Handle nothing (signal to reuse cached token) + if token_result === nothing + unsafe_store!(reuse_token_ptr, Cint(1)) + return Cint(0) + end + + # Ensure we got a String + token_str = token_result::String + + # Signal that this is a new token (not reusing) + unsafe_store!(reuse_token_ptr, Cint(0)) + + # Get token bytes and write data pointer and length to output parameters + # The data will be kept alive by the token_str variable until the callback returns + token_bytes = Vector{UInt8}(token_str) + token_len = length(token_bytes) + token_data = pointer(token_bytes) + + # Write the pointer and length to the output parameters + unsafe_store!(token_data_ptr, token_data) + unsafe_store!(token_len_ptr, Csize_t(token_len)) + + return Cint(0) + catch + # Return error code on exception + return Cint(1) + end +end + """ Catalog -Opaque pointer type representing an Iceberg catalog handle from the Rust FFI layer. +Mutable struct holding an Iceberg catalog handle from the Rust FFI layer. + +The struct stores both the raw catalog pointer and optionally an authenticator function, +ensuring the authenticator stays alive while the catalog is in use. Create a catalog using `catalog_create_rest` and free it with `free_catalog` when done. """ -const Catalog = Ptr{Cvoid} +mutable struct Catalog + ptr::Ptr{Cvoid} + authenticator::Union{Nothing, Ref} +end + +# Constructor for simple catalogs without authenticator +Catalog(ptr::Ptr{Cvoid}) = Catalog(ptr, nothing) + +# Support conversion to Ptr for FFI calls +Base.unsafe_convert(::Type{Ptr{Cvoid}}, catalog::Catalog) = catalog.ptr """ CatalogResponse @@ -20,13 +85,13 @@ Response structure for catalog creation operations. # Fields - `result::Cint`: Result code from the operation (0 for success) -- `catalog::Catalog`: The created catalog handle +- `catalog::Ptr{Cvoid}`: The created catalog handle (raw pointer) - `error_message::Ptr{Cchar}`: Error message string if operation failed - `context::Ptr{Cvoid}`: Context pointer for operation cancellation """ mutable struct CatalogResponse result::Cint - catalog::Catalog + catalog::Ptr{Cvoid} error_message::Ptr{Cchar} context::Ptr{Cvoid} @@ -118,14 +183,107 @@ catalog = catalog_create_rest("http://polaris:8181") ``` """ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{String,String}()) + # Create an empty catalog (no authenticator) + catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} + if catalog_ptr == C_NULL + throw(IcebergException("Failed to create empty catalog")) + end + + # Initialize the catalog with REST connection + # Convert properties dict to array of PropertyEntry structs + property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties] + properties_len = length(property_entries) + response = CatalogResponse() + async_ccall(response, property_entries, properties) do handle + @ccall rust_lib.iceberg_rest_catalog_create( + catalog_ptr::Ptr{Cvoid}, + uri::Cstring, + (properties_len > 0 ? pointer(property_entries) : C_NULL)::Ptr{PropertyEntry}, + properties_len::Csize_t, + response::Ref{CatalogResponse}, + handle::Ptr{Cvoid} + )::Cint + end + + @throw_on_error(response, "catalog_create_rest", IcebergException) + + return Catalog(response.catalog, nothing) +end + +""" + catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog + +Create a REST catalog connection with custom token authentication and token caching support. + +# Arguments +- `authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}`: A callable that takes no arguments and returns either: + - A `String` containing the token: Rust will cache it and use it for authentication + - `nothing`: Signal to reuse the previously cached token (avoids malloc, memory copy, and free operations) + + **Token Caching Optimization**: Returning `nothing` allows efficient token reuse without unnecessary copying. + +- `uri::String`: URI of the Iceberg REST catalog server (e.g., "http://localhost:8181") +- `properties::Dict{String,String}`: Optional key-value properties for catalog configuration. + By default (empty dict), no additional properties are passed. + +# Returns +- A `Catalog` handle for use in other catalog operations + +# Example with Token Caching +```julia +using FunctionWrappers: FunctionWrapper + +function get_token() + # Check if we need a new token, or if we can reuse the cached one + if needs_refresh() + return fetch_new_token() # Returns String with token + else + return nothing # Signal to reuse cached token (efficient!) + end +end + +catalog = catalog_create_rest(FunctionWrapper{Union{String,Nothing},Tuple{}}(get_token), "http://polaris:8181") +``` +""" +function catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}()) + # Step 1: Create an empty catalog + catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} + if catalog_ptr == C_NULL + throw(IcebergException("Failed to create empty catalog")) + end + + # Step 2: Wrap the authenticator in a Ref for stable memory address + authenticator_ref = Ref(authenticator) + + # Step 3: Create C callback using the static token_callback_impl function + c_callback = @cfunction(iceberg_token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}}, Ptr{Csize_t}, Ptr{Cint})) + + # Step 4: Get auth_fn pointer to pass to the authenticator + auth_fn = pointer_from_objref(authenticator_ref) + + # Step 5: Set the authenticator on the empty catalog BEFORE initializing REST + result = @ccall rust_lib.iceberg_catalog_set_token_authenticator( + catalog_ptr::Ptr{Cvoid}, + c_callback::Ptr{Cvoid}, + auth_fn::Ptr{Cvoid} + )::Cint + + if result != 0 + throw(IcebergException("Failed to set token authenticator")) + end + + # Step 6: Initialize the catalog with REST connection # Convert properties dict to array of PropertyEntry structs property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties] properties_len = length(property_entries) + response = CatalogResponse() + async_ccall(response, property_entries, properties) do handle @ccall rust_lib.iceberg_rest_catalog_create( + catalog_ptr::Ptr{Cvoid}, uri::Cstring, (properties_len > 0 ? pointer(property_entries) : C_NULL)::Ptr{PropertyEntry}, properties_len::Csize_t, @@ -136,7 +294,9 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S @throw_on_error(response, "catalog_create_rest", IcebergException) - return response.catalog + # Create a Catalog struct that holds the catalog pointer and keeps authenticator alive + catalog = Catalog(response.catalog, authenticator_ref) + return catalog end """ @@ -145,7 +305,7 @@ end Free the memory associated with a catalog. """ function free_catalog(catalog::Catalog) - @ccall rust_lib.iceberg_catalog_free(catalog::Catalog)::Cvoid + @ccall rust_lib.iceberg_catalog_free(catalog.ptr::Ptr{Cvoid})::Cvoid end """ @@ -175,7 +335,7 @@ function load_table(catalog::Catalog, namespace::Vector{String}, table_name::Str async_ccall(response, namespace, namespace_ptrs) do handle @ccall rust_lib.iceberg_catalog_load_table( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, namespace_len::Csize_t, table_name::Cstring, @@ -215,7 +375,7 @@ function list_tables(catalog::Catalog, namespace::Vector{String}) async_ccall(response, namespace, namespace_ptrs) do handle @ccall rust_lib.iceberg_catalog_list_tables( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, namespace_len::Csize_t, response::Ref{StringListResponse}, @@ -272,7 +432,7 @@ function list_namespaces(catalog::Catalog, parent::Vector{String}=String[]) async_ccall(response, parent, parent_ptrs) do handle @ccall rust_lib.iceberg_catalog_list_namespaces( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (parent_len > 0 ? pointer(parent_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, parent_len::Csize_t, response::Ref{NestedStringListResponse}, @@ -313,7 +473,7 @@ function table_exists(catalog::Catalog, namespace::Vector{String}, table_name::S async_ccall(response, namespace, namespace_ptrs) do handle @ccall rust_lib.iceberg_catalog_table_exists( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, namespace_len::Csize_t, table_name::Cstring, diff --git a/test/runtests.jl b/test/runtests.jl index c28e83a..d510c47 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,6 +2,7 @@ using Test using RustyIceberg using DataFrames using Arrow +using FunctionWrappers: FunctionWrapper @testset "RustyIceberg.jl" begin @testset "Runtime Initialization" begin @@ -595,7 +596,11 @@ end token_response = HTTP.post( token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + headers=[ + "Authorization" => auth_header, + "Polaris-Realm" => realm, + "Content-Type" => "application/x-www-form-urlencoded" + ], body=body, status_exception=false ) @@ -643,9 +648,6 @@ end @test table in tpch_tables end println("✅ All expected TPCH tables found: $expected_tables") - - catch e - rethrow(e) finally # Clean up if catalog != C_NULL @@ -657,6 +659,168 @@ end println("✅ Catalog API with token authentication tests completed!") end +@testset "Catalog API with Custom Authenticator Function" begin + println("Testing catalog API with custom authenticator function...") + + using HTTP + using JSON + using Base64 + + # Token endpoint + token_endpoint = "http://localhost:8181/api/catalog/v1/oauth/tokens" + catalog_uri = "http://localhost:8181/api/catalog" + + # Client credentials + client_id = "root" + client_secret = "s3cr3t" + realm = "POLARIS" + + # Create a custom authenticator function that fetches tokens on demand + function authenticator() + # Track number of actual token fetches + fetch_count = Threads.Atomic{Int}(0) + # Cache the token - use Ref to store it safely + cached_token = Ref{Union{Nothing, String}}(nothing) + token_lock = Threads.ReentrantLock() + + function get_token_impl() + lock(token_lock) do + # Check if we have a cached token + if cached_token[] !== nothing + return cached_token[]::String + end + + # Fetch access token using client credentials + credentials = base64encode("$client_id:$client_secret") + auth_header = "Basic $credentials" + body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" + + token_response = HTTP.post( + token_endpoint; + headers=[ + "Authorization" => auth_header, + "Polaris-Realm" => realm, + "Content-Type" => "application/x-www-form-urlencoded" + ], + body=body, + status_exception=false + ) + + if token_response.status != 200 + error("Failed to fetch token: $(String(token_response.body))") + end + + token_data = JSON.parse(String(token_response.body)) + token = token_data["access_token"] + + # Increment fetch count ONLY when actually fetching from server + Threads.atomic_add!(fetch_count, 1) + + # Cache the token for future use + cached_token[] = token + + return token + end + end + + return get_token_impl, fetch_count, cached_token + end + + auth_fn, fetch_counter, cached_token_ref = authenticator() + + catalog = C_NULL + try + # Test catalog creation with custom authenticator function + println("Creating catalog with custom authenticator function...") + props = Dict( + "warehouse" => "warehouse" + ) + catalog = RustyIceberg.catalog_create_rest(FunctionWrapper{Union{String,Nothing},Tuple{}}(auth_fn), catalog_uri; properties=props) + @test catalog != C_NULL + println("✅ Catalog created successfully with custom authenticator function") + + # Test listing namespaces to verify authentication works + println("Listing namespaces with custom authenticator...") + root_namespaces = RustyIceberg.list_namespaces(catalog) + @test isa(root_namespaces, Vector{Vector{String}}) + @test length(root_namespaces) >= 2 + println("✅ Namespaces listed: $root_namespaces") + + # Verify expected namespaces exist + @test ["tpch.sf01"] in root_namespaces + println("✅ Expected namespace 'tpch.sf01' verified") + + # Test listing tables in tpch.sf01 + println("Listing tables in tpch.sf01...") + tpch_tables = RustyIceberg.list_tables(catalog, ["tpch.sf01"]) + @test isa(tpch_tables, Vector{String}) + @test length(tpch_tables) > 0 + println("✅ Tables in tpch.sf01: $tpch_tables") + + # Verify expected TPCH tables exist + expected_tables = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"] + for table in expected_tables + @test table in tpch_tables + end + println("✅ All expected TPCH tables found: $expected_tables") + + # Test table existence check + println("Verifying table existence for TPCH tables...") + for table in expected_tables + exists = RustyIceberg.table_exists(catalog, ["tpch.sf01"], table) + @test exists == true + end + println("✅ All TPCH tables verified to exist: $expected_tables") + + # Verify that authenticator fetched the token exactly once (subsequent calls use cached token) + final_fetch_count = fetch_counter[] + println("✅ Authenticator fetched token $final_fetch_count time(s)") + @test final_fetch_count == 1 + + # Test token invalidation and re-fetch + println("\nTesting token invalidation and re-fetch...") + + # Invalidate the cached token by setting it to an invalid value + cached_token_ref[] = "invalid_token_foo" + println("✅ Invalidated cached token") + + # Attempt to list namespaces with invalid token - should fail with 401 + println("Attempting to list namespaces with invalid token...") + error_occurred = false + error_msg = "" + try + root_namespaces = RustyIceberg.list_namespaces(catalog) + # If we get here, the test should fail + catch err + error_occurred = true + error_msg = string(err) + println("✅ Got expected error with invalid token: $(typeof(err))") + println(" Error message: $error_msg") + end + + # Verify that an error occurred when using invalid token + @test error_occurred + + # Check if error message contains indication of 401 or authentication failure + has_401 = contains(error_msg, "401") + has_unauthorized = contains(error_msg, "Unauthorized") || contains(error_msg, "unauthorized") + has_unexpected = contains(error_msg, "unexpected status code") + has_auth_error = has_401 || has_unauthorized || has_unexpected + + @test has_auth_error + + println("✅ Token invalidation test passed") + finally + # Clean up + if catalog != C_NULL + RustyIceberg.free_catalog(catalog) + println("✅ Catalog cleaned up successfully") + end + end + + println("✅ Catalog API with custom authenticator function tests completed!") +end + @testset "Catalog Table Loading" begin println("Testing catalog table loading...")