Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iceberg_rust_ffi"
version = "0.7.1"
version = "0.7.2"
edition = "2021"

[lib]
Expand All @@ -12,8 +12,8 @@ default = ["julia"]
julia = []

[dependencies]
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44", features = ["storage-azdls"] }
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "34829106fbb6ef94785edd99f784ea767a4aae44" }
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378", features = ["storage-azdls"] }
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "aec29baaeb16dd9b4d2c5c4416ac4b5829fe3378" }
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Expand Down
42 changes: 42 additions & 0 deletions iceberg_rust_ffi/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,22 @@ impl IcebergCatalog {
Ok(IcebergTable { table })
}

/// Load a table by namespace and name with vended credentials
pub async fn load_table_with_credentials(
&self,
namespace_parts: Vec<String>,
table_name: String,
) -> Result<IcebergTable> {
let namespace = NamespaceIdent::from_vec(namespace_parts)?;
let table_ident = TableIdent::new(namespace, table_name);
let table = self
.as_ref()
.load_table_with_credentials(&table_ident)
.await?;

Ok(IcebergTable { table })
}

/// List tables in a namespace
pub async fn list_tables(&self, namespace_parts: Vec<String>) -> Result<Vec<String>> {
let namespace = NamespaceIdent::from_vec(namespace_parts)?;
Expand Down Expand Up @@ -520,6 +536,32 @@ export_runtime_op!(
table_name: *const c_char
);

// Load a table from the catalog with vended credentials
export_runtime_op!(
iceberg_catalog_load_table_with_credentials,
crate::IcebergTableResponse,
|| {
if catalog.is_null() {
return Err(anyhow::anyhow!("Null catalog pointer provided"));
}

let namespace_parts = parse_string_array(namespace_parts_ptr, namespace_parts_len)?;
let table_name = parse_c_string(table_name, "table_name")?;
let catalog_ref = unsafe { &*catalog };

Ok((catalog_ref, namespace_parts, table_name))
},
result_tuple,
async {
let (catalog_ref, namespace_parts, table_name) = result_tuple;
catalog_ref.load_table_with_credentials(namespace_parts, table_name).await
},
catalog: *mut IcebergCatalog,
namespace_parts_ptr: *const *const c_char,
namespace_parts_len: usize,
table_name: *const c_char
);

// List tables in a namespace
export_runtime_op!(
iceberg_catalog_list_tables,
Expand Down
37 changes: 27 additions & 10 deletions src/catalog.jl
Original file line number Diff line number Diff line change
Expand Up @@ -309,39 +309,56 @@ function free_catalog(catalog::Catalog)
end

"""
load_table(catalog::Catalog, namespace::Vector{String}, table_name::String)::Table
load_table(catalog::Catalog, namespace::Vector{String}, table_name::String; load_credentials::Bool=false)::Table

Load a table from a catalog by namespace and name.

# Arguments
- `catalog::Catalog`: The catalog handle
- `namespace::Vector{String}`: Namespace parts (e.g., ["warehouse", "orders"])
- `table_name::String`: The table name
- `load_credentials::Bool=false`: If true, fetches vended credentials from the catalog server

# Returns
- A `Table` handle for use in scan operations

# Example
```julia
# Load without credentials (default)
table = load_table(catalog, ["warehouse", "orders"], "customers")

# Load with vended credentials
table = load_table(catalog, ["warehouse", "orders"], "customers"; load_credentials=true)
```
"""
function load_table(catalog::Catalog, namespace::Vector{String}, table_name::String)
function load_table(catalog::Catalog, namespace::Vector{String}, table_name::String; load_credentials::Bool=false)
response = TableResponse()

# Convert namespace to array of C strings
namespace_ptrs = [pointer(part) for part in namespace]
namespace_len = length(namespace)
namespace_ptrs_ptr = (namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)

async_ccall(response, namespace, namespace_ptrs) do handle
@ccall rust_lib.iceberg_catalog_load_table(
catalog.ptr::Ptr{Cvoid},
(namespace_len > 0 ? pointer(namespace_ptrs) : C_NULL)::Ptr{Ptr{Cchar}},
namespace_len::Csize_t,
table_name::Cstring,
response::Ref{TableResponse},
handle::Ptr{Cvoid}
)::Cint
if load_credentials
@ccall rust_lib.iceberg_catalog_load_table_with_credentials(
catalog.ptr::Ptr{Cvoid},
namespace_ptrs_ptr::Ptr{Ptr{Cchar}},
namespace_len::Csize_t,
table_name::Cstring,
response::Ref{TableResponse},
handle::Ptr{Cvoid}
)::Cint
else
@ccall rust_lib.iceberg_catalog_load_table(
catalog.ptr::Ptr{Cvoid},
namespace_ptrs_ptr::Ptr{Ptr{Cchar}},
namespace_len::Csize_t,
table_name::Cstring,
response::Ref{TableResponse},
handle::Ptr{Cvoid}
)::Cint
end
end

@throw_on_error(response, "catalog_load_table", IcebergException)
Expand Down
93 changes: 87 additions & 6 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -888,9 +888,6 @@ end
RustyIceberg.free_batch(batch_ptr)
end
end

catch e
rethrow(e)
finally
# Clean up all resources in reverse order
if stream != C_NULL
Expand All @@ -911,6 +908,93 @@ end
println("✅ Catalog table loading tests completed!")
end

@testset "Catalog Table Loading with Credentials" begin
println("Testing catalog table loading with vended credentials...")

catalog_uri = "http://localhost:8181/api/catalog"

catalog = C_NULL
table = C_NULL
scan = C_NULL
stream = C_NULL
batch = nothing

try
# Create catalog connection WITHOUT S3 credentials
# When using load_credentials=true, the catalog will provide vended credentials
props = Dict(
"credential" => "root:s3cr3t",
"scope" => "PRINCIPAL_ROLE:ALL",
"warehouse" => "warehouse",
# Note: We include s3.endpoint as we would get the domain name `minio` otherwise.
"s3.endpoint" => "http://localhost:9000",
"s3.region" => "us-east-1"
)
catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props)
@test catalog != C_NULL
println("✅ Catalog created successfully")

# Load the customer table with vended credentials
# The catalog will provide short-lived S3 credentials
println("Attempting to load customer table from tpch.sf01 with vended credentials...")
table = RustyIceberg.load_table(catalog, ["tpch.sf01"], "customer"; load_credentials=true)
@test table != C_NULL
println("✅ Customer table loaded successfully with vended credentials")

# Create a scan on the loaded table
println("Creating scan on loaded customer table...")
scan = RustyIceberg.new_scan(table)
@test scan != C_NULL
println("✅ Scan created successfully on loaded table")

# Select specific columns
println("Selecting specific columns from customer table...")
RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", "c_nationkey"])
println("✅ Column selection completed")

# Execute the scan
println("Executing scan on loaded customer table...")
stream = RustyIceberg.scan!(scan)
@test stream != C_NULL
println("✅ Scan executed successfully")

# Read the first batch to verify data
println("Reading first batch from loaded customer table...")
batch_ptr = RustyIceberg.next_batch(stream)

if batch_ptr != C_NULL
batch = unsafe_load(batch_ptr)
if batch.data != C_NULL && batch.length > 0
println("✅ Successfully read first batch with $(batch.length) bytes of Arrow IPC data")

# Verify we got actual data
@test batch.length > 0
println("✅ Batch contains valid Arrow data from catalog-loaded customer table with vended credentials")

# Clean up the batch
RustyIceberg.free_batch(batch_ptr)
end
end
finally
# Clean up all resources in reverse order
if stream != C_NULL
RustyIceberg.free_stream(stream)
end
if scan != C_NULL
RustyIceberg.free_scan!(scan)
end
if table != C_NULL
RustyIceberg.free_table(table)
end
if catalog != C_NULL
RustyIceberg.free_catalog(catalog)
end
println("✅ All resources cleaned up successfully")
end

println("✅ Catalog table loading with credentials tests completed!")
end

@testset "Catalog Incremental Scan" begin
println("Testing catalog incremental scan...")

Expand Down Expand Up @@ -1019,9 +1103,6 @@ end
println("✅ Successfully read from catalog-loaded incremental scan")
println(" - Inserts batches: $inserts_count, total rows: $total_inserts")
println(" - Deletes batches: $deletes_count, total rows: $total_deletes")

catch e
rethrow(e)
finally
# Clean up all resources in reverse order
if inserts_stream != C_NULL
Expand Down