From d1bb37a69af463b7c2eb1179d81127c68b9da72e Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:33:17 +0100 Subject: [PATCH 01/30] Add custom authenticator support --- iceberg_rust_ffi/Cargo.lock | 3 +- iceberg_rust_ffi/Cargo.toml | 5 +- iceberg_rust_ffi/src/catalog.rs | 164 +++++++++++++++++++++++++++++--- src/catalog.jl | 124 ++++++++++++++++++++++-- test/runtests.jl | 95 ++++++++++++++++++ 5 files changed, 366 insertions(+), 25 deletions(-) diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 4cd95e9..6811705 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1523,7 +1523,6 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=5210ce1d93ff6cb70ef382b7573a9b9331ee4e52#5210ce1d93ff6cb70ef382b7573a9b9331ee4e52" dependencies = [ "anyhow", "apache-avro", @@ -1578,7 +1577,6 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=5210ce1d93ff6cb70ef382b7573a9b9331ee4e52#5210ce1d93ff6cb70ef382b7573a9b9331ee4e52" dependencies = [ "async-trait", "chrono", @@ -1602,6 +1600,7 @@ dependencies = [ "anyhow", "arrow-array", "arrow-ipc", + "async-trait", "futures", "home", "iceberg", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index b6f9aa5..07748c1 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -12,8 +12,8 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "5210ce1d93ff6cb70ef382b7573a9b9331ee4e52", features = ["storage-azdls"] } -iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "5210ce1d93ff6cb70ef382b7573a9b9331ee4e52" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0e802be6ff998b14f0565edede458e2b5958bff2", features = ["storage-azdls"] } +iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0e802be6ff998b14f0565edede458e2b5958bff2" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" @@ -24,6 +24,7 @@ arrow-ipc = "57.1" tracing-subscriber = "0.3" tracing = "0.1" once_cell = "1.19" +async-trait = "0.1" # Pin home to 0.5.9 to support rustc < 1.88 # (0.5.11+ requires rustc 1.88+) home = "=0.5.9" diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 1d553e0..ba72beb 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -1,10 +1,12 @@ /// Catalog support for iceberg_rust_ffi use crate::IcebergTable; use anyhow::Result; -use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; -use iceberg_catalog_rest::RestCatalogBuilder; +use async_trait::async_trait; +use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableIdent}; +use iceberg_catalog_rest::{RestCatalogBuilder, TokenAuthenticator}; use std::collections::HashMap; -use std::ffi::{c_char, c_void}; +use std::ffi::{c_char, c_void, CString}; +use std::sync::Arc; // FFI exports use object_store_ffi::{ @@ -15,12 +17,90 @@ use object_store_ffi::{ use crate::util::{parse_c_string, parse_properties, parse_string_array}; use crate::PropertyEntry; +/// Callback function type for token authentication from FFI +/// +/// The callback receives: +/// - user_data: opaque pointer to user context (e.g., Julia closure) +/// - token_ptr: output pointer where the token string should be written +/// +/// Returns: +/// - 0 for success (token_ptr must point to a CString allocated with CString::into_raw) +/// - non-zero for error +pub type TokenAuthenticatorCallback = + extern "C" fn(user_data: *mut c_void, token_ptr: *mut *mut c_char) -> i32; + +/// Rust implementation of TokenAuthenticator that calls a C callback with user_data +#[derive(Debug, Clone)] +struct FFITokenAuthenticator { + callback: TokenAuthenticatorCallback, + user_data: *mut c_void, +} + +// SAFETY: We trust that the Julia callback is thread-safe. +// The user_data pointer is opaque and its thread-safety is the caller's responsibility. +unsafe impl Send for FFITokenAuthenticator {} +unsafe impl Sync for FFITokenAuthenticator {} + +#[async_trait] +impl TokenAuthenticator for FFITokenAuthenticator { + async fn get_token(&self) -> iceberg::Result { + let mut token_ptr: *mut c_char = std::ptr::null_mut(); + + let result = (self.callback)(self.user_data, &mut token_ptr); + + if result != 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Token authenticator callback failed", + )); + } + + if token_ptr.is_null() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Token authenticator returned null pointer", + )); + } + + // SAFETY: The callback is responsible for ensuring token_ptr is a valid + // null-terminated C string that was allocated with CString::into_raw + let token_cstring = unsafe { CString::from_raw(token_ptr) }; + + token_cstring.into_string().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid UTF-8 in token: {}", e), + ) + }) + } +} + /// Opaque catalog handle for FFI +/// Holds a raw pointer to a RestCatalog allocated on the heap. +/// The catalog is owned by this struct and cleaned up when dropped. #[repr(C)] pub struct IcebergCatalog { - catalog: Box, + catalog: *mut iceberg_catalog_rest::RestCatalog, +} + +impl Drop for IcebergCatalog { + fn drop(&mut self) { + unsafe { + if !self.catalog.is_null() { + let _ = Box::from_raw(self.catalog); + } + } + } } +// SAFETY: The catalog pointer is owned exclusively by this struct. +// Send and Sync are safe because: +// 1. The RestCatalog is accessed only through this struct +// 2. We enforce exclusive mutable access for operations that mutate (set_token_authenticator) +// 3. The pointer is never shared or aliased from FFI +unsafe impl Send for IcebergCatalog {} +unsafe impl Sync for IcebergCatalog {} + impl IcebergCatalog { /// Create a new REST catalog pub async fn create_rest(uri: String, props: HashMap) -> Result { @@ -32,10 +112,40 @@ impl IcebergCatalog { .await?; Ok(IcebergCatalog { - catalog: Box::new(catalog), + catalog: Box::into_raw(Box::new(catalog)), }) } + /// Set a custom token authenticator + /// Must be called before the first catalog operation + pub fn set_token_authenticator( + &mut self, + callback: TokenAuthenticatorCallback, + user_data: *mut c_void, + ) -> Result<()> { + if self.catalog.is_null() { + return Err(anyhow::anyhow!("Catalog is null")); + } + + let authenticator = Arc::new(FFITokenAuthenticator { + callback, + user_data, + }); + + // SAFETY: We own the catalog through the raw pointer. This is safe because: + // 1. with_token_authenticator is synchronous + // 2. It only updates internal state and returns self + // 3. We have exclusive mutable access through &mut self + // 4. We use ptr::read/write to move the value without Clone + unsafe { + let catalog = std::ptr::read(self.catalog); + let updated = catalog.with_token_authenticator(authenticator); + std::ptr::write(self.catalog, updated); + } + + Ok(()) + } + /// Load a table by namespace and name pub async fn load_table( &self, @@ -44,7 +154,8 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - let table = self.catalog.load_table(&table_ident).await?; + // SAFETY: catalog is valid as long as self is valid + let table = unsafe { (*self.catalog).load_table(&table_ident).await? }; Ok(IcebergTable { table }) } @@ -52,7 +163,8 @@ impl IcebergCatalog { /// List tables in a namespace pub async fn list_tables(&self, namespace_parts: Vec) -> Result> { let namespace = NamespaceIdent::from_vec(namespace_parts)?; - let tables = self.catalog.list_tables(&namespace).await?; + // SAFETY: catalog is valid as long as self is valid + let tables = unsafe { (*self.catalog).list_tables(&namespace).await? }; Ok(tables.into_iter().map(|t| t.name().to_string()).collect()) } @@ -68,7 +180,12 @@ impl IcebergCatalog { None }; - let namespaces = self.catalog.list_namespaces(parent.as_ref()).await?; + // SAFETY: catalog is valid as long as self is valid + let namespaces = unsafe { + (*self.catalog) + .list_namespaces(parent.as_ref()) + .await? + }; Ok(namespaces .into_iter() @@ -84,9 +201,8 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - self.catalog - .table_exists(&table_ident) - .await + // SAFETY: catalog is valid as long as self is valid + unsafe { (*self.catalog).table_exists(&table_ident).await } .map_err(|e| anyhow::anyhow!(e)) } } @@ -417,3 +533,29 @@ export_runtime_op!( namespace_parts_len: usize, table_name: *const c_char ); + +/// Set a custom token authenticator for the catalog +#[no_mangle] +pub extern "C" fn iceberg_catalog_set_token_authenticator( + catalog: *mut IcebergCatalog, + callback: TokenAuthenticatorCallback, + user_data: *mut c_void, +) -> crate::CResult { + // Check for null catalog pointer + if catalog.is_null() { + return crate::CResult::Error; + } + + // SAFETY: catalog was checked to be non-null above. + // The caller must ensure the catalog pointer remains valid for the duration of this call. + let catalog_ref = unsafe { &mut *catalog }; + + // Call the synchronous set_token_authenticator method + match catalog_ref.set_token_authenticator(callback, user_data) { + Ok(()) => crate::CResult::Ok, + Err(e) => { + eprintln!("Error setting token authenticator: {}", e); + crate::CResult::Error + } + } +} diff --git a/src/catalog.jl b/src/catalog.jl index eb984ee..a9d1f1d 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -7,11 +7,23 @@ This module provides Julia wrappers for the REST catalog FFI functions. """ Catalog -Opaque pointer type representing an Iceberg catalog handle from the Rust FFI layer. +Mutable struct holding an Iceberg catalog handle from the Rust FFI layer. + +The struct stores both the raw catalog pointer and optionally an authenticator function, +ensuring the authenticator stays alive while the catalog is in use. Create a catalog using `catalog_create_rest` and free it with `free_catalog` when done. """ -const Catalog = Ptr{Cvoid} +mutable struct Catalog + ptr::Ptr{Cvoid} + authenticator::Union{Nothing, Ref} +end + +# Constructor for simple catalogs without authenticator +Catalog(ptr::Ptr{Cvoid}) = Catalog(ptr, nothing) + +# Support conversion to Ptr for FFI calls +Base.unsafe_convert(::Type{Ptr{Cvoid}}, catalog::Catalog) = catalog.ptr """ CatalogResponse @@ -20,13 +32,13 @@ Response structure for catalog creation operations. # Fields - `result::Cint`: Result code from the operation (0 for success) -- `catalog::Catalog`: The created catalog handle +- `catalog::Ptr{Cvoid}`: The created catalog handle (raw pointer) - `error_message::Ptr{Cchar}`: Error message string if operation failed - `context::Ptr{Cvoid}`: Context pointer for operation cancellation """ mutable struct CatalogResponse result::Cint - catalog::Catalog + catalog::Ptr{Cvoid} error_message::Ptr{Cchar} context::Ptr{Cvoid} @@ -136,7 +148,99 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S @throw_on_error(response, "catalog_create_rest", IcebergException) - return response.catalog + return Catalog(response.catalog, nothing) +end + +""" + catalog_create_rest(authenticator::Function, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog + +Create a REST catalog connection with custom token authentication. + +# Arguments +- `authenticator::Function`: A callable that takes no arguments and returns a token string. + The function will be called whenever a new token is needed for authentication. +- `uri::String`: URI of the Iceberg REST catalog server (e.g., "http://localhost:8181") +- `properties::Dict{String,String}`: Optional key-value properties for catalog configuration. + By default (empty dict), no additional properties are passed. + +# Returns +- A `Catalog` handle for use in other catalog operations + +# Example +```julia +function get_token() + return ENV["ICEBERG_TOKEN"] +end + +catalog = catalog_create_rest(get_token, "http://polaris:8181") +``` +""" +function catalog_create_rest(authenticator::Function, uri::String; properties::Dict{String,String}=Dict{String,String}()) + # First create the base catalog without the authenticator + catalog = catalog_create_rest(uri; properties=properties) + + # Wrap the authenticator in a mutable container so we can get a stable pointer + authenticator_ref = Ref(authenticator) + + # Create the C callback function that wraps the Julia authenticator + c_callback = @cfunction( + $( + function token_callback(user_data::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}})::Cint + try + # Get the authenticator function from user_data (cast back to its Ref) + # We don't annotate the type here to avoid type mismatch with closures + auth_ref = unsafe_pointer_to_objref(user_data) + auth_fn = auth_ref[] + + # Call the authenticator to get the token + token_str = auth_fn()::String + + # Allocate C string using libc malloc and copy the token + # This matches what Rust expects: a CString allocated with libc malloc + token_bytes = Vector{UInt8}(token_str) + token_len = length(token_bytes) + c_str_ptr = @ccall malloc((token_len + 1)::Csize_t)::Ptr{Cchar} + + if c_str_ptr == C_NULL + return Cint(1) # Error: allocation failed + end + + # Copy the token bytes to the allocated memory + unsafe_copyto!(convert(Ptr{UInt8}, c_str_ptr), pointer(token_bytes), token_len) + # Add null terminator + unsafe_store!(convert(Ptr{UInt8}, c_str_ptr), UInt8(0), token_len + 1) + + # Write the pointer to the output parameter + unsafe_store!(token_ptr, c_str_ptr) + + return Cint(0) + catch e + # Return error code on exception + return Cint(1) + end + end + ), + Cint, + (Ptr{Cvoid}, Ptr{Ptr{Cchar}}) + ) + + # Set the authenticator on the catalog using the raw pointer from the Catalog struct + user_data = pointer_from_objref(authenticator_ref) + result = @ccall rust_lib.iceberg_catalog_set_token_authenticator( + catalog.ptr::Ptr{Cvoid}, + c_callback::Ptr{Cvoid}, + user_data::Ptr{Cvoid} + )::Cint + + if result != 0 + free_catalog(catalog) + throw(IcebergException("Failed to set token authenticator")) + end + + # Store the authenticator_ref in the catalog struct to keep it alive + catalog.authenticator = authenticator_ref + + return catalog end """ @@ -145,7 +249,7 @@ end Free the memory associated with a catalog. """ function free_catalog(catalog::Catalog) - @ccall rust_lib.iceberg_catalog_free(catalog::Catalog)::Cvoid + @ccall rust_lib.iceberg_catalog_free(catalog.ptr::Ptr{Cvoid})::Cvoid end """ @@ -175,7 +279,7 @@ function load_table(catalog::Catalog, namespace::Vector{String}, table_name::Str async_ccall(response, namespace, namespace_ptrs) do handle @ccall rust_lib.iceberg_catalog_load_table( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, namespace_len::Csize_t, table_name::Cstring, @@ -215,7 +319,7 @@ function list_tables(catalog::Catalog, namespace::Vector{String}) async_ccall(response, namespace, namespace_ptrs) do handle @ccall rust_lib.iceberg_catalog_list_tables( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, namespace_len::Csize_t, response::Ref{StringListResponse}, @@ -272,7 +376,7 @@ function list_namespaces(catalog::Catalog, parent::Vector{String}=String[]) async_ccall(response, parent, parent_ptrs) do handle @ccall rust_lib.iceberg_catalog_list_namespaces( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (parent_len > 0 ? pointer(parent_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, parent_len::Csize_t, response::Ref{NestedStringListResponse}, @@ -313,7 +417,7 @@ function table_exists(catalog::Catalog, namespace::Vector{String}, table_name::S async_ccall(response, namespace, namespace_ptrs) do handle @ccall rust_lib.iceberg_catalog_table_exists( - catalog::Catalog, + catalog.ptr::Ptr{Cvoid}, (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, namespace_len::Csize_t, table_name::Cstring, diff --git a/test/runtests.jl b/test/runtests.jl index c28e83a..8bd27f3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -657,6 +657,101 @@ end println("✅ Catalog API with token authentication tests completed!") end +@testset "Catalog API with Custom Authenticator Function" begin + println("Testing catalog API with custom authenticator function...") + + using HTTP + using JSON + using Base64 + + # Token endpoint + token_endpoint = "http://localhost:8181/api/catalog/v1/oauth/tokens" + catalog_uri = "http://localhost:8181/api/catalog" + + # Client credentials + client_id = "root" + client_secret = "s3cr3t" + realm = "POLARIS" + + # Create a custom authenticator function that fetches tokens on demand + function get_token() + # Fetch access token using client credentials + credentials = base64encode("$client_id:$client_secret") + auth_header = "Basic $credentials" + body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" + + token_response = HTTP.post( + token_endpoint; + headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + body=body, + status_exception=false + ) + + if token_response.status != 200 + error("Failed to fetch token: $(String(token_response.body))") + end + + token_data = JSON.parse(String(token_response.body)) + return token_data["access_token"] + end + + catalog = C_NULL + try + # Test catalog creation with custom authenticator function + println("Creating catalog with custom authenticator function...") + props = Dict( + "warehouse" => "warehouse" + ) + catalog = RustyIceberg.catalog_create_rest(get_token, catalog_uri; properties=props) + @test catalog != C_NULL + println("✅ Catalog created successfully with custom authenticator function") + + # Test listing namespaces to verify authentication works + println("Listing namespaces with custom authenticator...") + root_namespaces = RustyIceberg.list_namespaces(catalog) + @test isa(root_namespaces, Vector{Vector{String}}) + @test length(root_namespaces) >= 2 + println("✅ Namespaces listed: $root_namespaces") + + # Verify expected namespaces exist + @test ["tpch.sf01"] in root_namespaces + println("✅ Expected namespace 'tpch.sf01' verified") + + # Test listing tables in tpch.sf01 + println("Listing tables in tpch.sf01...") + tpch_tables = RustyIceberg.list_tables(catalog, ["tpch.sf01"]) + @test isa(tpch_tables, Vector{String}) + @test length(tpch_tables) > 0 + println("✅ Tables in tpch.sf01: $tpch_tables") + + # Verify expected TPCH tables exist + expected_tables = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"] + for table in expected_tables + @test table in tpch_tables + end + println("✅ All expected TPCH tables found: $expected_tables") + + # Test table existence check + println("Verifying table existence for TPCH tables...") + for table in expected_tables + exists = RustyIceberg.table_exists(catalog, ["tpch.sf01"], table) + @test exists == true + end + println("✅ All TPCH tables verified to exist: $expected_tables") + + catch e + rethrow(e) + finally + # Clean up + if catalog != C_NULL + RustyIceberg.free_catalog(catalog) + println("✅ Catalog cleaned up successfully") + end + end + + println("✅ Catalog API with custom authenticator function tests completed!") +end + @testset "Catalog Table Loading" begin println("Testing catalog table loading...") From 1286b46e34e36a1c1f35bef5ca02960dbc1d8a38 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:36:20 +0100 Subject: [PATCH 02/30] Format --- iceberg_rust_ffi/src/catalog.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index ba72beb..7663aa2 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -181,11 +181,7 @@ impl IcebergCatalog { }; // SAFETY: catalog is valid as long as self is valid - let namespaces = unsafe { - (*self.catalog) - .list_namespaces(parent.as_ref()) - .await? - }; + let namespaces = unsafe { (*self.catalog).list_namespaces(parent.as_ref()).await? }; Ok(namespaces .into_iter() @@ -202,8 +198,7 @@ impl IcebergCatalog { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); // SAFETY: catalog is valid as long as self is valid - unsafe { (*self.catalog).table_exists(&table_ident).await } - .map_err(|e| anyhow::anyhow!(e)) + unsafe { (*self.catalog).table_exists(&table_ident).await }.map_err(|e| anyhow::anyhow!(e)) } } From f8f4c2b795f0db743aab32d1f1b335b57b1af9f1 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:38:10 +0100 Subject: [PATCH 03/30] Cleanup --- iceberg_rust_ffi/src/catalog.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 7663aa2..ca89b56 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -205,7 +205,7 @@ impl IcebergCatalog { /// Response type for catalog operations that return a catalog #[repr(C)] pub struct IcebergCatalogResponse { - pub result: crate::CResult, + pub result: CResult, pub catalog: *mut IcebergCatalog, pub error_message: *mut c_char, pub context: *const crate::Context, @@ -216,7 +216,7 @@ unsafe impl Send for IcebergCatalogResponse {} impl crate::RawResponse for IcebergCatalogResponse { type Payload = IcebergCatalog; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -242,7 +242,7 @@ impl crate::RawResponse for IcebergCatalogResponse { /// Response type for string list operations #[repr(C)] pub struct IcebergStringListResponse { - pub result: crate::CResult, + pub result: CResult, pub items: *mut *mut c_char, pub count: usize, pub error_message: *mut c_char, @@ -254,7 +254,7 @@ unsafe impl Send for IcebergStringListResponse {} impl crate::RawResponse for IcebergStringListResponse { type Payload = Vec; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -292,7 +292,7 @@ impl crate::RawResponse for IcebergStringListResponse { /// Response type for boolean operations #[repr(C)] pub struct IcebergBoolResponse { - pub result: crate::CResult, + pub result: CResult, pub value: bool, pub error_message: *mut c_char, pub context: *const crate::Context, @@ -303,7 +303,7 @@ unsafe impl Send for IcebergBoolResponse {} impl crate::RawResponse for IcebergBoolResponse { type Payload = bool; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -326,7 +326,7 @@ impl crate::RawResponse for IcebergBoolResponse { /// Response type for nested string list operations (for namespace lists) #[repr(C)] pub struct IcebergNestedStringListResponse { - pub result: crate::CResult, + pub result: CResult, pub outer_items: *mut *mut *mut c_char, pub outer_count: usize, pub inner_counts: *mut usize, @@ -339,7 +339,7 @@ unsafe impl Send for IcebergNestedStringListResponse {} impl crate::RawResponse for IcebergNestedStringListResponse { type Payload = Vec>; - fn result_mut(&mut self) -> &mut crate::CResult { + fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -535,10 +535,10 @@ pub extern "C" fn iceberg_catalog_set_token_authenticator( catalog: *mut IcebergCatalog, callback: TokenAuthenticatorCallback, user_data: *mut c_void, -) -> crate::CResult { +) -> CResult { // Check for null catalog pointer if catalog.is_null() { - return crate::CResult::Error; + return CResult::Error; } // SAFETY: catalog was checked to be non-null above. @@ -547,10 +547,10 @@ pub extern "C" fn iceberg_catalog_set_token_authenticator( // Call the synchronous set_token_authenticator method match catalog_ref.set_token_authenticator(callback, user_data) { - Ok(()) => crate::CResult::Ok, + Ok(()) => CResult::Ok, Err(e) => { eprintln!("Error setting token authenticator: {}", e); - crate::CResult::Error + CResult::Error } } } From 664931bc3af73fafc6c509f08ee47935791d0e09 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:39:19 +0100 Subject: [PATCH 04/30] Simplify --- test/runtests.jl | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 8bd27f3..8c1e979 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -643,9 +643,6 @@ end @test table in tpch_tables end println("✅ All expected TPCH tables found: $expected_tables") - - catch e - rethrow(e) finally # Clean up if catalog != C_NULL @@ -738,9 +735,6 @@ end @test exists == true end println("✅ All TPCH tables verified to exist: $expected_tables") - - catch e - rethrow(e) finally # Clean up if catalog != C_NULL From 47f7f58a70ba073c6cd5baf1e638002921ea26dc Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:44:12 +0100 Subject: [PATCH 05/30] . --- iceberg_rust_ffi/src/catalog.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index ca89b56..70d8ba8 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -132,15 +132,13 @@ impl IcebergCatalog { user_data, }); - // SAFETY: We own the catalog through the raw pointer. This is safe because: - // 1. with_token_authenticator is synchronous - // 2. It only updates internal state and returns self - // 3. We have exclusive mutable access through &mut self - // 4. We use ptr::read/write to move the value without Clone + // SAFETY: catalog was checked to be non-null above. + // Box::from_raw takes ownership of the heap-allocated catalog. + // We immediately apply with_token_authenticator and convert back with into_raw. unsafe { - let catalog = std::ptr::read(self.catalog); - let updated = catalog.with_token_authenticator(authenticator); - std::ptr::write(self.catalog, updated); + let mut catalog = Box::from_raw(self.catalog); + *catalog = catalog.with_token_authenticator(authenticator); + self.catalog = Box::into_raw(catalog); } Ok(()) From 5bf1b99932e86cea678cdb2e3e57f75932482cdd Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:45:52 +0100 Subject: [PATCH 06/30] Add helper --- iceberg_rust_ffi/src/catalog.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 70d8ba8..0986aab 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -116,6 +116,16 @@ impl IcebergCatalog { }) } + /// Get a reference to the underlying RestCatalog. + /// + /// SAFETY: Returns a reference valid only for the lifetime of self. + /// The caller must ensure this method is not called concurrently with set_token_authenticator. + fn as_ref(&self) -> &iceberg_catalog_rest::RestCatalog { + // SAFETY: catalog is checked to be non-null in set_token_authenticator and during creation. + // It's only modified by set_token_authenticator with exclusive &mut access. + unsafe { &*self.catalog } + } + /// Set a custom token authenticator /// Must be called before the first catalog operation pub fn set_token_authenticator( @@ -152,8 +162,7 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - // SAFETY: catalog is valid as long as self is valid - let table = unsafe { (*self.catalog).load_table(&table_ident).await? }; + let table = self.as_ref().load_table(&table_ident).await?; Ok(IcebergTable { table }) } @@ -161,8 +170,7 @@ impl IcebergCatalog { /// List tables in a namespace pub async fn list_tables(&self, namespace_parts: Vec) -> Result> { let namespace = NamespaceIdent::from_vec(namespace_parts)?; - // SAFETY: catalog is valid as long as self is valid - let tables = unsafe { (*self.catalog).list_tables(&namespace).await? }; + let tables = self.as_ref().list_tables(&namespace).await?; Ok(tables.into_iter().map(|t| t.name().to_string()).collect()) } @@ -178,8 +186,7 @@ impl IcebergCatalog { None }; - // SAFETY: catalog is valid as long as self is valid - let namespaces = unsafe { (*self.catalog).list_namespaces(parent.as_ref()).await? }; + let namespaces = self.as_ref().list_namespaces(parent.as_ref()).await?; Ok(namespaces .into_iter() @@ -195,8 +202,7 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - // SAFETY: catalog is valid as long as self is valid - unsafe { (*self.catalog).table_exists(&table_ident).await }.map_err(|e| anyhow::anyhow!(e)) + self.as_ref().table_exists(&table_ident).await.map_err(|e| anyhow::anyhow!(e)) } } From 8f130d58fad4033d93a9744c3190d4cecd07bee4 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 11:47:30 +0100 Subject: [PATCH 07/30] Format --- iceberg_rust_ffi/src/catalog.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 0986aab..2c18767 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -202,7 +202,10 @@ impl IcebergCatalog { ) -> Result { let namespace = NamespaceIdent::from_vec(namespace_parts)?; let table_ident = TableIdent::new(namespace, table_name); - self.as_ref().table_exists(&table_ident).await.map_err(|e| anyhow::anyhow!(e)) + self.as_ref() + .table_exists(&table_ident) + .await + .map_err(|e| anyhow::anyhow!(e)) } } From f84898043ca362b630b0c868dbed873b9cd5a317 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:31:13 +0100 Subject: [PATCH 08/30] Refactor to match iceberg-rust --- iceberg_rust_ffi/Cargo.lock | 2 + iceberg_rust_ffi/Cargo.toml | 4 +- iceberg_rust_ffi/src/catalog.rs | 102 +++++++++++++++++--------------- 3 files changed, 58 insertions(+), 50 deletions(-) diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 6811705..9ccee1c 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1523,6 +1523,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=34829106fbb6ef94785edd99f784ea767a4aae44#34829106fbb6ef94785edd99f784ea767a4aae44" dependencies = [ "anyhow", "apache-avro", @@ -1577,6 +1578,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.7.0" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=34829106fbb6ef94785edd99f784ea767a4aae44#34829106fbb6ef94785edd99f784ea767a4aae44" dependencies = [ "async-trait", "chrono", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 07748c1..0c60c63 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -12,8 +12,8 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0e802be6ff998b14f0565edede458e2b5958bff2", features = ["storage-azdls"] } -iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0e802be6ff998b14f0565edede458e2b5958bff2" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44", features = ["storage-azdls"] } +iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 2c18767..7f4fc4d 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -3,7 +3,7 @@ use crate::IcebergTable; use anyhow::Result; use async_trait::async_trait; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableIdent}; -use iceberg_catalog_rest::{RestCatalogBuilder, TokenAuthenticator}; +use iceberg_catalog_rest::{CustomAuthenticator, RestCatalog, RestCatalogBuilder}; use std::collections::HashMap; use std::ffi::{c_char, c_void, CString}; use std::sync::Arc; @@ -17,7 +17,7 @@ use object_store_ffi::{ use crate::util::{parse_c_string, parse_properties, parse_string_array}; use crate::PropertyEntry; -/// Callback function type for token authentication from FFI +/// Callback function type for custom token authentication from FFI /// /// The callback receives: /// - user_data: opaque pointer to user context (e.g., Julia closure) @@ -26,13 +26,13 @@ use crate::PropertyEntry; /// Returns: /// - 0 for success (token_ptr must point to a CString allocated with CString::into_raw) /// - non-zero for error -pub type TokenAuthenticatorCallback = +pub type CustomAuthenticatorCallback = extern "C" fn(user_data: *mut c_void, token_ptr: *mut *mut c_char) -> i32; -/// Rust implementation of TokenAuthenticator that calls a C callback with user_data +/// Rust implementation of CustomAuthenticator that calls a C callback with user_data #[derive(Debug, Clone)] struct FFITokenAuthenticator { - callback: TokenAuthenticatorCallback, + callback: CustomAuthenticatorCallback, user_data: *mut c_void, } @@ -42,7 +42,7 @@ unsafe impl Send for FFITokenAuthenticator {} unsafe impl Sync for FFITokenAuthenticator {} #[async_trait] -impl TokenAuthenticator for FFITokenAuthenticator { +impl CustomAuthenticator for FFITokenAuthenticator { async fn get_token(&self) -> iceberg::Result { let mut token_ptr: *mut c_char = std::ptr::null_mut(); @@ -78,16 +78,18 @@ impl TokenAuthenticator for FFITokenAuthenticator { /// Opaque catalog handle for FFI /// Holds a raw pointer to a RestCatalog allocated on the heap. /// The catalog is owned by this struct and cleaned up when dropped. -#[repr(C)] +/// Also stores the authenticator to allow setting it before catalog creation. pub struct IcebergCatalog { - catalog: *mut iceberg_catalog_rest::RestCatalog, + catalog: Option<*mut RestCatalog>, + /// Stores a pending authenticator to be applied before first use + authenticator: Option>, } impl Drop for IcebergCatalog { fn drop(&mut self) { unsafe { - if !self.catalog.is_null() { - let _ = Box::from_raw(self.catalog); + if let Some(catalog_ptr) = self.catalog { + let _ = Box::from_raw(catalog_ptr); } } } @@ -101,59 +103,63 @@ impl Drop for IcebergCatalog { unsafe impl Send for IcebergCatalog {} unsafe impl Sync for IcebergCatalog {} +impl Default for IcebergCatalog { + fn default() -> Self { + IcebergCatalog { + catalog: None, + authenticator: None, + } + } +} + impl IcebergCatalog { - /// Create a new REST catalog - pub async fn create_rest(uri: String, props: HashMap) -> Result { + /// Create and initialize a REST catalog with optional authenticator + pub async fn create_rest( + mut self, + uri: String, + props: HashMap, + ) -> Result { let mut catalog_props = props; catalog_props.insert("uri".to_string(), uri); - let catalog = RestCatalogBuilder::default() - .load("rest", catalog_props) - .await?; + // Apply authenticator to builder if set + let mut builder = RestCatalogBuilder::default(); + if let Some(ref authenticator) = self.authenticator { + builder = builder.with_token_authenticator(authenticator.clone()); + } - Ok(IcebergCatalog { - catalog: Box::into_raw(Box::new(catalog)), - }) - } + let catalog = builder.load("rest", catalog_props).await?; + self.catalog = Some(Box::into_raw(Box::new(catalog))); - /// Get a reference to the underlying RestCatalog. - /// - /// SAFETY: Returns a reference valid only for the lifetime of self. - /// The caller must ensure this method is not called concurrently with set_token_authenticator. - fn as_ref(&self) -> &iceberg_catalog_rest::RestCatalog { - // SAFETY: catalog is checked to be non-null in set_token_authenticator and during creation. - // It's only modified by set_token_authenticator with exclusive &mut access. - unsafe { &*self.catalog } + Ok(self) } - /// Set a custom token authenticator - /// Must be called before the first catalog operation + /// Set a custom token authenticator before catalog creation pub fn set_token_authenticator( &mut self, - callback: TokenAuthenticatorCallback, + callback: CustomAuthenticatorCallback, user_data: *mut c_void, ) -> Result<()> { - if self.catalog.is_null() { - return Err(anyhow::anyhow!("Catalog is null")); - } - let authenticator = Arc::new(FFITokenAuthenticator { callback, user_data, }); - // SAFETY: catalog was checked to be non-null above. - // Box::from_raw takes ownership of the heap-allocated catalog. - // We immediately apply with_token_authenticator and convert back with into_raw. - unsafe { - let mut catalog = Box::from_raw(self.catalog); - *catalog = catalog.with_token_authenticator(authenticator); - self.catalog = Box::into_raw(catalog); - } + // Store the authenticator to be used when building the catalog + self.authenticator = Some(authenticator); Ok(()) } + /// Get a reference to the underlying RestCatalog. + /// + /// SAFETY: Returns a reference valid only for the lifetime of self. + /// The caller must ensure the catalog is initialized before calling this. + fn as_ref(&self) -> &RestCatalog { + // SAFETY: catalog is checked to be Some during create_rest. + unsafe { &*self.catalog.expect("catalog should be initialized") } + } + /// Load a table by namespace and name pub async fn load_table( &self, @@ -420,12 +426,13 @@ export_runtime_op!( || { let uri_str = parse_c_string(uri, "uri")?; let props = parse_properties(properties, properties_len)?; - Ok((uri_str, props)) + Ok::<(String, HashMap), anyhow::Error>((uri_str, props)) }, result_tuple, async { let (uri, props) = result_tuple; - IcebergCatalog::create_rest(uri, props).await + let catalog = IcebergCatalog::default().create_rest(uri, props).await?; + Ok::(catalog) }, uri: *const c_char, properties: *const PropertyEntry, @@ -540,7 +547,7 @@ export_runtime_op!( #[no_mangle] pub extern "C" fn iceberg_catalog_set_token_authenticator( catalog: *mut IcebergCatalog, - callback: TokenAuthenticatorCallback, + callback: CustomAuthenticatorCallback, user_data: *mut c_void, ) -> CResult { // Check for null catalog pointer @@ -550,10 +557,9 @@ pub extern "C" fn iceberg_catalog_set_token_authenticator( // SAFETY: catalog was checked to be non-null above. // The caller must ensure the catalog pointer remains valid for the duration of this call. - let catalog_ref = unsafe { &mut *catalog }; + let result = unsafe { &mut *catalog }.set_token_authenticator(callback, user_data); - // Call the synchronous set_token_authenticator method - match catalog_ref.set_token_authenticator(callback, user_data) { + match result { Ok(()) => CResult::Ok, Err(e) => { eprintln!("Error setting token authenticator: {}", e); From ae66429e9a909cb40963acd0ed1ff86eabae381e Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:31:41 +0100 Subject: [PATCH 09/30] Bump version --- iceberg_rust_ffi/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 0c60c63..37776ea 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.7.0" +version = "0.7.1" edition = "2021" [lib] From 1e4042dff7ecf383062e5e8f4254ca2906878f8e Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:37:12 +0100 Subject: [PATCH 10/30] Make test more sophisticated --- test/runtests.jl | 59 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 8c1e979..7bd5df0 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -670,28 +670,48 @@ end client_secret = "s3cr3t" realm = "POLARIS" - # Create a custom authenticator function that fetches tokens on demand - function get_token() - # Fetch access token using client credentials - credentials = base64encode("$client_id:$client_secret") - auth_header = "Basic $credentials" - body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" + # Create a custom authenticator function that fetches tokens on demand and caches them + function authenticator() + # Track number of actual token fetches (not calls from cache) + fetch_count = Threads.Atomic{Int}(0) + cached_token = Ref{Union{Nothing, String}}(nothing) + + function get_token_impl() + # Return cached token if available + if cached_token[] !== nothing + return cached_token[]::String + end - token_response = HTTP.post( - token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], - body=body, - status_exception=false - ) + # Fetch access token using client credentials + credentials = base64encode("$client_id:$client_secret") + auth_header = "Basic $credentials" + body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" - if token_response.status != 200 - error("Failed to fetch token: $(String(token_response.body))") + token_response = HTTP.post( + token_endpoint; + headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + body=body, + status_exception=false + ) + + if token_response.status != 200 + error("Failed to fetch token: $(String(token_response.body))") + end + + token_data = JSON.parse(String(token_response.body)) + token = token_data["access_token"] + + # Increment fetch count and cache the token + Threads.atomic_add!(fetch_count, 1) + cached_token[] = token + return token end - token_data = JSON.parse(String(token_response.body)) - return token_data["access_token"] + return get_token_impl, fetch_count end + auth_fn, fetch_counter = authenticator() + catalog = C_NULL try # Test catalog creation with custom authenticator function @@ -699,7 +719,7 @@ end props = Dict( "warehouse" => "warehouse" ) - catalog = RustyIceberg.catalog_create_rest(get_token, catalog_uri; properties=props) + catalog = RustyIceberg.catalog_create_rest(auth_fn, catalog_uri; properties=props) @test catalog != C_NULL println("✅ Catalog created successfully with custom authenticator function") @@ -735,6 +755,11 @@ end @test exists == true end println("✅ All TPCH tables verified to exist: $expected_tables") + + # Verify that token caching works (only one token should have been fetched) + final_fetch_count = fetch_counter[] + println("✅ Authenticator fetched token $final_fetch_count time(s) (token caching verified)") + @test final_fetch_count == 1 finally # Clean up if catalog != C_NULL From b1e69bf934bbc3c98cb4c7ed80bb3f92e149d324 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:39:39 +0100 Subject: [PATCH 11/30] . --- test/runtests.jl | 50 ++++++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 7bd5df0..143b8a1 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -674,37 +674,41 @@ end function authenticator() # Track number of actual token fetches (not calls from cache) fetch_count = Threads.Atomic{Int}(0) + # Use a lock to protect the cached token + token_lock = Threads.ReentrantLock() cached_token = Ref{Union{Nothing, String}}(nothing) function get_token_impl() - # Return cached token if available - if cached_token[] !== nothing - return cached_token[]::String - end + lock(token_lock) do + # Return cached token if available + if cached_token[] !== nothing + return cached_token[]::String + end - # Fetch access token using client credentials - credentials = base64encode("$client_id:$client_secret") - auth_header = "Basic $credentials" - body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" + # Fetch access token using client credentials + credentials = base64encode("$client_id:$client_secret") + auth_header = "Basic $credentials" + body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" - token_response = HTTP.post( - token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], - body=body, - status_exception=false - ) + token_response = HTTP.post( + token_endpoint; + headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + body=body, + status_exception=false + ) - if token_response.status != 200 - error("Failed to fetch token: $(String(token_response.body))") - end + if token_response.status != 200 + error("Failed to fetch token: $(String(token_response.body))") + end - token_data = JSON.parse(String(token_response.body)) - token = token_data["access_token"] + token_data = JSON.parse(String(token_response.body)) + token = token_data["access_token"] - # Increment fetch count and cache the token - Threads.atomic_add!(fetch_count, 1) - cached_token[] = token - return token + # Increment fetch count and cache the token + Threads.atomic_add!(fetch_count, 1) + cached_token[] = token + return token + end end return get_token_impl, fetch_count From d7f97c95cbff28e52fcf813a75c753c51da61cdd Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 13:46:12 +0100 Subject: [PATCH 12/30] Debug --- test/runtests.jl | 61 ++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 143b8a1..8f5ede5 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -670,45 +670,34 @@ end client_secret = "s3cr3t" realm = "POLARIS" - # Create a custom authenticator function that fetches tokens on demand and caches them + # Create a custom authenticator function that fetches tokens on demand function authenticator() - # Track number of actual token fetches (not calls from cache) + # Track number of actual token fetches fetch_count = Threads.Atomic{Int}(0) - # Use a lock to protect the cached token - token_lock = Threads.ReentrantLock() - cached_token = Ref{Union{Nothing, String}}(nothing) function get_token_impl() - lock(token_lock) do - # Return cached token if available - if cached_token[] !== nothing - return cached_token[]::String - end - - # Fetch access token using client credentials - credentials = base64encode("$client_id:$client_secret") - auth_header = "Basic $credentials" - body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" - - token_response = HTTP.post( - token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], - body=body, - status_exception=false - ) - - if token_response.status != 200 - error("Failed to fetch token: $(String(token_response.body))") - end + # Fetch access token using client credentials + credentials = base64encode("$client_id:$client_secret") + auth_header = "Basic $credentials" + body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" + + token_response = HTTP.post( + token_endpoint; + headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + body=body, + status_exception=false + ) + + if token_response.status != 200 + error("Failed to fetch token: $(String(token_response.body))") + end - token_data = JSON.parse(String(token_response.body)) - token = token_data["access_token"] + token_data = JSON.parse(String(token_response.body)) + token = token_data["access_token"] - # Increment fetch count and cache the token - Threads.atomic_add!(fetch_count, 1) - cached_token[] = token - return token - end + # Increment fetch count + Threads.atomic_add!(fetch_count, 1) + return token end return get_token_impl, fetch_count @@ -760,10 +749,10 @@ end end println("✅ All TPCH tables verified to exist: $expected_tables") - # Verify that token caching works (only one token should have been fetched) + # Verify that authenticator was called multiple times (fresh token each time) final_fetch_count = fetch_counter[] - println("✅ Authenticator fetched token $final_fetch_count time(s) (token caching verified)") - @test final_fetch_count == 1 + println("✅ Authenticator fetched token $final_fetch_count time(s)") + @test final_fetch_count >= 1 finally # Clean up if catalog != C_NULL From a0bf5e0f0690771e65f676d46b34428413b3cd58 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 14:05:01 +0100 Subject: [PATCH 13/30] . --- iceberg_rust_ffi/Cargo.lock | 2 +- iceberg_rust_ffi/src/catalog.rs | 23 +++++++++++---- src/catalog.jl | 52 ++++++++++++++++++++++++++------- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 9ccee1c..5fbd003 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1597,7 +1597,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.7.0" +version = "0.7.1" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 7f4fc4d..f04706f 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -419,21 +419,34 @@ pub extern "C" fn iceberg_catalog_free(catalog: *mut IcebergCatalog) { // FFI Export functions for catalog operations // These functions are exported to be called from Julia via the FFI -// Create a REST catalog +// Create an empty catalog (default constructor exposed as sync FFI function) +#[no_mangle] +pub extern "C" fn iceberg_catalog_init() -> *mut IcebergCatalog { + Box::into_raw(Box::new(IcebergCatalog::default())) +} + +// Create a REST catalog from an existing catalog pointer (which may have an authenticator already set) +// This function takes ownership of the catalog pointer and returns it in the response export_runtime_op!( iceberg_rest_catalog_create, IcebergCatalogResponse, || { + if catalog.is_null() { + return Err(anyhow::anyhow!("Null catalog pointer provided")); + } + // SAFETY: catalog was checked to be non-null above and came from FFI + let catalog = unsafe { Box::from_raw(catalog) }; let uri_str = parse_c_string(uri, "uri")?; let props = parse_properties(properties, properties_len)?; - Ok::<(String, HashMap), anyhow::Error>((uri_str, props)) + Ok::<(Box, String, HashMap), anyhow::Error>((catalog, uri_str, props)) }, result_tuple, async { - let (uri, props) = result_tuple; - let catalog = IcebergCatalog::default().create_rest(uri, props).await?; - Ok::(catalog) + let (catalog, uri, props) = result_tuple; + // create_rest takes ownership and returns the catalog + catalog.create_rest(uri, props).await }, + catalog: *mut IcebergCatalog, uri: *const c_char, properties: *const PropertyEntry, properties_len: usize diff --git a/src/catalog.jl b/src/catalog.jl index a9d1f1d..1b1d729 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -130,14 +130,22 @@ catalog = catalog_create_rest("http://polaris:8181") ``` """ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{String,String}()) - response = CatalogResponse() + # Create an empty catalog (no authenticator) + catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} + if catalog_ptr == C_NULL + throw(IcebergException("Failed to create empty catalog")) + end + # Initialize the catalog with REST connection # Convert properties dict to array of PropertyEntry structs property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties] properties_len = length(property_entries) + response = CatalogResponse() + async_ccall(response, property_entries, properties) do handle @ccall rust_lib.iceberg_rest_catalog_create( + catalog_ptr::Ptr{Cvoid}, uri::Cstring, (properties_len > 0 ? pointer(property_entries) : C_NULL)::Ptr{PropertyEntry}, properties_len::Csize_t, @@ -176,13 +184,16 @@ catalog = catalog_create_rest(get_token, "http://polaris:8181") ``` """ function catalog_create_rest(authenticator::Function, uri::String; properties::Dict{String,String}=Dict{String,String}()) - # First create the base catalog without the authenticator - catalog = catalog_create_rest(uri; properties=properties) + # Step 1: Create an empty catalog + catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} + if catalog_ptr == C_NULL + throw(IcebergException("Failed to create empty catalog")) + end - # Wrap the authenticator in a mutable container so we can get a stable pointer + # Step 2: Wrap the authenticator in a mutable container so we can get a stable pointer authenticator_ref = Ref(authenticator) - # Create the C callback function that wraps the Julia authenticator + # Step 3: Create the C callback function that wraps the Julia authenticator c_callback = @cfunction( $( function token_callback(user_data::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}})::Cint @@ -224,22 +235,43 @@ function catalog_create_rest(authenticator::Function, uri::String; properties::D (Ptr{Cvoid}, Ptr{Ptr{Cchar}}) ) - # Set the authenticator on the catalog using the raw pointer from the Catalog struct + # Step 4: Get user_data pointer to pass to the authenticator user_data = pointer_from_objref(authenticator_ref) + + # Step 5: Set the authenticator on the empty catalog BEFORE initializing REST result = @ccall rust_lib.iceberg_catalog_set_token_authenticator( - catalog.ptr::Ptr{Cvoid}, + catalog_ptr::Ptr{Cvoid}, c_callback::Ptr{Cvoid}, user_data::Ptr{Cvoid} )::Cint if result != 0 - free_catalog(catalog) + @ccall rust_lib.iceberg_catalog_free(catalog_ptr::Ptr{Cvoid})::Cvoid throw(IcebergException("Failed to set token authenticator")) end - # Store the authenticator_ref in the catalog struct to keep it alive - catalog.authenticator = authenticator_ref + # Step 6: Initialize the catalog with REST connection + # Convert properties dict to array of PropertyEntry structs + property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties] + properties_len = length(property_entries) + + response = CatalogResponse() + + async_ccall(response, property_entries, properties) do handle + @ccall rust_lib.iceberg_rest_catalog_create( + catalog_ptr::Ptr{Cvoid}, + uri::Cstring, + (properties_len > 0 ? pointer(property_entries) : C_NULL)::Ptr{PropertyEntry}, + properties_len::Csize_t, + response::Ref{CatalogResponse}, + handle::Ptr{Cvoid} + )::Cint + end + + @throw_on_error(response, "catalog_create_rest", IcebergException) + # Create a Catalog struct that holds the catalog pointer and keeps authenticator alive + catalog = Catalog(response.catalog, authenticator_ref) return catalog end From b383f4b66add68639543df2bd686170c7fea5e12 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 14:08:21 +0100 Subject: [PATCH 14/30] . --- test/runtests.jl | 58 ++++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 8f5ede5..dcd9856 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -674,30 +674,44 @@ end function authenticator() # Track number of actual token fetches fetch_count = Threads.Atomic{Int}(0) + # Cache the token - use Ref to store it safely + cached_token = Ref{Union{Nothing, String}}(nothing) + token_lock = Threads.ReentrantLock() function get_token_impl() - # Fetch access token using client credentials - credentials = base64encode("$client_id:$client_secret") - auth_header = "Basic $credentials" - body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" - - token_response = HTTP.post( - token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], - body=body, - status_exception=false - ) - - if token_response.status != 200 - error("Failed to fetch token: $(String(token_response.body))") - end + lock(token_lock) do + # Check if we have a cached token + if cached_token[] !== nothing + return cached_token[]::String + end + + # Fetch access token using client credentials + credentials = base64encode("$client_id:$client_secret") + auth_header = "Basic $credentials" + body = "grant_type=client_credentials&scope=PRINCIPAL_ROLE:ALL" - token_data = JSON.parse(String(token_response.body)) - token = token_data["access_token"] + token_response = HTTP.post( + token_endpoint; + headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + body=body, + status_exception=false + ) + + if token_response.status != 200 + error("Failed to fetch token: $(String(token_response.body))") + end - # Increment fetch count - Threads.atomic_add!(fetch_count, 1) - return token + token_data = JSON.parse(String(token_response.body)) + token = token_data["access_token"] + + # Increment fetch count ONLY when actually fetching from server + Threads.atomic_add!(fetch_count, 1) + + # Cache the token for future use + cached_token[] = token + + return token + end end return get_token_impl, fetch_count @@ -749,10 +763,10 @@ end end println("✅ All TPCH tables verified to exist: $expected_tables") - # Verify that authenticator was called multiple times (fresh token each time) + # Verify that authenticator fetched the token at least once (subsequent calls use cached token) final_fetch_count = fetch_counter[] println("✅ Authenticator fetched token $final_fetch_count time(s)") - @test final_fetch_count >= 1 + @test final_fetch_count == 1 finally # Clean up if catalog != C_NULL From c1a3a986f4f0d186c912ef410fbe0cf537fb48f5 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 14:09:38 +0100 Subject: [PATCH 15/30] Format --- test/runtests.jl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index dcd9856..d019dbf 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -595,7 +595,11 @@ end token_response = HTTP.post( token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + headers=[ + "Authorization" => auth_header, + "Polaris-Realm" => realm, + "Content-Type" => "application/x-www-form-urlencoded" + ], body=body, status_exception=false ) @@ -692,7 +696,11 @@ end token_response = HTTP.post( token_endpoint; - headers=["Authorization" => auth_header, "Polaris-Realm" => realm, "Content-Type" => "application/x-www-form-urlencoded"], + headers=[ + "Authorization" => auth_header, + "Polaris-Realm" => realm, + "Content-Type" => "application/x-www-form-urlencoded" + ], body=body, status_exception=false ) From 47e4af04a1b3fcf8e8a153d3a67d1da149cc74eb Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 14:17:27 +0100 Subject: [PATCH 16/30] Cleanup --- iceberg_rust_ffi/src/catalog.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index f04706f..2f66376 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -77,7 +77,7 @@ impl CustomAuthenticator for FFITokenAuthenticator { /// Opaque catalog handle for FFI /// Holds a raw pointer to a RestCatalog allocated on the heap. -/// The catalog is owned by this struct and cleaned up when dropped. +/// Must be freed explicitly via iceberg_catalog_free() - not automatically dropped. /// Also stores the authenticator to allow setting it before catalog creation. pub struct IcebergCatalog { catalog: Option<*mut RestCatalog>, @@ -85,16 +85,6 @@ pub struct IcebergCatalog { authenticator: Option>, } -impl Drop for IcebergCatalog { - fn drop(&mut self) { - unsafe { - if let Some(catalog_ptr) = self.catalog { - let _ = Box::from_raw(catalog_ptr); - } - } - } -} - // SAFETY: The catalog pointer is owned exclusively by this struct. // Send and Sync are safe because: // 1. The RestCatalog is accessed only through this struct From a9a598521c4a9e83dec49a22838a6761ed5cc4bf Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 14:17:43 +0100 Subject: [PATCH 17/30] . --- iceberg_rust_ffi/src/catalog.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 2f66376..de05aed 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -85,11 +85,12 @@ pub struct IcebergCatalog { authenticator: Option>, } -// SAFETY: The catalog pointer is owned exclusively by this struct. +// SAFETY: The catalog pointer represents unshared ownership across FFI boundary. // Send and Sync are safe because: // 1. The RestCatalog is accessed only through this struct // 2. We enforce exclusive mutable access for operations that mutate (set_token_authenticator) // 3. The pointer is never shared or aliased from FFI +// 4. The struct is manually freed via iceberg_catalog_free(), not via Drop unsafe impl Send for IcebergCatalog {} unsafe impl Sync for IcebergCatalog {} From 5f92b7363b70ffe93215b5cf0d51550403ca9f45 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 22 Dec 2025 16:46:31 +0100 Subject: [PATCH 18/30] Extend test --- test/runtests.jl | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index d019dbf..3df8b67 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -722,10 +722,10 @@ end end end - return get_token_impl, fetch_count + return get_token_impl, fetch_count, cached_token end - auth_fn, fetch_counter = authenticator() + auth_fn, fetch_counter, cached_token_ref = authenticator() catalog = C_NULL try @@ -775,6 +775,40 @@ end final_fetch_count = fetch_counter[] println("✅ Authenticator fetched token $final_fetch_count time(s)") @test final_fetch_count == 1 + + # Test token invalidation and re-fetch + println("\nTesting token invalidation and re-fetch...") + + # Invalidate the cached token by setting it to an invalid value + cached_token_ref[] = "invalid_token_foo" + println("✅ Invalidated cached token") + + # Attempt to list namespaces with invalid token - should fail with 401 + println("Attempting to list namespaces with invalid token...") + error_occurred = false + error_msg = "" + try + root_namespaces = RustyIceberg.list_namespaces(catalog) + # If we get here, the test should fail + catch err + error_occurred = true + error_msg = string(err) + println("✅ Got expected error with invalid token: $(typeof(err))") + println(" Error message: $error_msg") + end + + # Verify that an error occurred when using invalid token + @test error_occurred + + # Check if error message contains indication of 401 or authentication failure + has_401 = contains(error_msg, "401") + has_unauthorized = contains(error_msg, "Unauthorized") || contains(error_msg, "unauthorized") + has_unexpected = contains(error_msg, "unexpected status code") + has_auth_error = has_401 || has_unauthorized || has_unexpected + + @test has_auth_error + + println("✅ Token invalidation test passed") finally # Clean up if catalog != C_NULL From 2d7b98a61da07d4959ad27c666e741de4831bf2f Mon Sep 17 00:00:00 2001 From: Gerald Berger <59661379+gbrgr@users.noreply.github.com> Date: Tue, 23 Dec 2025 15:06:16 +0100 Subject: [PATCH 19/30] Update test/runtests.jl Co-authored-by: Vukasin Stefanovic --- test/runtests.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/runtests.jl b/test/runtests.jl index 3df8b67..c625b4a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -771,7 +771,7 @@ end end println("✅ All TPCH tables verified to exist: $expected_tables") - # Verify that authenticator fetched the token at least once (subsequent calls use cached token) + # Verify that authenticator fetched the token exactly once (subsequent calls use cached token) final_fetch_count = fetch_counter[] println("✅ Authenticator fetched token $final_fetch_count time(s)") @test final_fetch_count == 1 From e5f6fc2d435c9ed58aeb7022c6037a30afb709eb Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:24:45 +0100 Subject: [PATCH 20/30] Implement option 1 --- iceberg_rust_ffi/src/catalog.rs | 28 ++++++---- src/catalog.jl | 97 +++++++++++++++++---------------- 2 files changed, 67 insertions(+), 58 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index de05aed..b58f8d4 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -20,24 +20,30 @@ use crate::PropertyEntry; /// Callback function type for custom token authentication from FFI /// /// The callback receives: -/// - user_data: opaque pointer to user context (e.g., Julia closure) +/// - auth_fn: opaque pointer to user context (e.g., Julia Ref to authenticator) /// - token_ptr: output pointer where the token string should be written /// +/// The callback is responsible for: +/// 1. Extracting the authenticator from auth_fn pointer +/// 2. Invoking the Julia authenticator function +/// 3. Allocating the token string with libc malloc +/// 4. Writing its pointer to token_ptr +/// /// Returns: -/// - 0 for success (token_ptr must point to a CString allocated with CString::into_raw) +/// - 0 for success (token_ptr must point to a C string allocated with libc malloc) /// - non-zero for error pub type CustomAuthenticatorCallback = - extern "C" fn(user_data: *mut c_void, token_ptr: *mut *mut c_char) -> i32; + extern "C" fn(auth_fn: *mut c_void, token_ptr: *mut *mut c_char) -> i32; -/// Rust implementation of CustomAuthenticator that calls a C callback with user_data +/// Rust implementation of CustomAuthenticator that calls a C callback with auth_fn pointer #[derive(Debug, Clone)] struct FFITokenAuthenticator { callback: CustomAuthenticatorCallback, - user_data: *mut c_void, + auth_fn: *mut c_void, } // SAFETY: We trust that the Julia callback is thread-safe. -// The user_data pointer is opaque and its thread-safety is the caller's responsibility. +// The auth_fn pointer is opaque and its thread-safety is the caller's responsibility. unsafe impl Send for FFITokenAuthenticator {} unsafe impl Sync for FFITokenAuthenticator {} @@ -46,7 +52,7 @@ impl CustomAuthenticator for FFITokenAuthenticator { async fn get_token(&self) -> iceberg::Result { let mut token_ptr: *mut c_char = std::ptr::null_mut(); - let result = (self.callback)(self.user_data, &mut token_ptr); + let result = (self.callback)(self.auth_fn, &mut token_ptr); if result != 0 { return Err(Error::new( @@ -129,11 +135,11 @@ impl IcebergCatalog { pub fn set_token_authenticator( &mut self, callback: CustomAuthenticatorCallback, - user_data: *mut c_void, + auth_fn: *mut c_void, ) -> Result<()> { let authenticator = Arc::new(FFITokenAuthenticator { callback, - user_data, + auth_fn, }); // Store the authenticator to be used when building the catalog @@ -552,7 +558,7 @@ export_runtime_op!( pub extern "C" fn iceberg_catalog_set_token_authenticator( catalog: *mut IcebergCatalog, callback: CustomAuthenticatorCallback, - user_data: *mut c_void, + auth_fn: *mut c_void, ) -> CResult { // Check for null catalog pointer if catalog.is_null() { @@ -561,7 +567,7 @@ pub extern "C" fn iceberg_catalog_set_token_authenticator( // SAFETY: catalog was checked to be non-null above. // The caller must ensure the catalog pointer remains valid for the duration of this call. - let result = unsafe { &mut *catalog }.set_token_authenticator(callback, user_data); + let result = unsafe { &mut *catalog }.set_token_authenticator(callback, auth_fn); match result { Ok(()) => CResult::Ok, diff --git a/src/catalog.jl b/src/catalog.jl index 1b1d729..93d7345 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -4,6 +4,41 @@ Catalog support for RustyIceberg.jl This module provides Julia wrappers for the REST catalog FFI functions. """ +# Token callback function that can be exported as C-callable +# This is a static function (no closure) that takes auth_fn and extracts the authenticator +function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}})::Cint + try + # Extract authenticator function from auth_fn pointer + auth_ref = unsafe_pointer_to_objref(auth_fn) + auth_fn_call = auth_ref[] + + # Call the authenticator to get the token + token_str = auth_fn_call()::String + + # Allocate C string using libc malloc and copy the token + token_bytes = Vector{UInt8}(token_str) + token_len = length(token_bytes) + c_str_ptr = @ccall malloc((token_len + 1)::Csize_t)::Ptr{Cchar} + + if c_str_ptr == C_NULL + return Cint(1) # Error: allocation failed + end + + # Copy the token bytes to the allocated memory + unsafe_copyto!(convert(Ptr{UInt8}, c_str_ptr), pointer(token_bytes), token_len) + # Add null terminator + unsafe_store!(convert(Ptr{UInt8}, c_str_ptr), UInt8(0), token_len + 1) + + # Write the pointer to the output parameter + unsafe_store!(token_ptr, c_str_ptr) + + return Cint(0) + catch e + # Return error code on exception + return Cint(1) + end +end + """ Catalog @@ -190,63 +225,23 @@ function catalog_create_rest(authenticator::Function, uri::String; properties::D throw(IcebergException("Failed to create empty catalog")) end - # Step 2: Wrap the authenticator in a mutable container so we can get a stable pointer + # Step 2: Wrap the authenticator in a Ref for stable memory address authenticator_ref = Ref(authenticator) - # Step 3: Create the C callback function that wraps the Julia authenticator - c_callback = @cfunction( - $( - function token_callback(user_data::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}})::Cint - try - # Get the authenticator function from user_data (cast back to its Ref) - # We don't annotate the type here to avoid type mismatch with closures - auth_ref = unsafe_pointer_to_objref(user_data) - auth_fn = auth_ref[] - - # Call the authenticator to get the token - token_str = auth_fn()::String - - # Allocate C string using libc malloc and copy the token - # This matches what Rust expects: a CString allocated with libc malloc - token_bytes = Vector{UInt8}(token_str) - token_len = length(token_bytes) - c_str_ptr = @ccall malloc((token_len + 1)::Csize_t)::Ptr{Cchar} - - if c_str_ptr == C_NULL - return Cint(1) # Error: allocation failed - end - - # Copy the token bytes to the allocated memory - unsafe_copyto!(convert(Ptr{UInt8}, c_str_ptr), pointer(token_bytes), token_len) - # Add null terminator - unsafe_store!(convert(Ptr{UInt8}, c_str_ptr), UInt8(0), token_len + 1) - - # Write the pointer to the output parameter - unsafe_store!(token_ptr, c_str_ptr) - - return Cint(0) - catch e - # Return error code on exception - return Cint(1) - end - end - ), - Cint, - (Ptr{Cvoid}, Ptr{Ptr{Cchar}}) - ) + # Step 3: Create C callback using the static token_callback_impl function + c_callback = @cfunction(token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}})) - # Step 4: Get user_data pointer to pass to the authenticator - user_data = pointer_from_objref(authenticator_ref) + # Step 4: Get auth_fn pointer to pass to the authenticator + auth_fn = pointer_from_objref(authenticator_ref) # Step 5: Set the authenticator on the empty catalog BEFORE initializing REST result = @ccall rust_lib.iceberg_catalog_set_token_authenticator( catalog_ptr::Ptr{Cvoid}, c_callback::Ptr{Cvoid}, - user_data::Ptr{Cvoid} + auth_fn::Ptr{Cvoid} )::Cint if result != 0 - @ccall rust_lib.iceberg_catalog_free(catalog_ptr::Ptr{Cvoid})::Cvoid throw(IcebergException("Failed to set token authenticator")) end @@ -359,7 +354,15 @@ function list_tables(catalog::Catalog, namespace::Vector{String}) )::Cint end - @throw_on_error(response, "list_tables", IcebergException) + if response.result != 0 + error_msg = response.error_message != C_NULL ? unsafe_string(response.error_message) : "" + status_code = _extract_status_code_from_error_message(error_msg) + if status_code >= 0 + response.status_code = status_code + throw(CatalogException(response)) + end + throw(IcebergException(response_error_to_string(response, "list_tables"))) + end # Convert C string array to Julia strings tables = String[] From a22144b88fb57e89882dac7b66a6dded6c5cb578 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:34:25 +0100 Subject: [PATCH 21/30] PR comments --- Project.toml | 2 ++ iceberg_rust_ffi/src/catalog.rs | 9 +++------ src/RustyIceberg.jl | 1 + src/catalog.jl | 10 ++++++---- test/runtests.jl | 3 ++- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/Project.toml b/Project.toml index ae30467..f6eb74d 100644 --- a/Project.toml +++ b/Project.toml @@ -4,11 +4,13 @@ version = "0.5.3" [deps] Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" +FunctionWrappers = "069b7b12-0de2-55c6-9aab-29f3d0a68a2e" Preferences = "21216c6a-2e73-6563-6e65-726566657250" iceberg_rust_ffi_jll = "6bd5c94f-693c-53e3-983d-a09fad412f22" [compat] Arrow = "2.0" +FunctionWrappers = "1" Preferences = "1.3" iceberg_rust_ffi_jll = "0.7" julia = "1.10" diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index b58f8d4..f2b4a8e 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -33,7 +33,7 @@ use crate::PropertyEntry; /// - 0 for success (token_ptr must point to a C string allocated with libc malloc) /// - non-zero for error pub type CustomAuthenticatorCallback = - extern "C" fn(auth_fn: *mut c_void, token_ptr: *mut *mut c_char) -> i32; + unsafe extern "C" fn(auth_fn: *mut c_void, token_ptr: *mut *mut c_char) -> i32; /// Rust implementation of CustomAuthenticator that calls a C callback with auth_fn pointer #[derive(Debug, Clone)] @@ -52,7 +52,7 @@ impl CustomAuthenticator for FFITokenAuthenticator { async fn get_token(&self) -> iceberg::Result { let mut token_ptr: *mut c_char = std::ptr::null_mut(); - let result = (self.callback)(self.auth_fn, &mut token_ptr); + let result = unsafe { (self.callback)(self.auth_fn, &mut token_ptr) }; if result != 0 { return Err(Error::new( @@ -137,10 +137,7 @@ impl IcebergCatalog { callback: CustomAuthenticatorCallback, auth_fn: *mut c_void, ) -> Result<()> { - let authenticator = Arc::new(FFITokenAuthenticator { - callback, - auth_fn, - }); + let authenticator = Arc::new(FFITokenAuthenticator { callback, auth_fn }); // Store the authenticator to be used when building the catalog self.authenticator = Some(authenticator); diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index 77bd0fd..a9d9bf0 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -3,6 +3,7 @@ module RustyIceberg using Base: @kwdef, @lock using Base.Threads: Atomic using Arrow +using FunctionWrappers: FunctionWrapper using iceberg_rust_ffi_jll export Table, Scan, IncrementalScan, ArrowBatch, StaticConfig, ArrowStream diff --git a/src/catalog.jl b/src/catalog.jl index 93d7345..93c486b 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -195,12 +195,12 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S end """ - catalog_create_rest(authenticator::Function, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog + catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog Create a REST catalog connection with custom token authentication. # Arguments -- `authenticator::Function`: A callable that takes no arguments and returns a token string. +- `authenticator::FunctionWrapper{String,Tuple{}}`: A callable that takes no arguments and returns a token string. The function will be called whenever a new token is needed for authentication. - `uri::String`: URI of the Iceberg REST catalog server (e.g., "http://localhost:8181") - `properties::Dict{String,String}`: Optional key-value properties for catalog configuration. @@ -211,14 +211,16 @@ Create a REST catalog connection with custom token authentication. # Example ```julia +using FunctionWrappers: FunctionWrapper + function get_token() return ENV["ICEBERG_TOKEN"] end -catalog = catalog_create_rest(get_token, "http://polaris:8181") +catalog = catalog_create_rest(FunctionWrapper{String,Tuple{}}(get_token), "http://polaris:8181") ``` """ -function catalog_create_rest(authenticator::Function, uri::String; properties::Dict{String,String}=Dict{String,String}()) +function catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}()) # Step 1: Create an empty catalog catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} if catalog_ptr == C_NULL diff --git a/test/runtests.jl b/test/runtests.jl index c625b4a..7543530 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,6 +2,7 @@ using Test using RustyIceberg using DataFrames using Arrow +using FunctionWrappers: FunctionWrapper @testset "RustyIceberg.jl" begin @testset "Runtime Initialization" begin @@ -734,7 +735,7 @@ end props = Dict( "warehouse" => "warehouse" ) - catalog = RustyIceberg.catalog_create_rest(auth_fn, catalog_uri; properties=props) + catalog = RustyIceberg.catalog_create_rest(FunctionWrapper{String,Tuple{}}(auth_fn), catalog_uri; properties=props) @test catalog != C_NULL println("✅ Catalog created successfully with custom authenticator function") From 3b14cb7c53967ef84f59b4ef451d770ae06b4918 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:38:53 +0100 Subject: [PATCH 22/30] Guard against use after free --- src/catalog.jl | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/catalog.jl b/src/catalog.jl index 93c486b..95a232e 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -54,6 +54,25 @@ mutable struct Catalog authenticator::Union{Nothing, Ref} end +""" + _invalidate_catalog_ptr!(catalog_ptr_ref) + +Internal helper to prevent accidental reuse of a catalog pointer after `iceberg_rest_catalog_create`. + +The FFI function `iceberg_rest_catalog_create` takes ownership of the pointer via `Box::from_raw()`. +Reusing the same pointer would cause use-after-free. This function clears the pointer to make +accidental reuse obvious (would result in a null pointer error). + +# Arguments +- `catalog_ptr_ref`: A reference to the catalog pointer variable to invalidate +""" +function _invalidate_catalog_ptr!(catalog_ptr_ref) + # This is a simple safety guard - in normal usage each call gets a fresh pointer + # via iceberg_catalog_init(), but if someone reuses a pointer, they'll get C_NULL + # instead of undefined behavior + catalog_ptr_ref[] = C_NULL +end + # Constructor for simple catalogs without authenticator Catalog(ptr::Ptr{Cvoid}) = Catalog(ptr, nothing) @@ -191,6 +210,10 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S @throw_on_error(response, "catalog_create_rest", IcebergException) + # Invalidate the original pointer to prevent accidental reuse + # The FFI function took ownership via Box::from_raw() + _invalidate_catalog_ptr!(Ref(catalog_ptr)) + return Catalog(response.catalog, nothing) end @@ -267,6 +290,10 @@ function catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri @throw_on_error(response, "catalog_create_rest", IcebergException) + # Invalidate the original pointer to prevent accidental reuse + # The FFI function took ownership via Box::from_raw() + _invalidate_catalog_ptr!(Ref(catalog_ptr)) + # Create a Catalog struct that holds the catalog pointer and keeps authenticator alive catalog = Catalog(response.catalog, authenticator_ref) return catalog From b3a6277d0658aebc5f9c5d8aebd69b7ed98ea31a Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:41:37 +0100 Subject: [PATCH 23/30] Revert "Guard against use after free" This reverts commit 3b14cb7c53967ef84f59b4ef451d770ae06b4918. --- src/catalog.jl | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/src/catalog.jl b/src/catalog.jl index 95a232e..93c486b 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -54,25 +54,6 @@ mutable struct Catalog authenticator::Union{Nothing, Ref} end -""" - _invalidate_catalog_ptr!(catalog_ptr_ref) - -Internal helper to prevent accidental reuse of a catalog pointer after `iceberg_rest_catalog_create`. - -The FFI function `iceberg_rest_catalog_create` takes ownership of the pointer via `Box::from_raw()`. -Reusing the same pointer would cause use-after-free. This function clears the pointer to make -accidental reuse obvious (would result in a null pointer error). - -# Arguments -- `catalog_ptr_ref`: A reference to the catalog pointer variable to invalidate -""" -function _invalidate_catalog_ptr!(catalog_ptr_ref) - # This is a simple safety guard - in normal usage each call gets a fresh pointer - # via iceberg_catalog_init(), but if someone reuses a pointer, they'll get C_NULL - # instead of undefined behavior - catalog_ptr_ref[] = C_NULL -end - # Constructor for simple catalogs without authenticator Catalog(ptr::Ptr{Cvoid}) = Catalog(ptr, nothing) @@ -210,10 +191,6 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S @throw_on_error(response, "catalog_create_rest", IcebergException) - # Invalidate the original pointer to prevent accidental reuse - # The FFI function took ownership via Box::from_raw() - _invalidate_catalog_ptr!(Ref(catalog_ptr)) - return Catalog(response.catalog, nothing) end @@ -290,10 +267,6 @@ function catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri @throw_on_error(response, "catalog_create_rest", IcebergException) - # Invalidate the original pointer to prevent accidental reuse - # The FFI function took ownership via Box::from_raw() - _invalidate_catalog_ptr!(Ref(catalog_ptr)) - # Create a Catalog struct that holds the catalog pointer and keeps authenticator alive catalog = Catalog(response.catalog, authenticator_ref) return catalog From 8913974658c3a86763f7ca4aa5579273fd4ca17c Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:47:14 +0100 Subject: [PATCH 24/30] Use Box instead of mut pointer --- iceberg_rust_ffi/src/catalog.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index f2b4a8e..d55f181 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -82,20 +82,18 @@ impl CustomAuthenticator for FFITokenAuthenticator { } /// Opaque catalog handle for FFI -/// Holds a raw pointer to a RestCatalog allocated on the heap. -/// Must be freed explicitly via iceberg_catalog_free() - not automatically dropped. +/// Stores a RestCatalog instance wrapped in a Box for safe memory management. /// Also stores the authenticator to allow setting it before catalog creation. pub struct IcebergCatalog { - catalog: Option<*mut RestCatalog>, + catalog: Option>, /// Stores a pending authenticator to be applied before first use authenticator: Option>, } -// SAFETY: The catalog pointer represents unshared ownership across FFI boundary. -// Send and Sync are safe because: -// 1. The RestCatalog is accessed only through this struct +// SAFETY: Send and Sync are safe because: +// 1. The RestCatalog is accessed only through this struct via exclusive ownership (Box) // 2. We enforce exclusive mutable access for operations that mutate (set_token_authenticator) -// 3. The pointer is never shared or aliased from FFI +// 3. The struct represents unshared ownership across FFI boundary // 4. The struct is manually freed via iceberg_catalog_free(), not via Drop unsafe impl Send for IcebergCatalog {} unsafe impl Sync for IcebergCatalog {} @@ -126,7 +124,7 @@ impl IcebergCatalog { } let catalog = builder.load("rest", catalog_props).await?; - self.catalog = Some(Box::into_raw(Box::new(catalog))); + self.catalog = Some(Box::new(catalog)); Ok(self) } @@ -147,11 +145,12 @@ impl IcebergCatalog { /// Get a reference to the underlying RestCatalog. /// - /// SAFETY: Returns a reference valid only for the lifetime of self. - /// The caller must ensure the catalog is initialized before calling this. + /// Returns a reference to the catalog. + /// Panics if the catalog has not been initialized via create_rest. fn as_ref(&self) -> &RestCatalog { - // SAFETY: catalog is checked to be Some during create_rest. - unsafe { &*self.catalog.expect("catalog should be initialized") } + self.catalog + .as_ref() + .expect("catalog should be initialized") } /// Load a table by namespace and name From de16239ea938d94df4f835c126cb65718a1f5961 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:49:34 +0100 Subject: [PATCH 25/30] Undo CatalogException change --- src/catalog.jl | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/catalog.jl b/src/catalog.jl index 93c486b..e839a39 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -356,15 +356,7 @@ function list_tables(catalog::Catalog, namespace::Vector{String}) )::Cint end - if response.result != 0 - error_msg = response.error_message != C_NULL ? unsafe_string(response.error_message) : "" - status_code = _extract_status_code_from_error_message(error_msg) - if status_code >= 0 - response.status_code = status_code - throw(CatalogException(response)) - end - throw(IcebergException(response_error_to_string(response, "list_tables"))) - end + @throw_on_error(response, "list_tables", IcebergException) # Convert C string array to Julia strings tables = String[] From 2fbcc57d6c27883b3871593dead28ecd6a6753c2 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 15:57:19 +0100 Subject: [PATCH 26/30] Token caching --- iceberg_rust_ffi/src/catalog.rs | 65 +++++++++++++++++++++++++-------- src/catalog.jl | 50 ++++++++++++++++++------- test/runtests.jl | 2 +- 3 files changed, 88 insertions(+), 29 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index d55f181..fff0db7 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -6,7 +6,7 @@ use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableId use iceberg_catalog_rest::{CustomAuthenticator, RestCatalog, RestCatalogBuilder}; use std::collections::HashMap; use std::ffi::{c_char, c_void, CString}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; // FFI exports use object_store_ffi::{ @@ -17,29 +17,40 @@ use object_store_ffi::{ use crate::util::{parse_c_string, parse_properties, parse_string_array}; use crate::PropertyEntry; -/// Callback function type for custom token authentication from FFI +/// Callback function type for custom token authentication from FFI with token caching support /// /// The callback receives: /// - auth_fn: opaque pointer to user context (e.g., Julia Ref to authenticator) -/// - token_ptr: output pointer where the token string should be written +/// - token_ptr: output pointer where the token string should be written (only if not reusing) +/// - reuse_token_ptr: output pointer for reuse flag (1 = reuse previous token, 0 = new token) /// /// The callback is responsible for: /// 1. Extracting the authenticator from auth_fn pointer -/// 2. Invoking the Julia authenticator function -/// 3. Allocating the token string with libc malloc -/// 4. Writing its pointer to token_ptr +/// 2. Invoking the Julia authenticator function which returns Union{String, Nothing}: +/// - String: new token to cache and use +/// - nothing: signal to reuse the previously cached token +/// 3. Setting reuse_token_ptr to indicate the result: +/// - Write 1 if Julia returned nothing (reuse the previous token) +/// - Write 0 if Julia returned a String (new token provided in token_ptr) +/// 4. If reuse flag is 0: allocate the token string with libc malloc and write to token_ptr /// /// Returns: -/// - 0 for success (token_ptr must point to a C string allocated with libc malloc) +/// - 0 for success /// - non-zero for error -pub type CustomAuthenticatorCallback = - unsafe extern "C" fn(auth_fn: *mut c_void, token_ptr: *mut *mut c_char) -> i32; +pub type CustomAuthenticatorCallback = unsafe extern "C" fn( + auth_fn: *mut c_void, + token_ptr: *mut *mut c_char, + reuse_token_ptr: *mut i32, +) -> i32; /// Rust implementation of CustomAuthenticator that calls a C callback with auth_fn pointer -#[derive(Debug, Clone)] +/// Supports token caching to avoid unnecessary copying when Julia returns the same token +#[derive(Clone)] struct FFITokenAuthenticator { callback: CustomAuthenticatorCallback, auth_fn: *mut c_void, + // Cached token to avoid copying when Julia signals to reuse + cached_token: Arc>>, } // SAFETY: We trust that the Julia callback is thread-safe. @@ -51,8 +62,9 @@ unsafe impl Sync for FFITokenAuthenticator {} impl CustomAuthenticator for FFITokenAuthenticator { async fn get_token(&self) -> iceberg::Result { let mut token_ptr: *mut c_char = std::ptr::null_mut(); + let mut reuse_token: i32 = 0; - let result = unsafe { (self.callback)(self.auth_fn, &mut token_ptr) }; + let result = unsafe { (self.callback)(self.auth_fn, &mut token_ptr, &mut reuse_token) }; if result != 0 { return Err(Error::new( @@ -61,6 +73,20 @@ impl CustomAuthenticator for FFITokenAuthenticator { )); } + // If Julia signals to reuse the previous token, return it + if reuse_token != 0 { + let cached = self.cached_token.lock().unwrap(); + if let Some(token) = cached.as_ref() { + return Ok(token.clone()); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Token authenticator requested to reuse previous token, but no cached token exists", + )); + } + } + + // Otherwise, handle the new token from Julia if token_ptr.is_null() { return Err(Error::new( ErrorKind::DataInvalid, @@ -69,15 +95,20 @@ impl CustomAuthenticator for FFITokenAuthenticator { } // SAFETY: The callback is responsible for ensuring token_ptr is a valid - // null-terminated C string that was allocated with CString::into_raw + // null-terminated C string that was allocated with libc malloc let token_cstring = unsafe { CString::from_raw(token_ptr) }; - token_cstring.into_string().map_err(|e| { + let token = token_cstring.into_string().map_err(|e| { Error::new( ErrorKind::DataInvalid, format!("Invalid UTF-8 in token: {}", e), ) - }) + })?; + + // Cache the new token for potential reuse + *self.cached_token.lock().unwrap() = Some(token.clone()); + + Ok(token) } } @@ -135,7 +166,11 @@ impl IcebergCatalog { callback: CustomAuthenticatorCallback, auth_fn: *mut c_void, ) -> Result<()> { - let authenticator = Arc::new(FFITokenAuthenticator { callback, auth_fn }); + let authenticator = Arc::new(FFITokenAuthenticator { + callback, + auth_fn, + cached_token: Arc::new(Mutex::new(None)), + }); // Store the authenticator to be used when building the catalog self.authenticator = Some(authenticator); diff --git a/src/catalog.jl b/src/catalog.jl index e839a39..423c3f4 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -4,16 +4,31 @@ Catalog support for RustyIceberg.jl This module provides Julia wrappers for the REST catalog FFI functions. """ -# Token callback function that can be exported as C-callable +# Token callback function that can be exported as C-callable with token caching support # This is a static function (no closure) that takes auth_fn and extracts the authenticator -function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}})::Cint +function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}}, reuse_token_ptr::Ptr{Cint})::Cint try # Extract authenticator function from auth_fn pointer auth_ref = unsafe_pointer_to_objref(auth_fn) auth_fn_call = auth_ref[] # Call the authenticator to get the token - token_str = auth_fn_call()::String + # The authenticator should return either: + # - A String with the token to use and cache + # - nothing to signal reuse of the previously cached token + token_result = auth_fn_call() + + # Handle nothing (signal to reuse cached token) + if token_result === nothing + unsafe_store!(reuse_token_ptr, Cint(1)) + return Cint(0) + end + + # Ensure we got a String + token_str = token_result::String + + # Signal that this is a new token (not reusing) + unsafe_store!(reuse_token_ptr, Cint(0)) # Allocate C string using libc malloc and copy the token token_bytes = Vector{UInt8}(token_str) @@ -33,7 +48,7 @@ function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}})::C unsafe_store!(token_ptr, c_str_ptr) return Cint(0) - catch e + catch # Return error code on exception return Cint(1) end @@ -195,13 +210,17 @@ function catalog_create_rest(uri::String; properties::Dict{String,String}=Dict{S end """ - catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog + catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}())::Catalog -Create a REST catalog connection with custom token authentication. +Create a REST catalog connection with custom token authentication and token caching support. # Arguments -- `authenticator::FunctionWrapper{String,Tuple{}}`: A callable that takes no arguments and returns a token string. - The function will be called whenever a new token is needed for authentication. +- `authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}`: A callable that takes no arguments and returns either: + - A `String` containing the token: Rust will cache it and use it for authentication + - `nothing`: Signal to reuse the previously cached token (avoids malloc, memory copy, and free operations) + + **Token Caching Optimization**: Returning `nothing` allows efficient token reuse without unnecessary copying. + - `uri::String`: URI of the Iceberg REST catalog server (e.g., "http://localhost:8181") - `properties::Dict{String,String}`: Optional key-value properties for catalog configuration. By default (empty dict), no additional properties are passed. @@ -209,18 +228,23 @@ Create a REST catalog connection with custom token authentication. # Returns - A `Catalog` handle for use in other catalog operations -# Example +# Example with Token Caching ```julia using FunctionWrappers: FunctionWrapper function get_token() - return ENV["ICEBERG_TOKEN"] + # Check if we need a new token, or if we can reuse the cached one + if needs_refresh() + return fetch_new_token() # Returns String with token + else + return nothing # Signal to reuse cached token (efficient!) + end end -catalog = catalog_create_rest(FunctionWrapper{String,Tuple{}}(get_token), "http://polaris:8181") +catalog = catalog_create_rest(FunctionWrapper{Union{String,Nothing},Tuple{}}(get_token), "http://polaris:8181") ``` """ -function catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}()) +function catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing},Tuple{}}, uri::String; properties::Dict{String,String}=Dict{String,String}()) # Step 1: Create an empty catalog catalog_ptr = @ccall rust_lib.iceberg_catalog_init()::Ptr{Cvoid} if catalog_ptr == C_NULL @@ -231,7 +255,7 @@ function catalog_create_rest(authenticator::FunctionWrapper{String,Tuple{}}, uri authenticator_ref = Ref(authenticator) # Step 3: Create C callback using the static token_callback_impl function - c_callback = @cfunction(token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}})) + c_callback = @cfunction(token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}}, Ptr{Cint})) # Step 4: Get auth_fn pointer to pass to the authenticator auth_fn = pointer_from_objref(authenticator_ref) diff --git a/test/runtests.jl b/test/runtests.jl index 7543530..d510c47 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -735,7 +735,7 @@ end props = Dict( "warehouse" => "warehouse" ) - catalog = RustyIceberg.catalog_create_rest(FunctionWrapper{String,Tuple{}}(auth_fn), catalog_uri; properties=props) + catalog = RustyIceberg.catalog_create_rest(FunctionWrapper{Union{String,Nothing},Tuple{}}(auth_fn), catalog_uri; properties=props) @test catalog != C_NULL println("✅ Catalog created successfully with custom authenticator function") From ebd704e754aa2145a7a2dd35db98a51c659216cd Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 16:09:25 +0100 Subject: [PATCH 27/30] Fix segfault --- iceberg_rust_ffi/src/catalog.rs | 9 +++++---- src/catalog.jl | 5 +++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index fff0db7..5d60b49 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -45,7 +45,7 @@ pub type CustomAuthenticatorCallback = unsafe extern "C" fn( /// Rust implementation of CustomAuthenticator that calls a C callback with auth_fn pointer /// Supports token caching to avoid unnecessary copying when Julia returns the same token -#[derive(Clone)] +#[derive(Clone, Debug)] struct FFITokenAuthenticator { callback: CustomAuthenticatorCallback, auth_fn: *mut c_void, @@ -62,9 +62,10 @@ unsafe impl Sync for FFITokenAuthenticator {} impl CustomAuthenticator for FFITokenAuthenticator { async fn get_token(&self) -> iceberg::Result { let mut token_ptr: *mut c_char = std::ptr::null_mut(); - let mut reuse_token: i32 = 0; + let mut reuse_token_box = Box::new(0i32); + let reuse_token_ptr = reuse_token_box.as_mut() as *mut i32; - let result = unsafe { (self.callback)(self.auth_fn, &mut token_ptr, &mut reuse_token) }; + let result = unsafe { (self.callback)(self.auth_fn, &mut token_ptr, reuse_token_ptr) }; if result != 0 { return Err(Error::new( @@ -74,7 +75,7 @@ impl CustomAuthenticator for FFITokenAuthenticator { } // If Julia signals to reuse the previous token, return it - if reuse_token != 0 { + if *reuse_token_box != 0 { let cached = self.cached_token.lock().unwrap(); if let Some(token) = cached.as_ref() { return Ok(token.clone()); diff --git a/src/catalog.jl b/src/catalog.jl index 423c3f4..901b93b 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -8,6 +8,11 @@ This module provides Julia wrappers for the REST catalog FFI functions. # This is a static function (no closure) that takes auth_fn and extracts the authenticator function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}}, reuse_token_ptr::Ptr{Cint})::Cint try + # Check that output pointers are valid + if token_ptr == C_NULL || reuse_token_ptr == C_NULL + return Cint(1) # Error: invalid output pointers + end + # Extract authenticator function from auth_fn pointer auth_ref = unsafe_pointer_to_objref(auth_fn) auth_fn_call = auth_ref[] From 9cff54d0739bf83d2f0e938b61c4eae402d3d1a9 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 16:31:22 +0100 Subject: [PATCH 28/30] Let Rust allocate --- iceberg_rust_ffi/src/catalog.rs | 51 +++++++++++++++++++++------------ src/catalog.jl | 25 ++++++---------- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 5d60b49..0f4cba4 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{CustomAuthenticator, RestCatalog, RestCatalogBuilder}; use std::collections::HashMap; -use std::ffi::{c_char, c_void, CString}; +use std::ffi::{c_char, c_void}; use std::sync::{Arc, Mutex}; // FFI exports @@ -21,7 +21,8 @@ use crate::PropertyEntry; /// /// The callback receives: /// - auth_fn: opaque pointer to user context (e.g., Julia Ref to authenticator) -/// - token_ptr: output pointer where the token string should be written (only if not reusing) +/// - token_data_ptr: output pointer where token string bytes pointer should be written +/// - token_len_ptr: output pointer where token string length should be written /// - reuse_token_ptr: output pointer for reuse flag (1 = reuse previous token, 0 = new token) /// /// The callback is responsible for: @@ -31,15 +32,18 @@ use crate::PropertyEntry; /// - nothing: signal to reuse the previously cached token /// 3. Setting reuse_token_ptr to indicate the result: /// - Write 1 if Julia returned nothing (reuse the previous token) -/// - Write 0 if Julia returned a String (new token provided in token_ptr) -/// 4. If reuse flag is 0: allocate the token string with libc malloc and write to token_ptr +/// - Write 0 if Julia returned a String (new token data provided) +/// 4. If reuse flag is 0: write token_data_ptr (pointer to bytes) and token_len_ptr (length) +/// Note: Julia is responsible for keeping the data valid for the duration of the call. +/// Rust will copy the data immediately. /// /// Returns: /// - 0 for success /// - non-zero for error pub type CustomAuthenticatorCallback = unsafe extern "C" fn( auth_fn: *mut c_void, - token_ptr: *mut *mut c_char, + token_data_ptr: *mut *mut c_char, + token_len_ptr: *mut usize, reuse_token_ptr: *mut i32, ) -> i32; @@ -61,11 +65,19 @@ unsafe impl Sync for FFITokenAuthenticator {} #[async_trait] impl CustomAuthenticator for FFITokenAuthenticator { async fn get_token(&self) -> iceberg::Result { - let mut token_ptr: *mut c_char = std::ptr::null_mut(); + let mut token_data: *mut c_char = std::ptr::null_mut(); + let mut token_len: usize = 0; let mut reuse_token_box = Box::new(0i32); let reuse_token_ptr = reuse_token_box.as_mut() as *mut i32; - let result = unsafe { (self.callback)(self.auth_fn, &mut token_ptr, reuse_token_ptr) }; + let result = unsafe { + (self.callback)( + self.auth_fn, + &mut token_data, + &mut token_len, + reuse_token_ptr + ) + }; if result != 0 { return Err(Error::new( @@ -88,23 +100,24 @@ impl CustomAuthenticator for FFITokenAuthenticator { } // Otherwise, handle the new token from Julia - if token_ptr.is_null() { + if token_data.is_null() || token_len == 0 { return Err(Error::new( ErrorKind::DataInvalid, - "Token authenticator returned null pointer", + "Token authenticator returned null pointer or zero length", )); } - // SAFETY: The callback is responsible for ensuring token_ptr is a valid - // null-terminated C string that was allocated with libc malloc - let token_cstring = unsafe { CString::from_raw(token_ptr) }; - - let token = token_cstring.into_string().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid UTF-8 in token: {}", e), - ) - })?; + // SAFETY: The callback is responsible for ensuring token_data points to valid + // UTF-8 bytes with length token_len. Rust will copy the data immediately. + let token = unsafe { + let slice = std::slice::from_raw_parts(token_data as *const u8, token_len); + String::from_utf8(slice.to_vec()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid UTF-8 in token: {}", e), + ) + })? + }; // Cache the new token for potential reuse *self.cached_token.lock().unwrap() = Some(token.clone()); diff --git a/src/catalog.jl b/src/catalog.jl index 901b93b..db14f14 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -6,10 +6,10 @@ This module provides Julia wrappers for the REST catalog FFI functions. # Token callback function that can be exported as C-callable with token caching support # This is a static function (no closure) that takes auth_fn and extracts the authenticator -function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}}, reuse_token_ptr::Ptr{Cint})::Cint +function token_callback_impl(auth_fn::Ptr{Cvoid}, token_data_ptr::Ptr{Ptr{Cchar}}, token_len_ptr::Ptr{Csize_t}, reuse_token_ptr::Ptr{Cint})::Cint try # Check that output pointers are valid - if token_ptr == C_NULL || reuse_token_ptr == C_NULL + if token_data_ptr == C_NULL || token_len_ptr == C_NULL || reuse_token_ptr == C_NULL return Cint(1) # Error: invalid output pointers end @@ -35,22 +35,15 @@ function token_callback_impl(auth_fn::Ptr{Cvoid}, token_ptr::Ptr{Ptr{Cchar}}, re # Signal that this is a new token (not reusing) unsafe_store!(reuse_token_ptr, Cint(0)) - # Allocate C string using libc malloc and copy the token + # Get token bytes and write data pointer and length to output parameters + # The data will be kept alive by the token_str variable until the callback returns token_bytes = Vector{UInt8}(token_str) token_len = length(token_bytes) - c_str_ptr = @ccall malloc((token_len + 1)::Csize_t)::Ptr{Cchar} + token_data = pointer(token_bytes) - if c_str_ptr == C_NULL - return Cint(1) # Error: allocation failed - end - - # Copy the token bytes to the allocated memory - unsafe_copyto!(convert(Ptr{UInt8}, c_str_ptr), pointer(token_bytes), token_len) - # Add null terminator - unsafe_store!(convert(Ptr{UInt8}, c_str_ptr), UInt8(0), token_len + 1) - - # Write the pointer to the output parameter - unsafe_store!(token_ptr, c_str_ptr) + # Write the pointer and length to the output parameters + unsafe_store!(token_data_ptr, token_data) + unsafe_store!(token_len_ptr, Csize_t(token_len)) return Cint(0) catch @@ -260,7 +253,7 @@ function catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing authenticator_ref = Ref(authenticator) # Step 3: Create C callback using the static token_callback_impl function - c_callback = @cfunction(token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}}, Ptr{Cint})) + c_callback = @cfunction(token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}}, Ptr{Csize_t}, Ptr{Cint})) # Step 4: Get auth_fn pointer to pass to the authenticator auth_fn = pointer_from_objref(authenticator_ref) From dd91983c07b9a4f604d94933f6f2e721c54c5c0e Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 16:34:07 +0100 Subject: [PATCH 29/30] Format --- iceberg_rust_ffi/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 0f4cba4..25ba077 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -75,7 +75,7 @@ impl CustomAuthenticator for FFITokenAuthenticator { self.auth_fn, &mut token_data, &mut token_len, - reuse_token_ptr + reuse_token_ptr, ) }; From 44fd8512aa99fb096461f75441efa40f0b6e0794 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 23 Dec 2025 16:47:41 +0100 Subject: [PATCH 30/30] Make ccallable --- src/catalog.jl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/catalog.jl b/src/catalog.jl index db14f14..60d9be6 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -6,7 +6,12 @@ This module provides Julia wrappers for the REST catalog FFI functions. # Token callback function that can be exported as C-callable with token caching support # This is a static function (no closure) that takes auth_fn and extracts the authenticator -function token_callback_impl(auth_fn::Ptr{Cvoid}, token_data_ptr::Ptr{Ptr{Cchar}}, token_len_ptr::Ptr{Csize_t}, reuse_token_ptr::Ptr{Cint})::Cint +Base.@ccallable function iceberg_token_callback_impl( + auth_fn::Ptr{Cvoid}, + token_data_ptr::Ptr{Ptr{Cchar}}, + token_len_ptr::Ptr{Csize_t}, + reuse_token_ptr::Ptr{Cint} +)::Cint try # Check that output pointers are valid if token_data_ptr == C_NULL || token_len_ptr == C_NULL || reuse_token_ptr == C_NULL @@ -253,7 +258,7 @@ function catalog_create_rest(authenticator::FunctionWrapper{Union{String,Nothing authenticator_ref = Ref(authenticator) # Step 3: Create C callback using the static token_callback_impl function - c_callback = @cfunction(token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}}, Ptr{Csize_t}, Ptr{Cint})) + c_callback = @cfunction(iceberg_token_callback_impl, Cint, (Ptr{Cvoid}, Ptr{Ptr{Cchar}}, Ptr{Csize_t}, Ptr{Cint})) # Step 4: Get auth_fn pointer to pass to the authenticator auth_fn = pointer_from_objref(authenticator_ref)