Skip to content

Commit 9d3e9d3

Browse files
authored
Add support for vending credentials via the load table call (#41)
- Adds support for vending storage credentials via load table calls
1 parent f6b0bcb commit 9d3e9d3

File tree

5 files changed

+161
-21
lines changed

5 files changed

+161
-21
lines changed

iceberg_rust_ffi/Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iceberg_rust_ffi/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "iceberg_rust_ffi"
3-
version = "0.7.1"
3+
version = "0.7.2"
44
edition = "2021"
55

66
[lib]
@@ -12,8 +12,8 @@ default = ["julia"]
1212
julia = []
1313

1414
[dependencies]
15-
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44", features = ["storage-azdls"] }
16-
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44" }
15+
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378", features = ["storage-azdls"] }
16+
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378" }
1717
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
1818
tokio = { version = "1.0", features = ["full"] }
1919
futures = "0.3"

iceberg_rust_ffi/src/catalog.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,22 @@ impl IcebergCatalog {
215215
Ok(IcebergTable { table })
216216
}
217217

218+
/// Load a table identified by namespace and name, using the credential-vending variant of the load table call
219+
pub async fn load_table_with_credentials(
220+
&self,
221+
namespace_parts: Vec<String>,
222+
table_name: String,
223+
) -> Result<IcebergTable> {
224+
let namespace = NamespaceIdent::from_vec(namespace_parts)?;
225+
let table_ident = TableIdent::new(namespace, table_name);
226+
let table = self
227+
.as_ref()
228+
.load_table_with_credentials(&table_ident)
229+
.await?;
230+
231+
Ok(IcebergTable { table })
232+
}
233+
218234
/// List tables in a namespace
219235
pub async fn list_tables(&self, namespace_parts: Vec<String>) -> Result<Vec<String>> {
220236
let namespace = NamespaceIdent::from_vec(namespace_parts)?;
@@ -520,6 +536,32 @@ export_runtime_op!(
520536
table_name: *const c_char
521537
);
522538

539+
// Load a table from the catalog with vended credentials
540+
export_runtime_op!(
541+
iceberg_catalog_load_table_with_credentials,
542+
crate::IcebergTableResponse,
543+
|| {
544+
if catalog.is_null() {
545+
return Err(anyhow::anyhow!("Null catalog pointer provided"));
546+
}
547+
548+
let namespace_parts = parse_string_array(namespace_parts_ptr, namespace_parts_len)?;
549+
let table_name = parse_c_string(table_name, "table_name")?;
550+
let catalog_ref = unsafe { &*catalog };
551+
552+
Ok((catalog_ref, namespace_parts, table_name))
553+
},
554+
result_tuple,
555+
async {
556+
let (catalog_ref, namespace_parts, table_name) = result_tuple;
557+
catalog_ref.load_table_with_credentials(namespace_parts, table_name).await
558+
},
559+
catalog: *mut IcebergCatalog,
560+
namespace_parts_ptr: *const *const c_char,
561+
namespace_parts_len: usize,
562+
table_name: *const c_char
563+
);
564+
523565
// List tables in a namespace
524566
export_runtime_op!(
525567
iceberg_catalog_list_tables,

src/catalog.jl

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -309,39 +309,56 @@ function free_catalog(catalog::Catalog)
309309
end
310310

311311
"""
312-
load_table(catalog::Catalog, namespace::Vector{String}, table_name::String)::Table
312+
load_table(catalog::Catalog, namespace::Vector{String}, table_name::String; load_credentials::Bool=false)::Table
313313
314314
Load a table from a catalog by namespace and name.
315315
316316
# Arguments
317317
- `catalog::Catalog`: The catalog handle
318318
- `namespace::Vector{String}`: Namespace parts (e.g., ["warehouse", "orders"])
319319
- `table_name::String`: The table name
320+
- `load_credentials::Bool=false`: If true, fetches vended credentials from the catalog server
320321
321322
# Returns
322323
- A `Table` handle for use in scan operations
323324
324325
# Example
325326
```julia
327+
# Load without credentials (default)
326328
table = load_table(catalog, ["warehouse", "orders"], "customers")
329+
330+
# Load with vended credentials
331+
table = load_table(catalog, ["warehouse", "orders"], "customers"; load_credentials=true)
327332
```
328333
"""
329-
function load_table(catalog::Catalog, namespace::Vector{String}, table_name::String)
334+
function load_table(catalog::Catalog, namespace::Vector{String}, table_name::String; load_credentials::Bool=false)
330335
response = TableResponse()
331336

332337
# Convert namespace to array of C strings
333338
namespace_ptrs = [pointer(part) for part in namespace]
334339
namespace_len = length(namespace)
340+
namespace_ptrs_ptr = (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)
335341

336342
async_ccall(response, namespace, namespace_ptrs) do handle
337-
@ccall rust_lib.iceberg_catalog_load_table(
338-
catalog.ptr::Ptr{Cvoid},
339-
(namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}},
340-
namespace_len::Csize_t,
341-
table_name::Cstring,
342-
response::Ref{TableResponse},
343-
handle::Ptr{Cvoid}
344-
)::Cint
343+
if load_credentials
344+
@ccall rust_lib.iceberg_catalog_load_table_with_credentials(
345+
catalog.ptr::Ptr{Cvoid},
346+
namespace_ptrs_ptr::Ptr{Ptr{Cchar}},
347+
namespace_len::Csize_t,
348+
table_name::Cstring,
349+
response::Ref{TableResponse},
350+
handle::Ptr{Cvoid}
351+
)::Cint
352+
else
353+
@ccall rust_lib.iceberg_catalog_load_table(
354+
catalog.ptr::Ptr{Cvoid},
355+
namespace_ptrs_ptr::Ptr{Ptr{Cchar}},
356+
namespace_len::Csize_t,
357+
table_name::Cstring,
358+
response::Ref{TableResponse},
359+
handle::Ptr{Cvoid}
360+
)::Cint
361+
end
345362
end
346363

347364
@throw_on_error(response, "catalog_load_table", IcebergException)

test/runtests.jl

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -888,9 +888,6 @@ end
888888
RustyIceberg.free_batch(batch_ptr)
889889
end
890890
end
891-
892-
catch e
893-
rethrow(e)
894891
finally
895892
# Clean up all resources in reverse order
896893
if stream != C_NULL
@@ -911,6 +908,93 @@ end
911908
println("✅ Catalog table loading tests completed!")
912909
end
913910

911+
@testset "Catalog Table Loading with Credentials" begin
912+
println("Testing catalog table loading with vended credentials...")
913+
914+
catalog_uri = "http://localhost:8181/api/catalog"
915+
916+
catalog = C_NULL
917+
table = C_NULL
918+
scan = C_NULL
919+
stream = C_NULL
920+
batch = nothing
921+
922+
try
923+
# Create catalog connection WITHOUT S3 credentials
924+
# When using load_credentials=true, the catalog will provide vended credentials
925+
props = Dict(
926+
"credential" => "root:s3cr3t",
927+
"scope" => "PRINCIPAL_ROLE:ALL",
928+
"warehouse" => "warehouse",
929+
# Note: s3.endpoint is still set explicitly here, because otherwise we would get the domain name `minio` as the endpoint.
930+
"s3.endpoint" => "http://localhost:9000",
931+
"s3.region" => "us-east-1"
932+
)
933+
catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props)
934+
@test catalog != C_NULL
935+
println("✅ Catalog created successfully")
936+
937+
# Load the customer table with vended credentials
938+
# The catalog will provide short-lived S3 credentials
939+
println("Attempting to load customer table from tpch.sf01 with vended credentials...")
940+
table = RustyIceberg.load_table(catalog, ["tpch.sf01"], "customer"; load_credentials=true)
941+
@test table != C_NULL
942+
println("✅ Customer table loaded successfully with vended credentials")
943+
944+
# Create a scan on the loaded table
945+
println("Creating scan on loaded customer table...")
946+
scan = RustyIceberg.new_scan(table)
947+
@test scan != C_NULL
948+
println("✅ Scan created successfully on loaded table")
949+
950+
# Select specific columns
951+
println("Selecting specific columns from customer table...")
952+
RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", "c_nationkey"])
953+
println("✅ Column selection completed")
954+
955+
# Execute the scan
956+
println("Executing scan on loaded customer table...")
957+
stream = RustyIceberg.scan!(scan)
958+
@test stream != C_NULL
959+
println("✅ Scan executed successfully")
960+
961+
# Read the first batch to verify data
962+
println("Reading first batch from loaded customer table...")
963+
batch_ptr = RustyIceberg.next_batch(stream)
964+
965+
if batch_ptr != C_NULL
966+
batch = unsafe_load(batch_ptr)
967+
if batch.data != C_NULL && batch.length > 0
968+
println("✅ Successfully read first batch with $(batch.length) bytes of Arrow IPC data")
969+
970+
# Verify we got actual data
971+
@test batch.length > 0
972+
println("✅ Batch contains valid Arrow data from catalog-loaded customer table with vended credentials")
973+
974+
# Clean up the batch
975+
RustyIceberg.free_batch(batch_ptr)
976+
end
977+
end
978+
finally
979+
# Clean up all resources in reverse order
980+
if stream != C_NULL
981+
RustyIceberg.free_stream(stream)
982+
end
983+
if scan != C_NULL
984+
RustyIceberg.free_scan!(scan)
985+
end
986+
if table != C_NULL
987+
RustyIceberg.free_table(table)
988+
end
989+
if catalog != C_NULL
990+
RustyIceberg.free_catalog(catalog)
991+
end
992+
println("✅ All resources cleaned up successfully")
993+
end
994+
995+
println("✅ Catalog table loading with credentials tests completed!")
996+
end
997+
914998
@testset "Catalog Incremental Scan" begin
915999
println("Testing catalog incremental scan...")
9161000

@@ -1019,9 +1103,6 @@ end
10191103
println("✅ Successfully read from catalog-loaded incremental scan")
10201104
println(" - Inserts batches: $inserts_count, total rows: $total_inserts")
10211105
println(" - Deletes batches: $deletes_count, total rows: $total_deletes")
1022-
1023-
catch e
1024-
rethrow(e)
10251106
finally
10261107
# Clean up all resources in reverse order
10271108
if inserts_stream != C_NULL

0 commit comments

Comments
 (0)