diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..a31cba7 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,5 @@ +[target.aarch64-apple-darwin] +rustflags = ["-Clink-arg=-undefined","-Clink-arg=dynamic_lookup"] + +[target.x86_64-apple-darwin] +rustflags = ["-Clink-arg=-undefined","-Clink-arg=dynamic_lookup"] \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 120000 index 0000000..681311e --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +CLAUDE.md \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..c5881f2 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,111 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Common Development Commands + +### Building +- `cargo build` - Build the Rust FFI library (debug) +- `cargo build --release --no-default-features` - Build the Rust FFI library (production) +- `make build-lib` - Build the Rust library and generate C header using cbindgen +- `make build-test` - Build the C integration test (requires library). Not needed if `./run_integration_test.sh` is to be used. + +### Testing +- `./run_integration_test.sh` - Recommended way to run the full integration test (builds everything and runs test with colored output) locally (requires containers). +- `make all` - Build everything and run integration test (requires containers) +- `make run-containers` - Start Docker containers for S3 testing +- `make test` - Run the integration test (requires build and containers) +- `make stop-containers` - Stop Docker containers +- `cargo test` - Run Rust unit tests + +### Code Quality +- `cargo fmt` - Format Rust code +- `cargo clippy` - Run Rust linter +- `cargo check` - Quick check for Rust compilation errors + +### Cleanup +- `make clean` - Clean build artifacts (but keep target directory) +- `make clean-all` - Clean everything including target directory + +## Architecture Overview + +This project provides a **Foreign Function Interface (FFI)** for Apache Iceberg, allowing C programs (and other languages through C bindings) to access Iceberg tables stored in object storage systems like S3. The majority of the infrastucture relies on object_store_ffi crate. If you don't have access to that crate's code locally, access it at this [URL](https://github.com/RelationalAI/object_store_ffi). + +### Key Components + +#### Rust Library (`src/lib.rs`) +- **Core FFI Implementation**: Exposes Iceberg functionality through C-compatible functions +- **Async Runtime Integration**: Uses Tokio for async operations with object_store_ffi for callback handling. Async operations rely on `export_runtime_op!` macro, which has a sync block, which is a builder function, where all deserialization and conversion is done. Then the result of that is passed to an async block. Each parameter has to implement Send trait, in order to be passed to the async block +- **Julia Integration**: Conditional compilation features for Julia interop (`julia` feature flag) +- **Memory Management**: Safe FFI patterns with proper cleanup functions + +#### C header (`include/iceberg_rust_ffi.h`) +- **Manual Generation**: C header is not generated right now. Whenever you make a change in the Rust library, examine whether the header should be updated. +- **C99 Compatible**: Ensures compatibility with standard C compilers +- **Response Structures**: Async operations return response structures with context for cancellation + +#### Integration Test (`tests/integration_test.c`) +- **Dynamic Loading**: Uses `dlopen`/`dlsym` to load the Rust library at runtime +- **Async API Testing**: Tests the new async API with response structures and callbacks +- **S3 Integration**: Connects to S3 (or MinIO) to test real object storage operations + +### FFI Design Patterns + +#### Async Operations with Callbacks +The FFI uses an async callback pattern where: +1. C calls an async function with a response structure +2. Rust spawns the operation and returns immediately +3. When complete, Rust invokes a callback to signal completion +4. C polls or waits for completion, then checks the response structure + +#### Memory Management +- **Owned Pointers**: Rust allocates, C receives opaque pointers +- **Cleanup Functions**: Every allocated resource has a corresponding `_free` function +- **Error Handling**: Errors are returned via response structures with allocated error strings + +#### Context and Cancellation +- Operations return a context pointer that can be used for cancellation +- `iceberg_cancel_context` and `iceberg_destroy_context` functions manage operation lifecycle + +### S3 Configuration + +The integration test expects AWS S3 credentials through environment variables: +- `AWS_ACCESS_KEY_ID` +- `AWS_SECRET_ACCESS_KEY` +- `AWS_REGION` or `AWS_DEFAULT_REGION` +- `AWS_ENDPOINT_URL` (for MinIO or custom S3-compatible storage) + +Use the `.env` file or export variables directly. The test is designed to fail with permission errors when S3 paths are inaccessible, which confirms the API is working correctly. + +### Build System + +#### Cargo Features +- Default features: `["julia"]` +- `julia` feature: Enables Julia thread adoption and GC integration +- Integration tests use `--no-default-features` to avoid Julia dependencies + +### RustyIceberg.jl + +Whenever making API changes here, the corresponding changes should be made in the RustyIceberg.jl repository, if it exists locally in the same folder next to this repository. This repository is for Julia bindings on top of this FFI. +Once changes are made there, they should be tested by: +1. Doing `cargo build` (with default features) for the iceberg_rust_ffi. +2. Invoking `ICEBERG_RUST_LIB=/target/debug julia --project=. examples/basic_usage.jl` in the RustyIceberg.jl directory. + +## Development Notes + +### Working with FFI +- Always check for null pointers in C code before dereferencing +- Use the provided `_free` functions to avoid memory leaks +- Error messages are allocated strings that must be freed with `iceberg_destroy_cstring` + +### Testing Changes +Run the integration test after making changes to verify the FFI still works: +```bash +./run_integration_test.sh +``` + +### Object Store Integration +This crate depends on `object_store_ffi` for async runtime management and callback handling. The integration provides: +- Cross-platform async runtime setup +- Callback infrastructure for async operations +- Context management for cancellation support \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 6cdaad3..2d85e47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,9 @@ name = "anyhow" version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +dependencies = [ + "backtrace", +] [[package]] name = "apache-avro" @@ -277,13 +280,37 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0f477b951e452a0b6b4a10b53ccd569042d1d01729b519e02074a9c0958a063" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-compression" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "977eb15ea9efd848bb8a4a1a2500347ed7f0bf794edf0dc3ddcf439f43d36b23" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-lock" version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.4.0", "event-listener-strategy", "pin-project-lite", ] @@ -309,15 +336,10 @@ dependencies = [ ] [[package]] -name = "atty" -version = "0.2.14" +name = "atomic-waker" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" @@ -325,6 +347,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom 0.2.16", + "instant", + "rand 0.8.5", +] + [[package]] name = "backon" version = "1.5.2" @@ -377,12 +410,6 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.9.1" @@ -500,25 +527,6 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" -[[package]] -name = "cbindgen" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da6bc11b07529f16944307272d5bd9b22530bc7d05751717c9d416586cedab49" -dependencies = [ - "clap", - "heck 0.4.1", - "indexmap 1.9.3", - "log", - "proc-macro2", - "quote", - "serde", - "serde_json", - "syn 1.0.109", - "tempfile", - "toml", -] - [[package]] name = "cc" version = "1.2.30" @@ -558,29 +566,33 @@ dependencies = [ ] [[package]] -name = "clap" -version = "3.2.25" +name = "cmake" +version = "0.1.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" dependencies = [ - "atty", - "bitflags 1.3.2", - "clap_lex", - "indexmap 1.9.3", - "strsim 0.10.0", - "termcolor", - "textwrap", + "cc", ] [[package]] -name = "clap_lex" -version = "0.2.4" +name = "compression-codecs" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +checksum = "485abf41ac0c8047c07c87c72c8fb3eb5197f6e9d7ded615dfd1a00ae00a0f64" dependencies = [ - "os_str_bytes", + "compression-core", + "flate2", + "memchr", + "zstd", + "zstd-safe", ] +[[package]] +name = "compression-core" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -616,6 +628,16 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -676,6 +698,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -718,7 +749,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim 0.11.1", + "strsim", "syn 2.0.104", ] @@ -739,6 +770,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728" +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "deranged" version = "0.4.0" @@ -830,6 +867,24 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -846,6 +901,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.4.0" @@ -863,7 +924,7 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" dependencies = [ - "event-listener", + "event-listener 5.4.0", "pin-project-lite", ] @@ -889,7 +950,7 @@ version = "25.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" dependencies = [ - "bitflags 2.9.1", + "bitflags", "rustc_version", ] @@ -900,16 +961,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" dependencies = [ "crc32fast", + "libz-ng-sys", "libz-rs-sys", "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1083,6 +1172,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.10.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.6.0" @@ -1119,27 +1227,12 @@ version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" - [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.5.2" @@ -1152,6 +1245,51 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hickory-proto" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92652067c9ce6f66ce53cc38d1169daa36e6e7eb7dd3b63b5103bd9d97117248" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "once_cell", + "rand 0.8.5", + "thiserror 1.0.69", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbb117a1ca520e111743ab2f6688eddee69db4e0ea242545a604dce8a66fd22e" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot", + "rand 0.8.5", + "resolv-conf", + "smallvec", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1210,6 +1348,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "humantime" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" + [[package]] name = "hyper" version = "1.6.0" @@ -1219,6 +1363,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2", "http", "http-body", "httparse", @@ -1239,6 +1384,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -1351,17 +1497,20 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "arrow-array", "arrow-ipc", - "cbindgen", "futures", "iceberg", "libc", + "object_store_ffi", + "once_cell", "tempfile", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1499,6 +1648,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -1511,11 +1669,23 @@ version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ - "bitflags 2.9.1", + "bitflags", "cfg-if", "libc", ] +[[package]] +name = "ipconfig" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" +dependencies = [ + "socket2 0.5.10", + "widestring", + "windows-sys 0.48.0", + "winreg", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -1673,6 +1843,16 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libz-ng-sys" +version = "1.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7118c2c2a3c7b6edc279a8b19507672b9c4d716f95e671172dfa4e23f9fd824" +dependencies = [ + "cmake", + "libc", +] + [[package]] name = "libz-rs-sys" version = "0.5.1" @@ -1682,6 +1862,12 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.9.4" @@ -1723,6 +1909,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -1763,6 +1958,35 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "metrics" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3045b4193fbdc5b5681f32f11070da9be3609f189a79f3390706d42587f46bb5" +dependencies = [ + "ahash 0.8.12", + "portable-atomic", +] + +[[package]] +name = "metrics-util" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.14.5", + "indexmap 2.10.0", + "metrics", + "num_cpus", + "ordered-float 4.6.0", + "quanta", + "radix_trie", + "sketches-ddsketch", +] + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1793,7 +2017,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "event-listener", + "event-listener 5.4.0", "futures-util", "loom", "parking_lot", @@ -1811,6 +2035,24 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1908,7 +2150,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi 0.5.2", + "hermit-abi", "libc", ] @@ -1921,6 +2163,80 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.11.2" +source = "git+https://github.com/RelationalAI/arrow-rs.git?tag=v0.11.3-beta1#fa77acbd1e5e3acbf0824443b2c1d1df8609b457" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "futures", + "httparse", + "humantime", + "hyper", + "itertools", + "md-5", + "parking_lot", + "percent-encoding", + "quick-xml", + "rand 0.8.5", + "reqwest", + "ring", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + +[[package]] +name = "object_store_ffi" +version = "0.12.3" +source = "git+https://github.com/RelationalAI/object_store_ffi?rev=79b08071c7a1642532b5891253280861eca9e44e#79b08071c7a1642532b5891253280861eca9e44e" +dependencies = [ + "anyhow", + "async-channel", + "async-compression", + "async-trait", + "backoff", + "base64", + "bytes", + "chrono", + "crossbeam-queue", + "flate2", + "flume", + "futures-util", + "hickory-resolver", + "hyper", + "metrics", + "metrics-util", + "moka", + "object_store", + "once_cell", + "openssl", + "pin-project", + "quanta", + "rand 0.8.5", + "regex", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "url", + "uuid", + "walkdir", + "zeroize", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1955,6 +2271,60 @@ dependencies = [ "uuid", ] +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-src" +version = "300.5.2+3.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d270b79e2926f5150189d475bc7e9d2c69f9c4697b185fa917d5a32b792d21b4" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1983,12 +2353,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "os_str_bytes" -version = "6.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" - [[package]] name = "overload" version = "0.1.1" @@ -2071,6 +2435,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2163,6 +2547,21 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.1+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.37.5" @@ -2249,6 +2648,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2308,13 +2717,22 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" dependencies = [ - "bitflags 2.9.1", + "bitflags", ] [[package]] @@ -2435,6 +2853,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", + "h2", + "hickory-resolver", "http", "http-body", "http-body-util", @@ -2443,10 +2863,12 @@ dependencies = [ "hyper-util", "js-sys", "log", + "once_cell", "percent-encoding", "pin-project-lite", "quinn", "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", @@ -2466,6 +2888,12 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "resolv-conf" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95325155c684b1c89f7765e30bc1c42e4a6da51ca513615660cb8a62ef9a88e3" + [[package]] name = "ring" version = "0.17.14" @@ -2578,7 +3006,7 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ - "bitflags 2.9.1", + "bitflags", "errno", "libc", "linux-raw-sys", @@ -2599,6 +3027,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -2632,6 +3072,24 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "schemars" version = "0.9.0" @@ -2674,6 +3132,29 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.26" @@ -2727,6 +3208,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -2834,6 +3325,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.10" @@ -2846,6 +3343,27 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "snafu" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "snap" version = "1.1.1" @@ -2872,6 +3390,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2884,12 +3411,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "strsim" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" - [[package]] name = "strsim" version = "0.11.1" @@ -2917,7 +3438,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck 0.5.0", + "heck", "proc-macro2", "quote", "rustversion", @@ -2930,7 +3451,7 @@ version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" dependencies = [ - "heck 0.5.0", + "heck", "proc-macro2", "quote", "syn 2.0.104", @@ -3009,21 +3530,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "termcolor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" -dependencies = [ - "winapi-util", -] - -[[package]] -name = "textwrap" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c13547615a44dc9c452a8a534638acdf07120d4b6847c8178705da06306a3057" - [[package]] name = "thiserror" version = "1.0.69" @@ -3214,15 +3720,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" -dependencies = [ - "serde", -] - [[package]] name = "toml_datetime" version = "0.6.11" @@ -3261,7 +3758,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags 2.9.1", + "bitflags", "bytes", "futures-util", "http", @@ -3292,9 +3789,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "tracing-core" version = "0.1.34" @@ -3439,12 +3948,28 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3582,6 +4107,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "widestring" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" + [[package]] name = "winapi" version = "0.3.9" @@ -3715,6 +4246,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3742,6 +4282,21 @@ dependencies = [ "windows-targets 0.53.3", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -3784,6 +4339,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -3796,6 +4357,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -3808,6 +4375,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3832,6 +4405,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -3844,6 +4423,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -3856,6 +4441,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -3868,6 +4459,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -3889,13 +4486,23 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.1", + "bitflags", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0db631d..13a822d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,23 +1,28 @@ [package] name = "iceberg_rust_ffi" -version = "0.1.0" +version = "0.2.0" edition = "2021" [lib] name = "iceberg_rust_ffi" crate-type = ["cdylib"] -[build-dependencies] -cbindgen = "0.26" +[features] +default = ["julia"] +julia = [] [dependencies] iceberg = "0.6.0" +object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } -libc = "0.2" futures = "0.3" +libc = "0.2" anyhow = "1.0" arrow-array = "55.2.0" arrow-ipc = "55.2.0" +tracing-subscriber = "0.3" +tracing = "0.1" +once_cell = "1.19" [dev-dependencies] tempfile = "3.0" diff --git a/Makefile b/Makefile index aa7bc28..8e51903 100644 --- a/Makefile +++ b/Makefile @@ -17,17 +17,14 @@ TARGET = local # Default target all: build test -# Generate C header -generate-header: +# Build the Rust library +build-lib: @if [ "$(TARGET)" = "local" ]; then \ - cargo build --release; \ + cargo build --release --no-default-features; \ else \ - cargo build --release --target $(TARGET); \ + cargo build --release --no-default-features --target $(TARGET); \ fi -# Build the Rust library and generate header -build-lib: generate-header - # Build the integration test build-test: build-lib $(CC) $(CFLAGS) -o $(TEST_NAME) $(TEST_SOURCE) $(LDFLAGS) @@ -67,7 +64,6 @@ clean-all: clean help: @echo "Available targets:" @echo " all - Build and run integration test" - @echo " generate-header - Generate C header file using cbindgen" @echo " build-lib - Build only the Rust library" @echo " build-test - Build the integration test (requires library)" @echo " build - Build everything" @@ -78,4 +74,4 @@ help: @echo " clean-all - Clean everything including target directory" @echo " help - Show this help message" -.PHONY: all generate-header build-lib build-test build test clean clean-all help stop-containers run-containers \ No newline at end of file +.PHONY: all build-lib build-test build test clean clean-all help stop-containers run-containers \ No newline at end of file diff --git a/build.rs b/build.rs deleted file mode 100644 index d2de405..0000000 --- a/build.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::env; -use std::path::PathBuf; - -fn main() { - let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); - let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); - - let config = cbindgen::Config::default(); - - cbindgen::Builder::new() - .with_crate(&crate_dir) - .with_config(config) - .generate() - .expect("Unable to generate bindings") - .write_to_file(out_dir.join("iceberg_rust_ffi.h")); - - // Note: We're using a manually created header file instead of the cbindgen-generated one - // The cbindgen output is available in the build output directory if needed for reference - println!("cargo:rerun-if-changed=src/lib.rs"); - println!("cargo:rerun-if-changed=include/iceberg_rust_ffi.h"); -} diff --git a/cbindgen.toml b/cbindgen.toml deleted file mode 100644 index a1d446e..0000000 --- a/cbindgen.toml +++ /dev/null @@ -1,33 +0,0 @@ -language = "C" -include_guard = "ICEBERG_RUST_FFI_H" -autogen_warning = "// This file is auto-generated by cbindgen. Do not edit manually." -tab_width = 4 -documentation = true -documentation_style = "doxy" -line_length = 100 -cpp_compat = false -no_includes = true - -[export] -prefix = "iceberg_" -include = ["IcebergTable", "IcebergScan", "ArrowBatch", "IcebergResult"] -rename = { - "IcebergTable" = "IcebergTable", - "IcebergScan" = "IcebergScan", - "ArrowBatch" = "ArrowBatch", - "IcebergResult" = "IcebergResult" -} - -[defines] -"target_os = \"macos\"" = "TARGET_OS_MACOS" -"target_os = \"linux\"" = "TARGET_OS_LINUX" -"target_os = \"windows\"" = "TARGET_OS_WINDOWS" - -[parse] -parse_deps = true -include = ["iceberg_rust_ffi"] -extra_bindings = [] - -[header] -prefix = "#ifndef ICEBERG_RUST_FFI_H\n#define ICEBERG_RUST_FFI_H\n\n#include \n#include \n#include \n\n#ifdef __cplusplus\nextern \"C\" {\n#endif\n\n" -suffix = "\n#ifdef __cplusplus\n}\n#endif\n\n#endif // ICEBERG_RUST_FFI_H\n" \ No newline at end of file diff --git a/include/iceberg_rust_ffi.h b/include/iceberg_rust_ffi.h index 7fb7a10..ff71d58 100644 --- a/include/iceberg_rust_ffi.h +++ b/include/iceberg_rust_ffi.h @@ -11,49 +11,103 @@ extern "C" { // Forward declarations typedef struct IcebergTable IcebergTable; -typedef struct IcebergScan IcebergScan; +typedef struct Context Context; + +// Configuration for iceberg runtime +typedef struct { + size_t n_threads; +} IcebergStaticConfig; + +// Result types +typedef enum { + CRESULT_OK = 0, + CRESULT_ERROR = 1 +} CResult; // Arrow batch as serialized bytes -typedef struct ArrowBatch { - const uint8_t* data; // Pointer to serialized Arrow IPC data - size_t length; // Length of the data in bytes - void* rust_ptr; // Internal Rust pointer for memory management +typedef struct { + const uint8_t* data; + size_t length; + void* rust_ptr; } ArrowBatch; -typedef enum { - ICEBERG_OK = 0, - ICEBERG_ERROR = -1, - ICEBERG_NULL_POINTER = -2, - ICEBERG_IO_ERROR = -3, - ICEBERG_INVALID_TABLE = -4, - ICEBERG_END_OF_STREAM = -5 -} IcebergResult; - -// Table operations -IcebergResult iceberg_table_open(const char* table_path, const char* metadata_path, IcebergTable** table); +// Response structures for async operations +typedef struct { + CResult result; + IcebergTable* table; + char* error_message; + const Context* context; +} IcebergTableResponse; + +typedef struct IcebergScan IcebergScan; + +typedef struct { + CResult result; + IcebergScan* scan; + char* error_message; + const Context* context; +} IcebergScanResponse; + +typedef struct { + void *stream; +} IcebergArrowStream; + +typedef struct { + CResult result; + IcebergArrowStream* stream; + char* error_message; + const Context* context; +} IcebergArrowStreamResponse; + +typedef struct { + CResult result; + char* error_message; + const Context* context; +} IcebergResponse; + +typedef struct { + CResult result; + ArrowBatch* batch; + char* error_message; + const Context* context; +} IcebergBatchResponse; + +// Callback types +typedef int (*PanicCallback)(void); +typedef int (*ResultCallback)(const void* task); + +// Runtime initialization +CResult iceberg_init_runtime(IcebergStaticConfig config, PanicCallback panic_callback, ResultCallback result_callback); + +// Async table operations +CResult iceberg_table_open(const char* snapshot_path, IcebergTableResponse* response, const void* handle); void iceberg_table_free(IcebergTable* table); -// Scan operations -IcebergResult iceberg_table_scan(IcebergTable* table, IcebergScan** scan); -IcebergResult iceberg_scan_select_columns(IcebergScan* scan, const char** column_names, size_t num_columns); -void iceberg_scan_free(IcebergScan* scan); +// Synchronous scan creation +IcebergScan* iceberg_new_scan(IcebergTable* table); +int iceberg_select_columns(IcebergScan** scan, const char** column_names, size_t num_columns); +int iceberg_scan_build(IcebergScan** scan); +int iceberg_scan_with_data_file_concurrency_limit(IcebergScan** scan, size_t n); +int iceberg_scan_with_manifest_entry_concurrency_limit(IcebergScan** scan, size_t n); +int iceberg_scan_with_batch_size(IcebergScan** scan, size_t n); +void iceberg_scan_free(IcebergScan** scan); + +// Async streaming API +CResult iceberg_arrow_stream(IcebergScan* scan, IcebergArrowStreamResponse* response, const void* handle); +CResult iceberg_next_batch(IcebergArrowStream* stream, IcebergBatchResponse* response, const void* handle); +void iceberg_arrow_stream_free(IcebergArrowStream* stream); -// Arrow batch operations -IcebergResult iceberg_scan_next_batch(IcebergScan* scan, ArrowBatch** batch); +// Synchronous batch free void iceberg_arrow_batch_free(ArrowBatch* batch); -// Error handling -const char* iceberg_error_message(); - -// Function pointer typedefs for dynamic loading -typedef IcebergResult (*iceberg_table_open_func_t)(const char* table_path, const char* metadata_path, IcebergTable** table); -typedef void (*iceberg_table_free_func_t)(IcebergTable* table); -typedef IcebergResult (*iceberg_table_scan_func_t)(IcebergTable* table, IcebergScan** scan); -typedef IcebergResult (*iceberg_scan_select_columns_func_t)(IcebergScan* scan, const char** column_names, size_t num_columns); -typedef void (*iceberg_scan_free_func_t)(IcebergScan* scan); -typedef IcebergResult (*iceberg_scan_next_batch_func_t)(IcebergScan* scan, ArrowBatch** batch); -typedef void (*iceberg_arrow_batch_free_func_t)(ArrowBatch* batch); -typedef const char* (*iceberg_error_message_func_t)(); +// Utility functions +CResult iceberg_destroy_cstring(char* string); +const char* iceberg_current_metrics(void); + +// Context management functions for cancellation support +CResult iceberg_cancel_context(const Context* ctx); +CResult iceberg_destroy_context(const Context* ctx); + #ifdef __cplusplus } diff --git a/run_integration_test.sh b/run_integration_test.sh index 1ffca99..c881531 100755 --- a/run_integration_test.sh +++ b/run_integration_test.sh @@ -46,9 +46,9 @@ if [ -f ".env" ]; then set +a fi -# Step 1: Build the Rust library +# Step 1: Build the Rust library (without julia feature for standalone C integration) print_status "Building Rust library..." -if cargo build; then +if cargo build --no-default-features; then print_success "Rust library built successfully" else print_error "Failed to build Rust library" @@ -74,7 +74,7 @@ print_status "Using library from: $LIB_PATH" # Step 2: Build the integration test print_status "Building integration test..." -if gcc -o integration_test tests/integration_test.c -Iinclude -L"$LIB_PATH" -liceberg_rust_ffi -lpthread -ldl -lm; then +if gcc -Wall -Wextra -o integration_test tests/integration_test.c -Iinclude -L"$LIB_PATH" -liceberg_rust_ffi -lpthread -ldl -lm; then print_success "Integration test built successfully" else print_error "Failed to build integration test" @@ -84,7 +84,24 @@ fi # Step 3: Run the integration test print_status "Running integration test..." echo "==========================================" -if ./integration_test; then + +# Determine the exact library filename +LIBRARY="" +if [ -f "$LIB_PATH/libiceberg_rust_ffi.dylib" ]; then + LIBRARY="$LIB_PATH/libiceberg_rust_ffi.dylib" +elif [ -f "$LIB_PATH/libiceberg_rust_ffi.so" ]; then + LIBRARY="$LIB_PATH/libiceberg_rust_ffi.so" +else + print_error "Could not find dynamic library" + exit 1 +fi + +print_status "Using library: $LIBRARY" +# Pass through RUST_LOG environment variable if set +if [ -n "$RUST_LOG" ]; then + export RUST_LOG="$RUST_LOG" +fi +if ./integration_test "$LIBRARY"; then echo "==========================================" print_success "Integration test completed successfully!" else diff --git a/src/lib.rs b/src/lib.rs index ce10979..5ded550 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,90 +1,220 @@ -use std::ffi::{CStr, CString}; -use std::os::raw::c_char; +use futures::TryStreamExt; +use std::ffi::{c_char, c_void, CStr}; use std::ptr; -use std::sync::Mutex; -use std::sync::OnceLock; +use tokio::sync::Mutex as AsyncMutex; use anyhow::Result; use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; -use futures::stream::StreamExt; use iceberg::io::FileIOBuilder; -use iceberg::table::StaticTable; +use iceberg::scan::{TableScan, TableScanBuilder}; +use iceberg::table::{StaticTable, Table}; use iceberg::TableIdent; -use std::env; -use tokio::runtime::Runtime; -// cbindgen annotations -#[allow(non_camel_case_types)] -#[allow(non_snake_case)] - -// Internal structures for Rust implementation -struct IcebergTableInternal { - table: iceberg::table::Table, +// Import from object_store_ffi +use object_store_ffi::{ + cancel_context, current_metrics, destroy_context, destroy_cstring, export_runtime_op, + with_cancellation, CResult, Context, NotifyGuard, PanicCallback, RawResponse, ResponseGuard, + ResultCallback, RESULT_CB, RT, +}; + +// We use `jl_adopt_thread` to ensure Rust can call into Julia when notifying +// the Base.Event that is waiting for the Rust result. +// Note that this will be linked in from the Julia process, we do not try +// to link it while building this Rust lib. +#[cfg(feature = "julia")] +extern "C" { + fn jl_adopt_thread() -> i32; + fn jl_gc_safe_enter() -> i32; + fn jl_gc_disable_finalizers_internal() -> c_void; } -struct IcebergScanInternal { - table: Option, - columns: Option>, - stream: Option>>>, +// Simple response type for operations that only need success/failure status +#[repr(C)] +pub struct IcebergResponse { + result: CResult, + error_message: *mut c_char, + context: *const Context, } -// Global Tokio runtime using OnceLock for thread safety -// TODO: Might want to share tokio runtime between here and object_store_ffi.jl, e.g., -// by passing object store in and using its runtime. -static RUNTIME: OnceLock = OnceLock::new(); +unsafe impl Send for IcebergResponse {} -fn get_runtime() -> &'static Runtime { - RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")) -} +impl RawResponse for IcebergResponse { + type Payload = (); + + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } -// Thread-local error storage -thread_local! { - static LAST_ERROR: std::cell::RefCell> = std::cell::RefCell::new(None); + fn set_payload(&mut self, _payload: Option) { + // No payload for simple response + } } -fn set_error(error: String) { - LAST_ERROR.with(|e| { - *e.borrow_mut() = Some(error); - }); +// Simple config for iceberg - only what we need +#[derive(Copy, Clone)] +#[repr(C)] +pub struct IcebergStaticConfig { + n_threads: usize, } -fn clear_error() { - LAST_ERROR.with(|e| { - *e.borrow_mut() = None; - }); +impl Default for IcebergStaticConfig { + fn default() -> Self { + IcebergStaticConfig { + n_threads: 0, // 0 means use tokio's default + } + } } -// C API structures +// Direct structures - no opaque wrappers #[repr(C)] pub struct IcebergTable { - _private: [u8; 0], // Opaque type for C + pub table: Table, } #[repr(C)] pub struct IcebergScan { - _private: [u8; 0], // Opaque type for C + pub builder: Option>, + pub scan: Option, +} + +unsafe impl Send for IcebergScan {} + +// Stream wrapper for FFI - using async mutex to avoid blocking calls +#[repr(C)] +pub struct IcebergArrowStream { + // TODO: Maybe remove this mutex and let this be handled in Julia? + pub stream: + AsyncMutex>>, } +unsafe impl Send for IcebergArrowStream {} + #[repr(C)] pub struct ArrowBatch { pub data: *const u8, pub length: usize, - pub rust_ptr: *mut std::ffi::c_void, + pub rust_ptr: *mut c_void, } +// Response types for async operations #[repr(C)] -pub enum IcebergResult { - IcebergOk = 0, - IcebergError = -1, - IcebergNullPointer = -2, - IcebergIoError = -3, - IcebergInvalidTable = -4, - IcebergEndOfStream = -5, +pub struct IcebergTableResponse { + result: CResult, + table: *mut IcebergTable, + error_message: *mut c_char, + context: *const Context, +} + +unsafe impl Send for IcebergTableResponse {} + +impl RawResponse for IcebergTableResponse { + type Payload = IcebergTable; + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + fn set_payload(&mut self, payload: Option) { + match payload { + Some(table) => { + let table_ptr = Box::into_raw(Box::new(table)); + self.table = table_ptr; + } + None => self.table = ptr::null_mut(), + } + } +} + +#[repr(C)] +pub struct IcebergArrowStreamResponse { + result: CResult, + stream: *mut IcebergArrowStream, + error_message: *mut c_char, + context: *const Context, +} + +unsafe impl Send for IcebergArrowStreamResponse {} + +impl RawResponse for IcebergArrowStreamResponse { + type Payload = IcebergArrowStream; + + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + + fn set_payload(&mut self, payload: Option) { + match payload { + Some(stream) => { + self.stream = Box::into_raw(Box::new(stream)); + } + None => self.stream = ptr::null_mut(), + } + } +} + +#[repr(C)] +pub struct IcebergBatchResponse { + result: CResult, + batch: *mut ArrowBatch, + error_message: *mut c_char, + context: *const Context, +} + +unsafe impl Send for IcebergBatchResponse {} + +impl RawResponse for IcebergBatchResponse { + type Payload = Option; + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + fn set_payload(&mut self, payload: Option) { + match payload.flatten() { + Some(batch) => { + // TODO: This is currently a bottleneck, and should be done in parallel. + let arrow_batch = serialize_record_batch(batch); + match arrow_batch { + Ok(arrow_batch) => { + self.batch = Box::into_raw(Box::new(arrow_batch)); + } + Err(_) => { + self.batch = ptr::null_mut(); + } + } + } + None => self.batch = ptr::null_mut(), + } + } } // Helper function to create ArrowBatch from RecordBatch -// TODO: This should be zero-copy... +// TODO: Switch to zero-copy once Arrow.jl supports C API. fn serialize_record_batch(batch: RecordBatch) -> Result { let buffer = Vec::new(); let mut stream_writer = StreamWriter::try_new(buffer, &batch.schema())?; @@ -95,7 +225,7 @@ fn serialize_record_batch(batch: RecordBatch) -> Result { let boxed_data = Box::new(serialized_data); let data_ptr = boxed_data.as_ptr(); let length = boxed_data.len(); - let rust_ptr = Box::into_raw(boxed_data) as *mut std::ffi::c_void; + let rust_ptr = Box::into_raw(boxed_data) as *mut c_void; Ok(ArrowBatch { data: data_ptr, @@ -104,293 +234,353 @@ fn serialize_record_batch(batch: RecordBatch) -> Result { }) } -// C API functions +// Initialize runtime - configure RT and RESULT_CB directly #[no_mangle] -pub extern "C" fn iceberg_table_open( - table_path: *const c_char, - metadata_path: *const c_char, - table: *mut *mut IcebergTable, -) -> IcebergResult { - if table_path.is_null() || metadata_path.is_null() || table.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; - } - - clear_error(); - - let path_str = unsafe { - match CStr::from_ptr(table_path).to_str() { - Ok(s) => s, - Err(e) => { - set_error(format!("Invalid UTF-8 in table path: {}", e)); - return IcebergResult::IcebergError; - } - } - }; +pub extern "C" fn iceberg_init_runtime( + config: IcebergStaticConfig, + panic_callback: PanicCallback, + result_callback: ResultCallback, +) -> CResult { + // Set the result callback + if let Err(_) = RESULT_CB.set(result_callback) { + return CResult::Error; // Already initialized + } - let metadata_path_str = unsafe { - match CStr::from_ptr(metadata_path).to_str() { - Ok(s) => s, - Err(e) => { - set_error(format!("Invalid UTF-8 in metadata path: {}", e)); - return IcebergResult::IcebergError; - } + // Set up panic hook + let prev = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + prev(info); + unsafe { panic_callback() }; + })); + + // Set up logging if not already configured + if std::env::var("RUST_LOG").is_err() { + unsafe { std::env::set_var("RUST_LOG", "iceberg_rust_ffi=warn,iceberg=warn") } + } + + // Initialize tracing subscriber + let _ = tracing_subscriber::fmt::try_init(); + + // Build tokio runtime + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); + rt_builder.enable_all(); + + // Configure Julia thread adoption for Julia integration + rt_builder.on_thread_start(|| { + #[cfg(feature = "julia")] + { + unsafe { jl_adopt_thread() }; + unsafe { jl_gc_safe_enter() }; + unsafe { jl_gc_disable_finalizers_internal() }; } + }); + + if config.n_threads > 0 { + rt_builder.worker_threads(config.n_threads); + } + + let runtime = match rt_builder.build() { + Ok(rt) => rt, + Err(_) => return CResult::Error, }; - // TODO: Perhaps we should have full asynchronicity that includes the caller code (e.g. Julia) instead of blocking here. - let result: Result = get_runtime().block_on(async { - // println!("DEBUG: Table path: {}", path_str); - // println!("DEBUG: Metadata path: {}", metadata_path_str); - - // Construct the full S3 path by combining table_path and metadata_path - let full_metadata_path = if metadata_path_str.starts_with('/') { - // If metadata_path starts with /, it's absolute, so use it as is - metadata_path_str.to_string() - } else { - // Otherwise, combine table_path with metadata_path - let table_path_trimmed = path_str.trim_end_matches('/'); - let metadata_path_trimmed = metadata_path_str.trim_start_matches('/'); - format!("{}/{}", table_path_trimmed, metadata_path_trimmed) - }; + if RT.set(runtime).is_err() { + return CResult::Error; + } - // println!("DEBUG: Full metadata file path: {}", full_metadata_path); + CResult::Ok +} - let _ = env::var("AWS_ACCESS_KEY_ID").expect("AWS_ACCESS_KEY_ID must be set"); +// Use export_runtime_op! macro for table opening +export_runtime_op!( + iceberg_table_open, + IcebergTableResponse, + || { + let snapshot_path_str = unsafe { + CStr::from_ptr(snapshot_path).to_str() + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in snapshot path: {}", e))? + }; + Ok(snapshot_path_str.to_string()) + }, + full_metadata_path, + async { // Create file IO for S3 let file_io = FileIOBuilder::new("s3").build()?; // Create table identifier let table_ident = TableIdent::from_strs(["default", "table"])?; + // Load the static table let static_table = StaticTable::from_metadata_file(&full_metadata_path, table_ident, file_io).await?; - let iceberg_table = static_table.into_table(); - - Ok(iceberg_table) - }); - - match result { - Ok(iceberg_table) => { - let table_ptr = Box::into_raw(Box::new(IcebergTableInternal { - table: iceberg_table, - })); - unsafe { - *table = table_ptr as *mut IcebergTable; - } - IcebergResult::IcebergOk - } - Err(e) => { - set_error(format!("Failed to open table: {}", e)); - IcebergResult::IcebergError - } - } -} + Ok::(IcebergTable { table: static_table.into_table() }) + }, + snapshot_path: *const c_char +); #[no_mangle] -pub extern "C" fn iceberg_table_free(table: *mut IcebergTable) { - if !table.is_null() { - unsafe { - let _ = Box::from_raw(table as *mut IcebergTableInternal); - } - } -} - -#[no_mangle] -pub extern "C" fn iceberg_table_scan( - table: *mut IcebergTable, - scan: *mut *mut IcebergScan, -) -> IcebergResult { - if table.is_null() || scan.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; +pub extern "C" fn iceberg_new_scan(table: *mut IcebergTable) -> *mut IcebergScan { + if table.is_null() { + return ptr::null_mut(); } - - clear_error(); - - let table_ref = unsafe { &*(table as *const IcebergTableInternal) }; - - let scan_ptr = Box::into_raw(Box::new(IcebergScanInternal { - table: Some(table_ref.table.clone()), - columns: None, - stream: None, + let table_ref = unsafe { &*table }; + let scan_builder = table_ref.table.scan(); + return Box::into_raw(Box::new(IcebergScan { + builder: Some(scan_builder), + scan: None, })); - - unsafe { - *scan = scan_ptr as *mut IcebergScan; - } - - IcebergResult::IcebergOk } #[no_mangle] -pub extern "C" fn iceberg_scan_select_columns( - scan: *mut IcebergScan, +pub extern "C" fn iceberg_select_columns( + scan: &mut *mut IcebergScan, column_names: *const *const c_char, num_columns: usize, -) -> IcebergResult { - if scan.is_null() || column_names.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; +) -> CResult { + if scan.is_null() || (*scan).is_null() || column_names.is_null() { + return CResult::Error; } - clear_error(); - - let scan_ref = unsafe { &mut *(scan as *mut IcebergScanInternal) }; - let mut columns = Vec::new(); for i in 0..num_columns { let col_ptr = unsafe { *column_names.add(i) }; if col_ptr.is_null() { - set_error("Null column name pointer".to_string()); - return IcebergResult::IcebergNullPointer; + return CResult::Error; } let col_str = unsafe { match CStr::from_ptr(col_ptr).to_str() { Ok(s) => s, - Err(e) => { - set_error(format!("Invalid UTF-8 in column name: {}", e)); - return IcebergResult::IcebergError; - } + Err(_) => return CResult::Error, } }; - columns.push(col_str.to_string()); } - scan_ref.columns = Some(columns); + let scan_ref = unsafe { Box::from_raw(*scan) }; - IcebergResult::IcebergOk + if scan_ref.builder.is_none() { + return CResult::Error; + } + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref.builder.map(|b| b.select(columns)), + scan: scan_ref.scan, + })); + + return CResult::Ok; } #[no_mangle] -pub extern "C" fn iceberg_scan_free(scan: *mut IcebergScan) { - if !scan.is_null() { - unsafe { - let _ = Box::from_raw(scan as *mut IcebergScanInternal); - } +pub extern "C" fn iceberg_scan_with_data_file_concurrency_limit( + scan: &mut *mut IcebergScan, + n: usize, +) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; } + let scan_ref = unsafe { Box::from_raw(*scan) }; + + if scan_ref.builder.is_none() { + return CResult::Error; + } + + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref + .builder + .map(|b| b.with_data_file_concurrency_limit(n)), + scan: scan_ref.scan, + })); + + return CResult::Ok; } #[no_mangle] -pub extern "C" fn iceberg_scan_next_batch( - scan: *mut IcebergScan, - batch: *mut *mut ArrowBatch, -) -> IcebergResult { - if scan.is_null() || batch.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; +pub extern "C" fn iceberg_scan_with_manifest_entry_concurrency_limit( + scan: &mut *mut IcebergScan, + n: usize, +) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; } + let scan_ref = unsafe { Box::from_raw(*scan) }; - clear_error(); + if scan_ref.builder.is_none() { + return CResult::Error; + } - let scan_ref = unsafe { &mut *(scan as *mut IcebergScanInternal) }; + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref + .builder + .map(|b| b.with_manifest_entry_concurrency_limit(n)), + scan: scan_ref.scan, + })); - // Initialize stream if not already done - if scan_ref.stream.is_none() { - if let Some(table) = &scan_ref.table { - let columns = scan_ref.columns.clone(); - let stream_result = get_runtime().block_on(async { - let mut scan_builder = table.scan(); + return CResult::Ok; +} - if let Some(cols) = columns { - scan_builder = scan_builder.select(cols); - } +#[no_mangle] +pub extern "C" fn iceberg_scan_with_batch_size(scan: &mut *mut IcebergScan, n: usize) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; + } + let scan_ref = unsafe { Box::from_raw(*scan) }; - match scan_builder.build() { - Ok(table_scan) => match table_scan.to_arrow().await { - Ok(stream) => Ok(stream), - Err(e) => { - set_error(format!("Failed to create arrow stream: {}", e)); - Err(e) - } - }, - Err(e) => { - set_error(format!("Failed to build scan: {}", e)); - Err(e) - } - } - }); + if scan_ref.builder.is_none() { + return CResult::Error; + } - match stream_result { - Ok(stream) => { - scan_ref.stream = Some(Mutex::new(stream)); - } - Err(_) => { - return IcebergResult::IcebergError; - } - } - } else { - set_error("Table not available".to_string()); - return IcebergResult::IcebergError; + assert!(scan_ref.scan.is_none()); + + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref.builder.map(|b| b.with_batch_size(Some(n))), + scan: None, + })); + + return CResult::Ok; +} + +#[no_mangle] +pub extern "C" fn iceberg_scan_build(scan: &mut *mut IcebergScan) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; + } + let scan_ref = unsafe { Box::from_raw(*scan) }; + if scan_ref.builder.is_none() { + return CResult::Error; + } + let builder = scan_ref.builder.unwrap(); + + match builder.build() { + Ok(table_scan) => { + *scan = Box::into_raw(Box::new(IcebergScan { + builder: None, + scan: Some(table_scan), + })); + CResult::Ok } + Err(_) => CResult::Error, } +} - // Get next batch from stream - if let Some(stream_mutex) = &scan_ref.stream { - let result = get_runtime().block_on(async { - let mut stream = stream_mutex.lock().unwrap(); - stream.next().await - }); +// Async function to initialize stream from a table scan without getting first batch +export_runtime_op!( + iceberg_arrow_stream, + IcebergArrowStreamResponse, + || { + if scan.is_null() { + return Err(anyhow::anyhow!("Null scan pointer provided")); + } + let scan_ref = unsafe { &(*scan).scan }; + if scan_ref.is_none() { + return Err(anyhow::anyhow!("Scan not initialized")); + } - match result { - Some(Ok(record_batch)) => match serialize_record_batch(record_batch) { - Ok(arrow_batch) => { - let batch_ptr = Box::into_raw(Box::new(arrow_batch)); - unsafe { - *batch = batch_ptr; - } - IcebergResult::IcebergOk - } - Err(e) => { - set_error(format!("Failed to serialize batch: {}", e)); - IcebergResult::IcebergError - } - }, - Some(Err(e)) => { - set_error(format!("Error reading batch: {}", e)); - IcebergResult::IcebergError + return Ok(scan_ref.as_ref().unwrap()); + }, + scan_ref, + async { + let stream = scan_ref.to_arrow().await?; + Ok::(IcebergArrowStream { + stream: AsyncMutex::new(stream), + }) + }, + scan: *mut IcebergScan +); + +// Async function to get next batch from existing stream +export_runtime_op!( + iceberg_next_batch, + IcebergBatchResponse, + || { + if stream.is_null() { + return Err(anyhow::anyhow!("Null stream pointer provided")); + } + let stream_ref = unsafe { &*stream }; + Ok(stream_ref) + }, + stream_ref, + async { + let mut stream_guard = stream_ref.stream.lock().await; + + match stream_guard.try_next().await { + Ok(Some(record_batch)) => { + Ok(Some(record_batch)) } - None => { + Ok(None) => { // End of stream - unsafe { - *batch = ptr::null_mut(); - } - IcebergResult::IcebergEndOfStream + tracing::debug!("End of stream reached"); + Ok(None) } + Err(e) => Err(anyhow::anyhow!("Error reading batch: {}", e)), + } + }, + stream: *mut IcebergArrowStream +); + +// Synchronous operations +#[no_mangle] +pub extern "C" fn iceberg_table_free(table: *mut IcebergTable) { + if !table.is_null() { + unsafe { + let _ = Box::from_raw(table); } - } else { - set_error("Stream not initialized".to_string()); - IcebergResult::IcebergError } } #[no_mangle] -pub extern "C" fn iceberg_arrow_batch_free(batch: *mut ArrowBatch) { - if !batch.is_null() { +pub extern "C" fn iceberg_scan_free(scan: &mut *mut IcebergScan) { + if !scan.is_null() { unsafe { - let batch_ref = Box::from_raw(batch); - if !batch_ref.rust_ptr.is_null() { - let _ = Box::from_raw(batch_ref.rust_ptr as *mut Vec); - } + let _ = Box::from_raw(*scan); + *scan = ptr::null_mut(); } } } #[no_mangle] -pub extern "C" fn iceberg_error_message() -> *const c_char { - LAST_ERROR.with(|e| { - if let Some(ref error) = *e.borrow() { - match CString::new(error.clone()) { - Ok(cstring) => cstring.into_raw(), - Err(_) => ptr::null(), - } - } else { - ptr::null() +pub extern "C" fn iceberg_arrow_stream_free(stream: *mut IcebergArrowStream) { + if !stream.is_null() { + unsafe { + let _ = Box::from_raw(stream); } - }) + } +} + +#[no_mangle] +pub extern "C" fn iceberg_arrow_batch_free(batch: *mut ArrowBatch) { + if batch.is_null() { + return; + } + + unsafe { + let batch_ref = Box::from_raw(batch); + if !batch_ref.rust_ptr.is_null() { + let _ = Box::from_raw(batch_ref.rust_ptr as *mut Vec); + } + } +} + +// Re-export object_store_ffi utilities +#[no_mangle] +pub extern "C" fn iceberg_destroy_cstring(string: *mut c_char) -> CResult { + destroy_cstring(string) +} + +#[no_mangle] +pub extern "C" fn iceberg_current_metrics() -> *const c_char { + current_metrics() +} + +// Re-export context management functions for cancellation support +#[no_mangle] +pub extern "C" fn iceberg_cancel_context(ctx_ptr: *const Context) -> CResult { + cancel_context(ctx_ptr) +} + +#[no_mangle] +pub extern "C" fn iceberg_destroy_context(ctx_ptr: *const Context) -> CResult { + destroy_context(ctx_ptr) } diff --git a/tests/integration_test.c b/tests/integration_test.c index 65e36cc..a641d2e 100644 --- a/tests/integration_test.c +++ b/tests/integration_test.c @@ -2,22 +2,50 @@ #include #include #include - -// Global function pointers -static iceberg_table_open_func_t iceberg_table_open_func = NULL; -static iceberg_table_free_func_t iceberg_table_free_func = NULL; -static iceberg_table_scan_func_t iceberg_table_scan_func = NULL; -static iceberg_scan_select_columns_func_t iceberg_scan_select_columns_func = NULL; -static iceberg_scan_free_func_t iceberg_scan_free_func = NULL; -static iceberg_scan_next_batch_func_t iceberg_scan_next_batch_func = NULL; -static iceberg_arrow_batch_free_func_t iceberg_arrow_batch_free_func = NULL; -static iceberg_error_message_func_t iceberg_error_message_func = NULL; +#include +#include +#include +#include + +// Global function pointers for new async API +static int (*iceberg_init_runtime_func)(IcebergStaticConfig config, int (*panic_callback)(void), int (*result_callback)(const void*)) = NULL; +static int (*iceberg_table_open_func)(const char*, IcebergTableResponse*, const void*) = NULL; +static IcebergScan* (*iceberg_new_scan_func)(IcebergTable*) = NULL; +static int (*iceberg_scan_build_func)(IcebergScan**) = NULL; +static int (*iceberg_select_columns_func)(IcebergScan**, const char**, size_t) = NULL; +static int (*iceberg_scan_with_batch_size_func)(IcebergScan**, size_t) = NULL; +static int (*iceberg_scan_with_data_file_concurrency_limit_func)(IcebergScan**, size_t) = NULL; +static int (*iceberg_scan_with_manifest_file_concurrency_limit_func)(IcebergScan**, size_t) = NULL; +static void (*iceberg_scan_free_func)(IcebergScan**) = NULL; +static int (*iceberg_arrow_stream_func)(IcebergScan*, IcebergArrowStreamResponse*, const void*) = NULL; +static int (*iceberg_next_batch_func)(IcebergArrowStream*, IcebergBatchResponse*, const void*) = NULL; +static void (*iceberg_table_free_func)(IcebergTable*) = NULL; +static void (*iceberg_arrow_stream_free_func)(IcebergArrowStream*) = NULL; +static void (*iceberg_arrow_batch_free_func)(ArrowBatch*) = NULL; +static int (*iceberg_destroy_cstring_func)(char*) = NULL; +static int (*iceberg_cancel_context_func)(const void*) = NULL; +static int (*iceberg_destroy_context_func)(const void*) = NULL; // Library handle static void* lib_handle = NULL; +// Callback implementations +static int panic_callback(void) { + printf("🚨 Rust panic occurred!\n"); + return 1; +} + +volatile int async_completed = 0; + +static int result_callback(const void* task) { + (void)task; // Suppress unused parameter warning + // Signal that async operation completed + async_completed = 1; + return 0; +} + // Function to load the library and resolve symbols -int load_iceberg_library(const char* library_path) { +static int load_iceberg_library(const char* library_path) { printf("Loading Iceberg C API library from %s...\n", library_path); // Try to open the dynamic library @@ -32,52 +60,105 @@ int load_iceberg_library(const char* library_path) { // Clear any existing error dlerror(); - // Resolve function symbols - iceberg_table_open_func = (iceberg_table_open_func_t)dlsym(lib_handle, "iceberg_table_open"); + // Resolve function symbols for new async API + iceberg_init_runtime_func = (int (*)(IcebergStaticConfig, int (*)(void), int (*)(const void*)))dlsym(lib_handle, "iceberg_init_runtime"); + if (!iceberg_init_runtime_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_init_runtime: %s\n", dlerror()); + return 0; + } + + iceberg_table_open_func = (int (*)(const char*, IcebergTableResponse*, const void*))dlsym(lib_handle, "iceberg_table_open"); if (!iceberg_table_open_func) { fprintf(stderr, "❌ Failed to resolve iceberg_table_open: %s\n", dlerror()); return 0; } - iceberg_table_free_func = (iceberg_table_free_func_t)dlsym(lib_handle, "iceberg_table_free"); - if (!iceberg_table_free_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_table_free: %s\n", dlerror()); + iceberg_new_scan_func = (IcebergScan* (*)(IcebergTable*))dlsym(lib_handle, "iceberg_new_scan"); + if (!iceberg_new_scan_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_new_scan: %s\n", dlerror()); + return 0; + } + + iceberg_scan_build_func = (int (*)(IcebergScan**))dlsym(lib_handle, "iceberg_scan_build"); + if (!iceberg_scan_build_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_scan_build: %s\n", dlerror()); + return 0; + } + + iceberg_select_columns_func = (int (*)(IcebergScan**, const char**, size_t))dlsym(lib_handle, "iceberg_select_columns"); + if (!iceberg_select_columns_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_select_columns: %s\n", dlerror()); return 0; } - iceberg_table_scan_func = (iceberg_table_scan_func_t)dlsym(lib_handle, "iceberg_table_scan"); - if (!iceberg_table_scan_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_table_scan: %s\n", dlerror()); + iceberg_scan_with_batch_size_func = (int (*)(IcebergScan**, size_t))dlsym(lib_handle, "iceberg_scan_with_batch_size"); + if (!iceberg_scan_with_batch_size_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_scan_with_batch_size: %s\n" , dlerror()); return 0; } - iceberg_scan_select_columns_func = (iceberg_scan_select_columns_func_t)dlsym(lib_handle, "iceberg_scan_select_columns"); - if (!iceberg_scan_select_columns_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_scan_select_columns: %s\n", dlerror()); + iceberg_scan_with_data_file_concurrency_limit_func = (int (*)(IcebergScan**, size_t))dlsym(lib_handle, "iceberg_scan_with_data_file_concurrency_limit"); + if (!iceberg_scan_with_data_file_concurrency_limit_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_scan_with_data_file_concurrency_limit: %s\n", dlerror()); return 0; } - iceberg_scan_free_func = (iceberg_scan_free_func_t)dlsym(lib_handle, "iceberg_scan_free"); + iceberg_scan_with_manifest_file_concurrency_limit_func = (int (*)(IcebergScan**, size_t))dlsym(lib_handle, "iceberg_scan_with_manifest_entry_concurrency_limit"); + if (!iceberg_scan_with_manifest_file_concurrency_limit_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_scan_with_manifest_entry_concurrency_limit: %s\n", dlerror()); + return 0; + } + + iceberg_arrow_stream_func = (int (*)(IcebergScan*, IcebergArrowStreamResponse*, const void*))dlsym(lib_handle, "iceberg_arrow_stream"); + if (!iceberg_arrow_stream_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_arrow_stream: %s\n", dlerror()); + return 0; + } + iceberg_next_batch_func = (int (*)(IcebergArrowStream*, IcebergBatchResponse*, const void*))dlsym(lib_handle, "iceberg_next_batch"); + if (!iceberg_next_batch_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_next_batch: %s\n", dlerror()); + return 0; + } + + iceberg_table_free_func = (void (*)(IcebergTable*))dlsym(lib_handle, "iceberg_table_free"); + if (!iceberg_table_free_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_table_free: %s\n", dlerror()); + return 0; + } + + iceberg_scan_free_func = (void (*)(IcebergScan**))dlsym(lib_handle, "iceberg_scan_free"); if (!iceberg_scan_free_func) { fprintf(stderr, "❌ Failed to resolve iceberg_scan_free: %s\n", dlerror()); return 0; } - iceberg_scan_next_batch_func = (iceberg_scan_next_batch_func_t)dlsym(lib_handle, "iceberg_scan_next_batch"); - if (!iceberg_scan_next_batch_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_scan_next_batch: %s\n", dlerror()); + iceberg_arrow_stream_free_func = (void (*)(IcebergArrowStream*))dlsym(lib_handle, "iceberg_arrow_stream_free"); + if (!iceberg_arrow_stream_free_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_arrow_stream_free: %s\n", dlerror()); return 0; } - iceberg_arrow_batch_free_func = (iceberg_arrow_batch_free_func_t)dlsym(lib_handle, "iceberg_arrow_batch_free"); + iceberg_arrow_batch_free_func = (void (*)(ArrowBatch*))dlsym(lib_handle, "iceberg_arrow_batch_free"); if (!iceberg_arrow_batch_free_func) { fprintf(stderr, "❌ Failed to resolve iceberg_arrow_batch_free: %s\n", dlerror()); return 0; } - iceberg_error_message_func = (iceberg_error_message_func_t)dlsym(lib_handle, "iceberg_error_message"); - if (!iceberg_error_message_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_error_message: %s\n", dlerror()); + iceberg_destroy_cstring_func = (int (*)(char*))dlsym(lib_handle, "iceberg_destroy_cstring"); + if (!iceberg_destroy_cstring_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_destroy_cstring: %s\n", dlerror()); + return 0; + } + + iceberg_cancel_context_func = (int (*)(const void*))dlsym(lib_handle, "iceberg_cancel_context"); + if (!iceberg_cancel_context_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_cancel_context: %s\n", dlerror()); + return 0; + } + + iceberg_destroy_context_func = (int (*)(const void*))dlsym(lib_handle, "iceberg_destroy_context"); + if (!iceberg_destroy_context_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_destroy_context: %s\n", dlerror()); return 0; } @@ -86,7 +167,7 @@ int load_iceberg_library(const char* library_path) { } // Function to unload the library -void unload_iceberg_library() { +static void unload_iceberg_library(void) { if (lib_handle) { dlclose(lib_handle); lib_handle = NULL; @@ -94,8 +175,16 @@ void unload_iceberg_library() { } } +void wait_until_completed(uintptr_t* async_completed, int timeout) { + printf("⏳ Waiting for table open to complete...\n"); + while (!(*async_completed) && timeout > 0) { + usleep(100000); // 100ms + timeout--; + } +} + int main(int argc, char* argv[]) { - printf("Starting Iceberg C API integration test with dynamic loading...\n"); + printf("Starting Iceberg C API integration test with new async API...\n"); // Check for one command line argument (the path to the library) if (argc < 2) { @@ -103,73 +192,222 @@ int main(int argc, char* argv[]) { return 1; } + // Check if environment variables are set + printf("Environment variables:\n"); + printf(" AWS_ACCESS_KEY_ID: %s\n", getenv("AWS_ACCESS_KEY_ID") ? "SET" : "NOT SET"); + printf(" AWS_SECRET_ACCESS_KEY: %s\n", getenv("AWS_SECRET_ACCESS_KEY") ? "SET" : "NOT SET"); + printf(" AWS_DEFAULT_REGION: %s\n", getenv("AWS_DEFAULT_REGION") ? getenv("AWS_DEFAULT_REGION") : "NOT SET"); + printf(" AWS_ENDPOINT_URL: %s\n", getenv("AWS_ENDPOINT_URL") ? getenv("AWS_ENDPOINT_URL") : "NOT SET"); + + // Load the library if (!load_iceberg_library(argv[1])) { fprintf(stderr, "Failed to load Iceberg library\n"); return 1; } - IcebergTable* table = NULL; - IcebergScan* scan = NULL; + // 1. Initialize the runtime + printf("Initializing Iceberg runtime...\n"); + IcebergStaticConfig config = {0}; // Default config - 0 threads means use default + int result = iceberg_init_runtime_func(config, panic_callback, result_callback); + if (result != CRESULT_OK) { + printf("❌ Failed to initialize runtime\n"); + unload_iceberg_library(); + return 1; + } + printf("✅ Runtime initialized successfully\n"); + + // 2. Open table using async API + const char* snapshot_path = "s3://warehouse/tpch.sf01/nation/metadata/00001-44f668fe-3688-49d5-851f-36e75d143321.metadata.json"; + printf("Opening table at: %s\n", snapshot_path); + + IcebergTableResponse table_response = {0}; + async_completed = 0; // Reset flag + result = iceberg_table_open_func(snapshot_path, &table_response, (const void*)(uintptr_t)&async_completed); + + if (result != CRESULT_OK) { + printf("❌ Failed to initiate table open operation\n"); + unload_iceberg_library(); + return 1; + } - // 1. Open table from folder path - const char* table_path = "s3://warehouse/tpch.sf01/nation"; - const char* metadata_path = "metadata/00001-44f668fe-3688-49d5-851f-36e75d143321.metadata.json"; - printf("Opening table at: %s\n", table_path); - printf("Using metadata file: %s\n", metadata_path); + // Wait for async operation to complete + wait_until_completed((uintptr_t*)&async_completed, 100); - IcebergResult result = iceberg_table_open_func(table_path, metadata_path, &table); - if (result != ICEBERG_OK) { - printf("❌ Failed to open table: %s\n", iceberg_error_message_func()); + if (!async_completed) { + printf("❌ Async operation timed out\n"); unload_iceberg_library(); return 1; } + + // Check if the operation was successful + if (table_response.result != CRESULT_OK) { + printf("❌ Failed to open table (result=%d)", table_response.result); + if (table_response.error_message) { + printf(": %s", table_response.error_message); + iceberg_destroy_cstring_func(table_response.error_message); + } + printf("\n"); + unload_iceberg_library(); + return 1; + } + + if (!table_response.table) { + printf("❌ No table returned from open operation\n"); + unload_iceberg_library(); + return 1; + } + printf("✅ Table opened successfully\n"); - // 2. Create a scan - result = iceberg_table_scan_func(table, &scan); - if (result != ICEBERG_OK) { - printf("❌ Failed to create scan: %s\n", iceberg_error_message_func()); - iceberg_table_free_func(table); + // 3. Create a scan using async API + IcebergScan *scan = iceberg_new_scan_func(table_response.table); + + if (scan == NULL) { + printf("❌ Failed to create scan\n"); + iceberg_table_free_func(table_response.table); unload_iceberg_library(); return 1; } - printf("✅ Scan created successfully\n"); - // 3. Optionally select specific columns (commented out since we don't know schema yet) - // const char* columns[] = {"id", "value"}; - // iceberg_scan_select_columns_func(scan, columns, 2); + result = iceberg_scan_with_data_file_concurrency_limit_func(&scan, 4); + if (result != CRESULT_OK) { + printf("❌ Failed to set data file concurrency limit\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + result = iceberg_scan_with_manifest_file_concurrency_limit_func(&scan, 4); + if (result != CRESULT_OK) { + printf("❌ Failed to set manifest file concurrency limit\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + result = iceberg_scan_with_batch_size_func(&scan, 1024); + if (result != CRESULT_OK) { + printf("❌ Failed to set batch size\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + result = iceberg_select_columns_func(&scan, (const char*[]){"n_nationkey", "n_name", "n_regionkey", "n_comment"}, 4); + if (result != CRESULT_OK) { + printf("❌ Failed to select columns\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + result = iceberg_scan_build_func(&scan); + + if (result != CRESULT_OK) { + printf("❌ Failed to initiate scan creation\n"); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + // Print the scan pointer + printf("Scan pointer: %p\n", (void*)scan); + + printf("✅ Scan created successfully\n"); - // 4. Iterate through Arrow batches as serialized bytes - int batch_count = 0; - size_t total_bytes = 0; + printf("Step 1: Initializing stream asynchronously...\n"); + IcebergArrowStreamResponse stream_response = {0}; + async_completed = 0; + result = iceberg_arrow_stream_func(scan, &stream_response, (const void*)(uintptr_t)&async_completed); + if (result != CRESULT_OK) { + printf("❌ Failed to create stream\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } - while (true) { - ArrowBatch* batch = NULL; + // Wait for completion + wait_until_completed((uintptr_t*)&async_completed, 100); - result = iceberg_scan_next_batch_func(scan, &batch); + if (!async_completed) { + printf("❌ Stream creation async operation timed out\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } - if (result == ICEBERG_END_OF_STREAM) { - printf("✅ Reached end of stream\n"); - break; + // Check if operation was successful + if (stream_response.result != CRESULT_OK) { + printf("❌ Failed to create stream"); + if (stream_response.error_message) { + printf(": %s", stream_response.error_message); + iceberg_destroy_cstring_func(stream_response.error_message); } + printf("\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + if (!stream_response.stream) { + printf("❌ No stream returned from stream creation\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + printf("✅ Stream created successfully\n"); - if (result != ICEBERG_OK) { - printf("❌ Failed to get next batch: %s\n", iceberg_error_message_func()); - break; + // 4. Try to get a batch using new two-step async API + printf("Step 2: Getting first batch from stream asynchronously...\n"); + IcebergBatchResponse batch_response = {0}; + async_completed = 0; // Reset flag + result = iceberg_next_batch_func(stream_response.stream, &batch_response, (const void*)(uintptr_t)&async_completed); + + if (result == CRESULT_OK) { + wait_until_completed((uintptr_t*)&async_completed, 100); + + if (!async_completed) { + printf("❌ Batch retrieval async operation timed out\n"); + iceberg_arrow_stream_free_func(stream_response.stream); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; } + } - if (batch == NULL) { - printf("❌ Received NULL batch\n"); - break; + if (result != CRESULT_OK) { + printf("❌ Failed to get first batch from stream\n"); + if (batch_response.error_message) { + printf(" Error: %s\n", batch_response.error_message); + iceberg_destroy_cstring_func(batch_response.error_message); } + iceberg_arrow_stream_free_func(stream_response.stream); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + printf("Step 3: Checking batch result...\n"); - batch_count++; - total_bytes += batch->length; + ArrowBatch* batch = batch_response.batch; - printf("📦 Batch %d:\n", batch_count); + if (batch) { + printf("✅ Successfully retrieved batch!\n"); + printf("📦 Batch details:\n"); printf(" - Serialized size: %zu bytes\n", batch->length); - printf(" - Data pointer: %p\n", (void*)batch->data); + printf(" - Data pointer: %p\n", (const void*)batch->data); printf(" - First few bytes: "); // Print first 8 bytes as hex for verification @@ -178,27 +416,47 @@ int main(int argc, char* argv[]) { printf("%02x ", batch->data[i]); } printf("\n"); - - // This is where you would pass the serialized Arrow data to Julia - // In Julia, you would: - // 1. Create an IOBuffer from the bytes: IOBuffer(unsafe_wrap(Array, batch->data, batch->length)) - // 2. Use Arrow.jl to read: Arrow.Stream(io_buffer) printf(" → Arrow IPC bytes ready for Julia Arrow.Stream()\n"); - // Free the batch (this calls back to Rust to free memory) + // Free the batch directly iceberg_arrow_batch_free_func(batch); + } else { + printf("✅ Reached end of stream (no more batches)\n"); } - printf("📊 Summary:\n"); - printf(" - Total batches: %d\n", batch_count); - printf(" - Total bytes processed: %zu\n", total_bytes); + // 4. Test context cancellation functions + printf("Testing context cancellation functions...\n"); + + // Test that cancellation functions can be called with valid context pointers + if (table_response.context != NULL) { + printf(" - Testing cancel_context with table context...\n"); + int cancel_result = iceberg_cancel_context_func(table_response.context); + if (cancel_result == 0) { + printf(" ✅ cancel_context succeeded\n"); + } else { + printf(" ⚠️ cancel_context returned: %d\n", cancel_result); + } + + printf(" - Testing destroy_context with table context...\n"); + int destroy_result = iceberg_destroy_context_func(table_response.context); + if (destroy_result == 0) { + printf(" ✅ destroy_context succeeded\n"); + } else { + printf(" ⚠️ destroy_context returned: %d\n", destroy_result); + } + table_response.context = NULL; // Mark as cleaned up + } + + printf("✅ Context cancellation functions tested successfully\n"); // 5. Cleanup - iceberg_scan_free_func(scan); - iceberg_table_free_func(table); + printf("Cleaning up resources...\n"); + iceberg_arrow_stream_free_func(stream_response.stream); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); unload_iceberg_library(); printf("✅ Integration test completed successfully!\n"); - printf("🚀 Ready for Julia bindings integration\n"); + printf("🚀 New async API is working correctly\n"); return 0; -} \ No newline at end of file +}