diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 5fbd003..8574531 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1523,7 +1523,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=34829106fbb6ef94785edd99f784ea767a4aae44#34829106fbb6ef94785edd99f784ea767a4aae44" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378#aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378" dependencies = [ "anyhow", "apache-avro", @@ -1578,7 +1578,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=34829106fbb6ef94785edd99f784ea767a4aae44#34829106fbb6ef94785edd99f784ea767a4aae44" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378#aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378" dependencies = [ "async-trait", "chrono", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 37776ea..0a7ed13 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.7.1" +version = "0.7.2" edition = "2021" [lib] @@ -12,8 +12,8 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44", features = ["storage-azdls"] } -iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378", features = ["storage-azdls"] } +iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 25ba077..45a32bc 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -215,6 +215,22 @@ impl IcebergCatalog { Ok(IcebergTable { table }) } + /// Load a table by namespace and name with vended credentials + pub async fn load_table_with_credentials( + &self, + namespace_parts: Vec, + table_name: String, + ) -> Result { + let namespace = NamespaceIdent::from_vec(namespace_parts)?; + let table_ident = TableIdent::new(namespace, table_name); + let table = self + .as_ref() + .load_table_with_credentials(&table_ident) + .await?; + + Ok(IcebergTable { table }) + } + /// List tables in a namespace pub async fn list_tables(&self, namespace_parts: Vec) -> Result> { let namespace = NamespaceIdent::from_vec(namespace_parts)?; @@ -520,6 +536,32 @@ export_runtime_op!( table_name: *const c_char ); +// Load a table from the catalog with vended credentials +export_runtime_op!( + iceberg_catalog_load_table_with_credentials, + crate::IcebergTableResponse, + || { + if catalog.is_null() { + return Err(anyhow::anyhow!("Null catalog pointer provided")); + } + + let namespace_parts = parse_string_array(namespace_parts_ptr, namespace_parts_len)?; + let table_name = parse_c_string(table_name, "table_name")?; + let catalog_ref = unsafe { &*catalog }; + + Ok((catalog_ref, namespace_parts, table_name)) + }, + result_tuple, + async { + let (catalog_ref, namespace_parts, table_name) = result_tuple; + catalog_ref.load_table_with_credentials(namespace_parts, table_name).await + }, + catalog: *mut IcebergCatalog, + namespace_parts_ptr: *const *const c_char, + namespace_parts_len: usize, + table_name: *const c_char +); + // List tables in a namespace export_runtime_op!( iceberg_catalog_list_tables, diff --git a/src/catalog.jl b/src/catalog.jl index 60d9be6..c944956 100644 --- a/src/catalog.jl +++ b/src/catalog.jl @@ -309,7 +309,7 @@ function free_catalog(catalog::Catalog) end """ - load_table(catalog::Catalog, namespace::Vector{String}, table_name::String)::Table + load_table(catalog::Catalog, namespace::Vector{String}, table_name::String; load_credentials::Bool=false)::Table Load a table from a catalog by namespace and name. @@ -317,31 +317,48 @@ Load a table from a catalog by namespace and name. - `catalog::Catalog`: The catalog handle - `namespace::Vector{String}`: Namespace parts (e.g., ["warehouse", "orders"]) - `table_name::String`: The table name +- `load_credentials::Bool=false`: If true, fetches vended credentials from the catalog server # Returns - A `Table` handle for use in scan operations # Example ```julia +# Load without credentials (default) table = load_table(catalog, ["warehouse", "orders"], "customers") + +# Load with vended credentials +table = load_table(catalog, ["warehouse", "orders"], "customers"; load_credentials=true) ``` """ -function load_table(catalog::Catalog, namespace::Vector{String}, table_name::String) +function load_table(catalog::Catalog, namespace::Vector{String}, table_name::String; load_credentials::Bool=false) response = TableResponse() # Convert namespace to array of C strings namespace_ptrs = [pointer(part) for part in namespace] namespace_len = length(namespace) + namespace_ptrs_ptr = (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL) async_ccall(response, namespace, namespace_ptrs) do handle - @ccall rust_lib.iceberg_catalog_load_table( - catalog.ptr::Ptr{Cvoid}, - (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}}, - namespace_len::Csize_t, - table_name::Cstring, - response::Ref{TableResponse}, - handle::Ptr{Cvoid} - )::Cint + if load_credentials + @ccall rust_lib.iceberg_catalog_load_table_with_credentials( + catalog.ptr::Ptr{Cvoid}, + namespace_ptrs_ptr::Ptr{Ptr{Cchar}}, + namespace_len::Csize_t, + table_name::Cstring, + response::Ref{TableResponse}, + handle::Ptr{Cvoid} + )::Cint + else + @ccall rust_lib.iceberg_catalog_load_table( + catalog.ptr::Ptr{Cvoid}, + namespace_ptrs_ptr::Ptr{Ptr{Cchar}}, + namespace_len::Csize_t, + table_name::Cstring, + response::Ref{TableResponse}, + handle::Ptr{Cvoid} + )::Cint + end end @throw_on_error(response, "catalog_load_table", IcebergException) diff --git a/test/runtests.jl b/test/runtests.jl index d510c47..e0e5f9e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -888,9 +888,6 @@ end RustyIceberg.free_batch(batch_ptr) end end - - catch e - rethrow(e) finally # Clean up all resources in reverse order if stream != C_NULL @@ -911,6 +908,93 @@ end println("✅ Catalog table loading tests completed!") end +@testset "Catalog Table Loading with Credentials" begin + println("Testing catalog table loading with vended credentials...") + + catalog_uri = "http://localhost:8181/api/catalog" + + catalog = C_NULL + table = C_NULL + scan = C_NULL + stream = C_NULL + batch = nothing + + try + # Create catalog connection WITHOUT S3 credentials + # When using load_credentials=true, the catalog will provide vended credentials + props = Dict( + "credential" => "root:s3cr3t", + "scope" => "PRINCIPAL_ROLE:ALL", + "warehouse" => "warehouse", + # Note: We include s3.endpoint as we would get the domain name `minio` otherwise. + "s3.endpoint" => "http://localhost:9000", + "s3.region" => "us-east-1" + ) + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props) + @test catalog != C_NULL + println("✅ Catalog created successfully") + + # Load the customer table with vended credentials + # The catalog will provide short-lived S3 credentials + println("Attempting to load customer table from tpch.sf01 with vended credentials...") + table = RustyIceberg.load_table(catalog, ["tpch.sf01"], "customer"; load_credentials=true) + @test table != C_NULL + println("✅ Customer table loaded successfully with vended credentials") + + # Create a scan on the loaded table + println("Creating scan on loaded customer table...") + scan = RustyIceberg.new_scan(table) + @test scan != C_NULL + println("✅ Scan created successfully on loaded table") + + # Select specific columns + println("Selecting specific columns from customer table...") + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", "c_nationkey"]) + println("✅ Column selection completed") + + # Execute the scan + println("Executing scan on loaded customer table...") + stream = RustyIceberg.scan!(scan) + @test stream != C_NULL + println("✅ Scan executed successfully") + + # Read the first batch to verify data + println("Reading first batch from loaded customer table...") + batch_ptr = RustyIceberg.next_batch(stream) + + if batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + if batch.data != C_NULL && batch.length > 0 + println("✅ Successfully read first batch with $(batch.length) bytes of Arrow IPC data") + + # Verify we got actual data + @test batch.length > 0 + println("✅ Batch contains valid Arrow data from catalog-loaded customer table with vended credentials") + + # Clean up the batch + RustyIceberg.free_batch(batch_ptr) + end + end + finally + # Clean up all resources in reverse order + if stream != C_NULL + RustyIceberg.free_stream(stream) + end + if scan != C_NULL + RustyIceberg.free_scan!(scan) + end + if table != C_NULL + RustyIceberg.free_table(table) + end + if catalog != C_NULL + RustyIceberg.free_catalog(catalog) + end + println("✅ All resources cleaned up successfully") + end + + println("✅ Catalog table loading with credentials tests completed!") +end + @testset "Catalog Incremental Scan" begin println("Testing catalog incremental scan...") @@ -1019,9 +1103,6 @@ end println("✅ Successfully read from catalog-loaded incremental scan") println(" - Inserts batches: $inserts_count, total rows: $total_inserts") println(" - Deletes batches: $deletes_count, total rows: $total_deletes") - - catch e - rethrow(e) finally # Clean up all resources in reverse order if inserts_stream != C_NULL