diff --git a/.gitignore b/.gitignore index 5d54f0d..04bf5f6 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ target/ # IDE Specific configurations .idea/ .helix/ + +.claude/ diff --git a/Cargo.lock b/Cargo.lock index 7d8697c..dc9f50c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" [[package]] name = "arrayref" @@ -75,6 +75,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -103,12 +114,6 @@ dependencies = [ "fs_extra", ] -[[package]] -name = "base64" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" - [[package]] name = "base64ct" version = "1.8.3" @@ -132,7 +137,7 @@ dependencies = [ "cc", "cfg-if", "constant_time_eq", - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -168,12 +173,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "cesu8" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" - [[package]] name = "cfg-if" version = "1.0.4" @@ -181,10 +180,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] -name = "cfg_aliases" -version = "0.2.1" +name = "chacha20" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.0", +] [[package]] name = "cmake" @@ -195,16 +199,6 @@ dependencies = [ "cc", ] -[[package]] -name = "combine" -version = "4.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" -dependencies = [ - "bytes", - "memchr", -] - [[package]] name = "compact_str" version = "0.9.0" @@ -270,6 +264,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -425,6 +428,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -542,10 +551,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi", - "wasm-bindgen", ] [[package]] @@ -555,11 +562,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", - "js-sys", "libc", "r-efi", "wasip2", - "wasm-bindgen", +] + +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "rand_core 0.10.0", + "wasip2", + "wasip3", ] [[package]] @@ -568,6 +587,34 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -629,6 +676,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", @@ -650,6 +698,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -662,16 +711,13 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64", "bytes", "futures-channel", "futures-util", "http", "http-body", "hyper", - "ipnet", "libc", - "percent-encoding", "pin-project-lite", "socket2", "tokio", @@ -760,6 +806,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "ident_case" version = "1.0.1" @@ -794,23 +846,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", - "hashbrown", -] - -[[package]] -name = "ipnet" -version = "2.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" - -[[package]] -name = "iri-string" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" -dependencies = [ - "memchr", + "hashbrown 0.16.1", "serde", + "serde_core", ] [[package]] @@ -828,28 +866,6 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" -[[package]] -name = "jni" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" -dependencies = [ - "cesu8", - "cfg-if", - "combine", - "jni-sys", - "log", - "thiserror 1.0.69", - "walkdir", - "windows-sys 0.45.0", -] - -[[package]] -name = "jni-sys" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" - [[package]] name = "jobserver" version = "0.1.34" @@ -870,11 +886,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" -version = "0.2.180" +version = "0.2.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5" [[package]] name = "litemap" @@ -897,17 +919,11 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -[[package]] -name = "lru-slab" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" - [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "mime" @@ -1045,6 +1061,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -1086,62 +1112,6 @@ dependencies = [ "syn", ] -[[package]] -name = "quinn" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" -dependencies = [ - "bytes", - "cfg_aliases", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash", - "rustls", - "socket2", - "thiserror 2.0.18", - "tokio", - "tracing", - "web-time", -] - -[[package]] -name = "quinn-proto" -version = "0.11.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" -dependencies = [ - "aws-lc-rs", - "bytes", - "getrandom 0.3.4", - "lru-slab", - "rand", - "ring", - "rustc-hash", - "rustls", - "rustls-pki-types", - "slab", - "thiserror 2.0.18", - "tinyvec", - "tracing", - "web-time", -] - -[[package]] -name = "quinn-udp" -version = "0.5.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" -dependencies = [ - "cfg_aliases", - "libc", - "once_cell", - "socket2", - "tracing", - "windows-sys 0.60.2", -] - [[package]] name = "quote" version = "1.0.44" @@ -1164,7 +1134,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ "rand_chacha", - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "rand" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +dependencies = [ + "chacha20", + "getrandom 0.4.1", + "rand_core 0.10.0", ] [[package]] @@ -1174,7 +1155,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.5", ] [[package]] @@ -1186,6 +1167,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" + [[package]] name = "redox_syscall" version = "0.5.18" @@ -1230,47 +1217,6 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" -[[package]] -name = "reqwest" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" -dependencies = [ - "base64", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-rustls", - "hyper-util", - "js-sys", - "log", - "percent-encoding", - "pin-project-lite", - "quinn", - "rustls", - "rustls-pki-types", - "rustls-platform-verifier", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tokio-rustls", - "tokio-util", - "tower", - "tower-http", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "wasm-streams", - "web-sys", -] - [[package]] name = "ring" version = "0.17.14" @@ -1314,12 +1260,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "rustc-hash" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" - [[package]] name = "rustc_version" version = "0.4.1" @@ -1361,37 +1301,9 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ - "web-time", "zeroize", ] -[[package]] -name = "rustls-platform-verifier" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" -dependencies = [ - "core-foundation", - "core-foundation-sys", - "jni", - "log", - "once_cell", - "rustls", - "rustls-native-certs", - "rustls-platform-verifier-android", - "rustls-webpki", - "security-framework", - "security-framework-sys", - "webpki-root-certs", - "windows-sys 0.61.2", -] - -[[package]] -name = "rustls-platform-verifier-android" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" - [[package]] name = "rustls-webpki" version = "0.103.9" @@ -1412,9 +1324,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" [[package]] name = "s2-api" @@ -1436,7 +1348,7 @@ dependencies = [ "serde", "serde_json", "strum", - "thiserror 2.0.18", + "thiserror", "time", "tokio-util", "zstd", @@ -1456,7 +1368,7 @@ dependencies = [ "http", "serde", "strum", - "thiserror 2.0.18", + "thiserror", "time", ] @@ -1467,21 +1379,26 @@ dependencies = [ "assert_matches", "async-compression", "async-stream", + "async-trait", "bytes", "futures", "http", + "http-body-util", "hyper", + "hyper-rustls", + "hyper-util", "prost", - "rand", - "reqwest", + "rand 0.10.0", "rstest", + "rustls", "s2-api", "s2-common", "secrecy", "serde", "serde_json", + "serde_urlencoded", "test-context", - "thiserror 2.0.18", + "thiserror", "time", "tokio", "tokio-muxt", @@ -1494,15 +1411,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.28" @@ -1705,15 +1613,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "sync_wrapper" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" -dependencies = [ - "futures-core", -] - [[package]] name = "synstructure" version = "0.13.2" @@ -1746,33 +1645,13 @@ dependencies = [ "syn", ] -[[package]] -name = "thiserror" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" -dependencies = [ - "thiserror-impl 1.0.69", -] - [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.18", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -1827,21 +1706,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "tinyvec" -version = "1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokio" version = "1.49.0" @@ -1960,57 +1824,13 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.6+spec-1.1.0" +version = "1.0.7+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" +checksum = "247eaa3197818b831697600aadf81514e577e0cba5eab10f7e064e78ae154df1" dependencies = [ "winnow", ] -[[package]] -name = "tower" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" -dependencies = [ - "futures-core", - "futures-util", - "pin-project-lite", - "sync_wrapper", - "tokio", - "tower-layer", - "tower-service", -] - -[[package]] -name = "tower-http" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" -dependencies = [ - "async-compression", - "bitflags", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-body-util", - "iri-string", - "pin-project-lite", - "tokio", - "tokio-util", - "tower", - "tower-layer", - "tower-service", -] - -[[package]] -name = "tower-layer" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" - [[package]] name = "tower-service" version = "0.3.3" @@ -2056,9 +1876,15 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e" + +[[package]] +name = "unicode-xid" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" [[package]] name = "untrusted" @@ -2098,20 +1924,10 @@ checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" dependencies = [ "getrandom 0.3.4", "js-sys", - "rand", + "rand 0.9.2", "wasm-bindgen", ] -[[package]] -name = "walkdir" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.1" @@ -2136,6 +1952,15 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.108" @@ -2149,20 +1974,6 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.58" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f" -dependencies = [ - "cfg-if", - "futures-util", - "js-sys", - "once_cell", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.108" @@ -2196,54 +2007,37 @@ dependencies = [ ] [[package]] -name = "wasm-streams" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" -dependencies = [ - "futures-util", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - -[[package]] -name = "web-sys" -version = "0.3.85" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "312e32e551d92129218ea9a2452120f4aabc03529ef03e4d0d82fb2780608598" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "web-time" -version = "1.1.0" +name = "wasm-encoder" +version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" dependencies = [ - "js-sys", - "wasm-bindgen", + "leb128fmt", + "wasmparser", ] [[package]] -name = "webpki-root-certs" -version = "1.0.6" +name = "wasm-metadata" +version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ - "rustls-pki-types", + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", ] [[package]] -name = "winapi-util" -version = "0.1.11" +name = "wasmparser" +version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "windows-sys 0.61.2", + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", ] [[package]] @@ -2252,15 +2046,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -2288,21 +2073,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -2336,12 +2106,6 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -2354,12 +2118,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -2372,12 +2130,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2402,12 +2154,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -2420,12 +2166,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -2438,12 +2178,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -2456,12 +2190,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2488,6 +2216,88 @@ name = "wit-bindgen" version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] [[package]] name = "writeable" @@ -2520,18 +2330,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.38" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57cf3aa6855b23711ee9852dfc97dfaa51c45feaba5b645d0c777414d494a961" +checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.38" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a616990af1a287837c4fe6596ad77ef57948f787e46ce28e166facc0cc1cb75" +checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", @@ -2600,9 +2410,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" +checksum = "4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index 15dc85d..4e35d65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,30 +18,41 @@ name = "create_basin" doc-scrape-examples = true [dependencies] -async-compression = "0.4.39" +async-compression = { version = "0.4", features = ["tokio", "gzip", "zstd"] } async-stream = "0.3.6" +async-trait = "0.1" bytes = "1.11.0" futures = "0.3.31" http = "1.4.0" +http-body-util = "0.1" hyper = "1.8.1" +hyper-rustls = { version = "0.27", default-features = false, features = [ + "http2", + "native-tokio", + "aws-lc-rs", + "tls12", +] } +hyper-util = { version = "0.1", features = [ + "client-legacy", + "tokio", + "http1", + "http2", +] } prost = "0.14.3" -rand = "0.9.2" -reqwest = { version = "0.13.2", default-features = false, features = [ - "json", - "stream", - "query", - "gzip", - "zstd", - "rustls", +rand = "0.10.0" +rustls = { version = "0.23", default-features = false, features = [ + "aws-lc-rs", + "std", ] } s2-api = "0.27" s2-common = "0.27" secrecy = "0.10.3" serde = { version = "1.0.228" } serde_json = "1.0.149" +serde_urlencoded = "0.7" thiserror = "2.0.18" time = "0.3.47" -tokio = { version = "1.49.0", features = ["time", "macros", "io-util"] } +tokio = { version = "1.49.0", features = ["time", "macros", "io-util", "sync"] } tokio-muxt = "0.7.0" tokio-stream = "0.1.18" tokio-util = { version = "0.7.18", features = ["rt", "codec"] } diff --git a/src/api.rs b/src/api.rs index a547191..205011f 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,18 +1,17 @@ +use crate::client::{self, StreamingResponse, UnaryResponse}; use crate::retry::{RetryBackoff, RetryBackoffBuilder}; use crate::types::{ AccessTokenId, BasinAuthority, BasinName, Compression, RetryConfig, S2Config, S2Endpoints, StreamName, }; -use async_compression::Level; -use async_compression::tokio::write::{GzipEncoder, ZstdEncoder}; use async_stream::try_stream; +use async_trait::async_trait; use bytes::BytesMut; use futures::{Stream, StreamExt}; use http::header::InvalidHeaderValue; -use http::header::{ACCEPT, AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE}; +use http::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE}; use http::{HeaderMap, HeaderValue, StatusCode}; use prost::{self, Message}; -use reqwest::{Request, Response}; use s2_api::v1::access::{ AccessTokenInfo, IssueAccessTokenResponse, ListAccessTokensRequest, ListAccessTokensResponse, }; @@ -32,7 +31,6 @@ use std::ops::Deref; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tokio::io::AsyncWriteExt; use tokio_util::codec::Decoder; use tracing::{debug, warn}; use url::Url; @@ -43,7 +41,6 @@ const ACCEPT_PROTO: &str = "application/protobuf"; const S2_REQUEST_TOKEN: &str = "s2-request-token"; const S2_BASIN: &str = "s2-basin"; const RETRY_AFTER_MS_HEADER: &str = "retry-after-ms"; -const SESSION_REQUEST_TIMEOUT: Duration = Duration::from_secs(u64::MAX); #[derive(Debug, Clone)] pub struct AccountClient { @@ -53,14 +50,13 @@ pub struct AccountClient { } impl AccountClient { - pub fn init(config: S2Config) -> Result { + pub fn init(config: S2Config, client: BaseClient) -> Self { let base_url = base_url(&config.endpoints, ClientKind::Account); - let client = BaseClient::init(&config)?; - Ok(Self { + Self { client, config: Arc::new(config), base_url, - }) + } } pub fn basin_client(&self, name: BasinName) -> BasinClient { @@ -74,7 +70,7 @@ impl AccountClient { let url = self.base_url.join("v1/access-tokens")?; let request = self.get(url).query(&request).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn issue_access_token( @@ -84,7 +80,7 @@ impl AccountClient { let url = self.base_url.join("v1/access-tokens")?; let request = self.post(url).json(&info).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn revoke_access_token(&self, id: AccessTokenId) -> Result<(), ApiError> { @@ -103,7 +99,7 @@ impl AccountClient { let url = self.base_url.join("v1/basins")?; let request = self.get(url).query(&request).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn create_basin( @@ -118,14 +114,14 @@ impl AccountClient { .json(&request) .build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn get_basin_config(&self, name: BasinName) -> Result { let url = self.base_url.join(&format!("v1/basins/{name}"))?; let request = self.get(url).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn reconfigure_basin( @@ -136,7 +132,7 @@ impl AccountClient { let url = self.base_url.join(&format!("v1/basins/{name}"))?; let request = self.patch(url).json(&config).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn delete_basin( @@ -160,7 +156,7 @@ impl AccountClient { let url = self.base_url.join("v1/metrics")?; let request = self.get(url).query(&request).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn get_basin_metrics( @@ -171,7 +167,7 @@ impl AccountClient { let url = self.base_url.join(&format!("v1/metrics/{name}"))?; let request = self.get(url).query(&request).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn get_stream_metrics( @@ -186,7 +182,7 @@ impl AccountClient { ))?; let request = self.get(url).query(&request).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } } @@ -217,7 +213,7 @@ impl BasinClient { } } - fn request(&self, mut request: Request) -> RequestBuilder<'_> { + fn request(&self, mut request: client::Request) -> RequestBuilder<'_> { if matches!( self.config.endpoints.basin_authority, BasinAuthority::Direct(_) @@ -237,7 +233,7 @@ impl BasinClient { let url = self.base_url.join("v1/streams")?; let request = self.get(url).query(&request).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn create_stream( @@ -252,7 +248,7 @@ impl BasinClient { .json(&request) .build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn get_stream_config(&self, name: StreamName) -> Result { @@ -261,7 +257,7 @@ impl BasinClient { .join(&format!("v1/streams/{}", urlencoding::encode(&name)))?; let request = self.get(url).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn reconfigure_stream( @@ -274,7 +270,7 @@ impl BasinClient { .join(&format!("v1/streams/{}", urlencoding::encode(&name)))?; let request = self.patch(url).json(&config).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn delete_stream( @@ -300,7 +296,7 @@ impl BasinClient { ))?; let request = self.get(url).build()?; let response = self.request(request).send().await?; - Ok(response.json::().await?) + Ok(response.json::()?) } pub async fn append( @@ -321,21 +317,21 @@ impl BasinClient { let response = self .request(request) .with_retry_enabled(retry_enabled) - .error_handler(|status, response| async move { + .error_handler(|status, response| { if status == StatusCode::PRECONDITION_FAILED { Err(ApiError::AppendConditionFailed( - response.json::().await?, + response.json::()?, )) } else { Err(ApiError::Server( status, - response.json::().await?, + response.json::()?, )) } }) .send() .await?; - Ok(AppendAck::decode(response.bytes().await?)?) + Ok(AppendAck::decode(response.into_bytes())?) } pub async fn read( @@ -358,7 +354,7 @@ impl BasinClient { .error_handler(read_response_error_handler) .send() .await?; - Ok(ReadBatch::decode(response.bytes().await?)?) + Ok(ReadBatch::decode(response.into_bytes())?) } pub async fn append_session( @@ -379,14 +375,21 @@ impl BasinClient { s2s::SessionMessage::regular(compression, &input).map(|msg| msg.encode()) }); - let mut request = self + let mut request_builder = self + .client .post(url) .header(CONTENT_TYPE, CONTENT_TYPE_S2S) - .body(reqwest::Body::wrap_stream(encoded_stream)) - .timeout(SESSION_REQUEST_TIMEOUT); - request = add_basin_header_if_required(request, &self.config.endpoints, &self.name); - let response = request.send().await?.into_result().await?; - let mut bytes_stream = response.bytes_stream(); + .body(client::Body::wrap_stream(encoded_stream)) + .timeout(self.client.request_timeout); + request_builder = + add_basin_header_if_required(request_builder, &self.config.endpoints, &self.name); + let response = self + .client + .init_streaming(request_builder.build()?) + .await? + .into_result() + .await?; + let mut bytes_stream = response.stream(); let mut buffer = BytesMut::new(); let mut decoder = FrameDecoder; @@ -422,20 +425,22 @@ impl BasinClient { .base_url .join(&format!("v1/streams/{}/records", urlencoding::encode(name)))?; - let mut request = self + let mut request_builder = self .client .get(url) .header(CONTENT_TYPE, CONTENT_TYPE_S2S) .query(&start) .query(&end) - .timeout(SESSION_REQUEST_TIMEOUT); - request = add_basin_header_if_required(request, &self.config.endpoints, &self.name); - let response = request - .send() + .timeout(self.client.request_timeout); + request_builder = + add_basin_header_if_required(request_builder, &self.config.endpoints, &self.name); + let response = self + .client + .init_streaming(request_builder.build()?) .await? - .into_result_with_handler(read_response_error_handler) + .into_result() .await?; - let mut bytes_stream = response.bytes_stream(); + let mut bytes_stream = response.stream(); let mut buffer = BytesMut::new(); let mut decoder = FrameDecoder; @@ -462,18 +467,16 @@ impl BasinClient { } } -async fn read_response_error_handler( +fn read_response_error_handler( status: StatusCode, - response: Response, -) -> Result { + response: UnaryResponse, +) -> Result { if status == StatusCode::RANGE_NOT_SATISFIABLE { - Err(ApiError::ReadUnwritten( - response.json::().await?, - )) + Err(ApiError::ReadUnwritten(response.json::()?)) } else { Err(ApiError::Server( status, - response.json::().await?, + response.json::()?, )) } } @@ -535,8 +538,8 @@ impl ApiError { } } -impl From for ApiError { - fn from(err: reqwest::Error) -> Self { +impl From for ApiError { + fn from(err: client::Error) -> Self { ClientError::from(err).into() } } @@ -545,8 +548,8 @@ impl From for ApiError { pub enum ClientError { #[error("connect: {0}")] Connect(String), - #[error("timeout: {0}")] - Timeout(String), + #[error("timeout")] + Timeout, #[error("connection closed early: {0}")] ConnectionClosedEarly(String), #[error("request canceled: {0}")] @@ -569,45 +572,48 @@ impl ClientError { } } -impl From for ClientError { - fn from(err: reqwest::Error) -> Self { +impl From for ClientError { + fn from(err: client::Error) -> Self { let err_msg = err.to_string(); - if err.is_connect() { - Self::Connect(err_msg) - } else if err.is_timeout() { - Self::Timeout(err_msg) - } else if let Some(io_err) = source_err::(&err) { - let io_err_msg = format!("{io_err} -> {err_msg}"); - if io_err.kind() == std::io::ErrorKind::UnexpectedEof { - Self::UnexpectedEof(io_err_msg) - } else if io_err.kind() == std::io::ErrorKind::ConnectionReset { - Self::ConnectionReset(io_err_msg) - } else if io_err.kind() == std::io::ErrorKind::ConnectionAborted { - Self::ConnectionAborted(io_err_msg) - } else if io_err.kind() == std::io::ErrorKind::ConnectionRefused { - Self::ConnectionRefused(io_err_msg) - } else { - Self::Others(io_err_msg) + match err { + client::Error::Send(ref send_err) if send_err.is_connect() => { + classify_io_source(&err, &err_msg).unwrap_or(Self::Connect(err_msg)) } - } else if err.is_request() { - if let Some(hyper_err) = source_err::(&err) { - let hyper_err_msg = format!("{hyper_err} -> {err_msg}"); - if hyper_err.is_incomplete_message() { - Self::ConnectionClosedEarly(hyper_err_msg) - } else if hyper_err.is_canceled() { - Self::RequestCanceled(hyper_err_msg) - } else { - Self::Others(hyper_err_msg) - } - } else { - Self::Others(err_msg) + client::Error::Send(_) | client::Error::Receive(_) => { + classify_hyper_source(&err, &err_msg) + .or_else(|| classify_io_source(&err, &err_msg)) + .unwrap_or(Self::Others(err_msg)) } - } else { - Self::Others(err_msg) + client::Error::Timeout => Self::Timeout, + _ => Self::Others(err_msg), } } } +fn classify_hyper_source(err: &client::Error, err_msg: &str) -> Option { + let hyper_err = source_err::(err)?; + let err_msg = format!("{hyper_err} -> {err_msg}"); + if hyper_err.is_incomplete_message() { + Some(ClientError::ConnectionClosedEarly(err_msg)) + } else if hyper_err.is_canceled() { + Some(ClientError::RequestCanceled(err_msg)) + } else { + None + } +} + +fn classify_io_source(err: &client::Error, err_msg: &str) -> Option { + let io_err = source_err::(err)?; + let err_msg = format!("{io_err} -> {err_msg}"); + Some(match io_err.kind() { + std::io::ErrorKind::UnexpectedEof => ClientError::UnexpectedEof(err_msg), + std::io::ErrorKind::ConnectionReset => ClientError::ConnectionReset(err_msg), + std::io::ErrorKind::ConnectionAborted => ClientError::ConnectionAborted(err_msg), + std::io::ErrorKind::ConnectionRefused => ClientError::ConnectionRefused(err_msg), + _ => return None, + }) +} + fn source_err(err: &dyn std::error::Error) -> Option<&T> { let mut source = err.source(); @@ -632,8 +638,8 @@ pub enum S2STerminalDecodeError { impl From for ApiError { fn from(msg: TerminalMessage) -> Self { let status = match StatusCode::from_u16(msg.status) { - Ok(s) => s, - Err(e) => return ApiError::S2STerminalDecode(e.into()), + Ok(status) => status, + Err(err) => return ApiError::S2STerminalDecode(err.into()), }; if status == StatusCode::PRECONDITION_FAILED { let condition_failed = match serde_json::from_str::(&msg.body) { @@ -665,71 +671,111 @@ impl From for ApiError { pub type Streaming = Pin>>>; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct BaseClient { - client: reqwest::Client, + client: Arc, + default_headers: HeaderMap, + request_timeout: Duration, retry_builder: RetryBackoffBuilder, compression: Compression, } +impl std::fmt::Debug for BaseClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BaseClient").finish_non_exhaustive() + } +} + impl BaseClient { pub fn init(config: &S2Config) -> Result { - let mut headers = HeaderMap::new(); - headers.insert( + let connector = client::default_connector( + Some(config.connection_timeout), + config.insecure_skip_cert_verification, + ) + .map_err(|e| ClientError::Others(format!("failed to load TLS certificates: {e}")))?; + Self::init_with_connector(config, connector) + } + + pub fn init_with_connector(config: &S2Config, connector: C) -> Result + where + C: client::Connect + Clone + Send + Sync + 'static, + { + let mut default_headers = HeaderMap::new(); + default_headers.insert( AUTHORIZATION, format!("Bearer {}", config.access_token.expose_secret()).try_into()?, ); - let mut client_builder = reqwest::ClientBuilder::new() - .timeout(config.request_timeout) - .connect_timeout(config.connection_timeout) - .user_agent(config.user_agent.clone()) - .default_headers(headers) - .danger_accept_invalid_certs(config.insecure_skip_cert_verification); + default_headers.insert(http::header::USER_AGENT, config.user_agent.clone()); match config.compression { Compression::Gzip => { - client_builder = client_builder.gzip(true); + default_headers.insert( + http::header::ACCEPT_ENCODING, + HeaderValue::from_static("gzip"), + ); } Compression::Zstd => { - client_builder = client_builder.zstd(true); + default_headers.insert( + http::header::ACCEPT_ENCODING, + HeaderValue::from_static("zstd"), + ); } Compression::None => {} } + + let client = client::Pool::new(connector); + Ok(Self { - client: client_builder.build()?, + client: Arc::new(client), + default_headers, + request_timeout: config.request_timeout, retry_builder: retry_builder(&config.retry), compression: config.compression, }) } - async fn compress_request(&self, request: &mut Request) -> Result<(), ApiError> { - if let Some(body) = request.body_mut() { - let bytes = body.as_bytes().expect("should not be a stream"); - match self.compression { - Compression::None => {} - Compression::Gzip => { - let mut encoder = GzipEncoder::with_quality(Vec::new(), Level::Fastest); - encoder.write_all(bytes).await?; - encoder.shutdown().await?; - *body = encoder.into_inner().into(); - request - .headers_mut() - .insert(CONTENT_ENCODING, HeaderValue::from_static("gzip")); - } - Compression::Zstd => { - let mut encoder = ZstdEncoder::with_quality(Vec::new(), Level::Fastest); - encoder.write_all(bytes).await?; - encoder.shutdown().await?; - *body = encoder.into_inner().into(); - request - .headers_mut() - .insert(CONTENT_ENCODING, HeaderValue::from_static("zstd")); - } - } - } - Ok(()) + pub fn get(&self, url: Url) -> client::RequestBuilder { + client::RequestBuilder::get(url) + .timeout(self.request_timeout) + .headers(&self.default_headers) + .compression(self.compression) + } + + pub fn post(&self, url: Url) -> client::RequestBuilder { + client::RequestBuilder::post(url) + .timeout(self.request_timeout) + .headers(&self.default_headers) + .compression(self.compression) + } + + pub fn patch(&self, url: Url) -> client::RequestBuilder { + client::RequestBuilder::patch(url) + .timeout(self.request_timeout) + .headers(&self.default_headers) + .compression(self.compression) + } + + pub fn delete(&self, url: Url) -> client::RequestBuilder { + client::RequestBuilder::delete(url) + .timeout(self.request_timeout) + .headers(&self.default_headers) + .compression(self.compression) + } + + pub async fn init_streaming( + &self, + request: client::Request, + ) -> Result { + self.client.init_streaming(request).await + } + + async fn execute_unary( + &self, + request: client::Request, + ) -> Result { + self.client.execute_unary(request).await } - fn request(&self, request: Request) -> RequestBuilder<'_> { + fn request(&self, request: client::Request) -> RequestBuilder<'_> { RequestBuilder { client: self, request, @@ -739,14 +785,6 @@ impl BaseClient { } } -impl Deref for BaseClient { - type Target = reqwest::Client; - - fn deref(&self) -> &Self::Target { - &self.client - } -} - pub fn retry_builder(config: &RetryConfig) -> RetryBackoffBuilder { RetryBackoffBuilder::default() .with_min_base_delay(config.min_base_delay) @@ -754,15 +792,12 @@ pub fn retry_builder(config: &RetryConfig) -> RetryBackoffBuilder { .with_max_retries(config.max_retries()) } -type ErrorHandlerFn = Box< - dyn Fn(StatusCode, Response) -> Pin> + Send>> - + Send - + Sync, ->; +type ErrorHandlerFn = + Box Result + Send + Sync>; struct RequestBuilder<'a> { client: &'a BaseClient, - request: Request, + request: client::Request, retry_enabled: bool, error_handler: Option, } @@ -775,22 +810,18 @@ impl<'a> RequestBuilder<'a> { } } - fn error_handler(self, handler: F) -> Self + fn error_handler(self, handler: F) -> Self where - F: Fn(StatusCode, Response) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + F: Fn(StatusCode, UnaryResponse) -> Result + Send + Sync + 'static, { Self { - error_handler: Some(Box::new(move |status, response| { - Box::pin(handler(status, response)) - })), + error_handler: Some(Box::new(handler)), ..self } } - async fn send(self) -> Result { - let mut request = self.request; - self.client.compress_request(&mut request).await?; + async fn send(self) -> Result { + let request = self.request; let mut retry_backoffs: Option = self .retry_enabled @@ -799,7 +830,7 @@ impl<'a> RequestBuilder<'a> { loop { let response = self .client - .execute(request.try_clone().expect("body should not be a stream")) + .execute_unary(request.try_clone().expect("body should not be a stream")) .await; let (err, retry_after) = match response { @@ -827,9 +858,9 @@ impl<'a> RequestBuilder<'a> { .map(Duration::from_millis); let result = if let Some(ref handler) = self.error_handler { - resp.into_result_with_handler(handler).await + resp.into_result_with_handler(handler) } else { - resp.into_result().await + resp.into_result() }; match result { @@ -846,7 +877,7 @@ impl<'a> RequestBuilder<'a> { if err.is_retryable() && let Some(backoff) = retry_backoffs.as_mut().and_then(|b| b.next()) { - let backoff = retry_after.unwrap_or(backoff); + let backoff = retry_after.map_or(backoff, |ra| ra.max(backoff)); debug!( %err, ?backoff, @@ -869,10 +900,10 @@ impl<'a> RequestBuilder<'a> { } fn add_basin_header_if_required( - request: reqwest::RequestBuilder, + request: client::RequestBuilder, endpoints: &S2Endpoints, name: &BasinName, -) -> reqwest::RequestBuilder { +) -> client::RequestBuilder { if matches!(endpoints.basin_authority, BasinAuthority::Direct(_)) { return request.header( S2_BASIN, @@ -902,37 +933,61 @@ fn base_url(endpoints: &S2Endpoints, kind: ClientKind) -> Url { Url::parse(&format!("{scheme}://{authority}")).expect("valid url") } -trait IntoResult { - async fn into_result(self) -> Result; - async fn into_result_with_handler(self, handler: F) -> Result +trait UnaryResult { + fn into_result(self) -> Result; + fn into_result_with_handler(self, handler: F) -> Result where - F: Fn(StatusCode, Response) -> Fut, - Fut: Future>; + F: FnOnce(StatusCode, UnaryResponse) -> Result; } -impl IntoResult for Response { - async fn into_result(self) -> Result { +impl UnaryResult for UnaryResponse { + fn into_result(self) -> Result { let status = self.status(); if status.is_success() { Ok(self) } else { - Err(ApiError::Server( - status, - self.json::().await?, - )) + Err(ApiError::Server(status, self.json::()?)) } } - async fn into_result_with_handler(self, handler: F) -> Result + fn into_result_with_handler(self, handler: F) -> Result where - F: Fn(StatusCode, Response) -> Fut, - Fut: Future>, + F: FnOnce(StatusCode, UnaryResponse) -> Result, { let status = self.status(); if status.is_success() { Ok(self) } else { - handler(status, self).await + handler(status, self) + } + } +} + +#[async_trait] +trait StreamingResult { + async fn into_result(self) -> Result; +} + +#[async_trait] +impl StreamingResult for StreamingResponse { + async fn into_result(self) -> Result { + if self.status().is_success() { + return Ok(self); + } + + let status = self.status(); + let bytes = self.into_bytes().await?; + if status == StatusCode::RANGE_NOT_SATISFIABLE + && let Ok(tail) = serde_json::from_slice::(&bytes) + { + return Err(ApiError::ReadUnwritten(tail)); + } + match serde_json::from_slice::(&bytes) { + Ok(response) => Err(ApiError::Server(status, response)), + Err(_) => Err(ApiError::Client(ClientError::Others(format!( + "server error {status}: {}", + String::from_utf8_lossy(&bytes) + )))), } } } @@ -941,7 +996,7 @@ trait IgnoreNotFound { fn ignore_not_found(self, enabled: bool) -> Result<(), ApiError>; } -impl IgnoreNotFound for Result { +impl IgnoreNotFound for Result { fn ignore_not_found(self, enabled: bool) -> Result<(), ApiError> { match self { Ok(_) => Ok(()), diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..390c20a --- /dev/null +++ b/src/client.rs @@ -0,0 +1,964 @@ +use std::{ + collections::HashMap, + convert::Infallible, + sync::{ + Arc, Mutex, + atomic::{AtomicUsize, Ordering}, + }, + time::{Duration, Instant}, +}; + +use async_compression::{ + Level, + tokio::{ + bufread::{GzipDecoder, ZstdDecoder}, + write::{GzipEncoder, ZstdEncoder}, + }, +}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{Stream, StreamExt}; +use http::{ + HeaderMap, Method, StatusCode, + header::{CONTENT_ENCODING, CONTENT_TYPE, HeaderName, HeaderValue}, +}; +use http_body_util::{BodyExt, Empty, Full, StreamBody, combinators::UnsyncBoxBody}; +use hyper::body::{Frame, Incoming}; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +pub use hyper_util::client::legacy::connect::Connect; +use hyper_util::{ + client::legacy::{Client as HyperClient, connect::HttpConnector}, + rt::TokioExecutor, +}; +use serde::{Serialize, de::DeserializeOwned}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::RwLock, + time::timeout, +}; +use tokio_util::task::AbortOnDropHandle; +use url::Url; + +const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); +const MAX_CONCURRENT_REQUESTS_PER_CLIENT: usize = 90; +const IDLE_TIMEOUT: Duration = Duration::from_secs(90); +const REAPER_INTERVAL: Duration = Duration::from_secs(30); + +type BoxError = Box; +type BoxBody = UnsyncBoxBody; + +#[derive(Debug, Clone, Copy, Default)] +pub enum Compression { + #[default] + None, + Gzip, + Zstd, +} + +impl From for Compression { + fn from(c: crate::types::Compression) -> Self { + match c { + crate::types::Compression::None => Compression::None, + crate::types::Compression::Gzip => Compression::Gzip, + crate::types::Compression::Zstd => Compression::Zstd, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("send error: {0}")] + Send(#[from] hyper_util::client::legacy::Error), + #[error("receive error: {0}")] + Receive(#[from] hyper::Error), + #[error("http error: {0}")] + Http(#[from] http::Error), + #[error("json error: {0}")] + Json(#[from] serde_json::Error), + #[error("url encoding error: {0}")] + UrlEncoded(#[from] serde_urlencoded::ser::Error), + #[error("timeout")] + Timeout, + #[error("compression error: {0}")] + Compression(String), +} + +impl Error { + pub fn is_connect(&self) -> bool { + matches!(self, Error::Send(e) if e.is_connect()) + } +} + +enum BodyInner { + Empty, + Full(Bytes), + Streaming(BoxBody), +} + +pub struct Body(BodyInner); + +impl Body { + fn empty() -> Self { + Self(BodyInner::Empty) + } + + pub fn wrap_stream(stream: S) -> Self + where + S: Stream> + Send + 'static, + E: Into + 'static, + { + let stream_body = StreamBody::new(stream.map(|r| r.map(Frame::data).map_err(Into::into))); + Self(BodyInner::Streaming(BoxBody::new(stream_body))) + } + + fn as_bytes(&self) -> Option<&[u8]> { + match &self.0 { + BodyInner::Empty => Some(&[]), + BodyInner::Full(bytes) => Some(bytes), + BodyInner::Streaming(_) => None, + } + } + + fn into_http_body(self) -> BoxBody { + match self.0 { + BodyInner::Empty => BoxBody::new(Empty::new().map_err(|e: Infallible| match e {})), + BodyInner::Full(bytes) => { + BoxBody::new(Full::new(bytes).map_err(|e: Infallible| match e {})) + } + BodyInner::Streaming(stream) => stream, + } + } +} + +impl Default for Body { + fn default() -> Self { + Self::empty() + } +} + +impl From> for Body { + fn from(data: Vec) -> Self { + Self(BodyInner::Full(Bytes::from(data))) + } +} + +pub struct Request { + method: Method, + url: Url, + headers: HeaderMap, + body: Body, + timeout: Option, + compression: Compression, +} + +impl Request { + pub fn headers_mut(&mut self) -> &mut HeaderMap { + &mut self.headers + } + + pub fn authority(&self) -> &str { + self.url.authority() + } + + pub fn try_clone(&self) -> Option { + let body = match &self.body.0 { + BodyInner::Empty => Body::empty(), + BodyInner::Full(bytes) => Body(BodyInner::Full(bytes.clone())), + BodyInner::Streaming(_) => return None, + }; + + Some(Self { + method: self.method.clone(), + url: self.url.clone(), + headers: self.headers.clone(), + body, + timeout: self.timeout, + compression: self.compression, + }) + } +} + +pub struct RequestBuilder { + method: Method, + url: Url, + headers: HeaderMap, + body: Option, + timeout: Option, + compression: Compression, + error: Option, +} + +impl RequestBuilder { + pub fn new(method: Method, url: Url) -> Self { + Self { + method, + url, + headers: HeaderMap::new(), + body: None, + timeout: None, + compression: Compression::None, + error: None, + } + } + + pub fn get(url: Url) -> Self { + Self::new(Method::GET, url) + } + + pub fn post(url: Url) -> Self { + Self::new(Method::POST, url) + } + + pub fn patch(url: Url) -> Self { + Self::new(Method::PATCH, url) + } + + pub fn delete(url: Url) -> Self { + Self::new(Method::DELETE, url) + } + + pub fn query(mut self, query: &T) -> Self { + if self.error.is_some() { + return self; + } + + match serde_urlencoded::to_string(query) { + Ok(query_string) => { + if !query_string.is_empty() { + let existing = self.url.query().unwrap_or(""); + if existing.is_empty() { + self.url.set_query(Some(&query_string)); + } else { + self.url + .set_query(Some(&format!("{existing}&{query_string}"))); + } + } + } + Err(e) => self.error = Some(Error::UrlEncoded(e)), + } + self + } + + pub fn json(mut self, json: &T) -> Self { + if self.error.is_some() { + return self; + } + + match serde_json::to_vec(json) { + Ok(data) => { + self.headers.insert(CONTENT_TYPE, APPLICATION_JSON); + self.body = Some(Body::from(data)); + } + Err(e) => self.error = Some(Error::Json(e)), + } + self + } + + pub fn body>(mut self, body: B) -> Self { + self.body = Some(body.into()); + self + } + + pub fn header(mut self, key: K, value: V) -> Self + where + K: TryInto, + K::Error: Into, + V: TryInto, + V::Error: Into, + { + match (key.try_into(), value.try_into()) { + (Ok(name), Ok(value)) => { + self.headers.insert(name, value); + } + (Err(e), _) => self.error = Some(Error::Http(e.into())), + (_, Err(e)) => self.error = Some(Error::Http(e.into())), + } + self + } + + pub fn headers(mut self, headers: &HeaderMap) -> Self { + for (key, value) in headers { + self.headers.insert(key.clone(), value.clone()); + } + self + } + + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } + + pub fn compression(mut self, compression: impl Into) -> Self { + self.compression = compression.into(); + self + } + + pub fn build(self) -> Result { + if let Some(e) = self.error { + return Err(e); + } + + Ok(Request { + method: self.method, + url: self.url, + headers: self.headers, + body: self.body.unwrap_or_default(), + timeout: self.timeout, + compression: self.compression, + }) + } +} + +pub struct UnaryResponse { + status: StatusCode, + headers: HeaderMap, + bytes: Bytes, +} + +impl UnaryResponse { + pub fn status(&self) -> StatusCode { + self.status + } + + pub fn headers(&self) -> &HeaderMap { + &self.headers + } + + pub fn into_bytes(self) -> Bytes { + self.bytes + } + + pub fn json(self) -> Result { + Ok(serde_json::from_slice(&self.bytes)?) + } +} + +pub struct StreamingResponse { + status: StatusCode, + headers: HeaderMap, + body: Incoming, + permit: RequestPermit, +} + +impl StreamingResponse { + fn new(status: StatusCode, headers: HeaderMap, body: Incoming, permit: RequestPermit) -> Self { + Self { + status, + headers, + body, + permit, + } + } + + pub fn status(&self) -> StatusCode { + self.status + } + + pub async fn into_bytes(self) -> Result { + let bytes = self.body.collect().await?.to_bytes(); + decompress_body(&self.headers, bytes).await + } + + pub fn stream(self) -> impl Stream> { + let permit = self.permit; + http_body_util::BodyStream::new(self.body).filter_map(move |result| { + let _ = &permit; + std::future::ready(match result { + Ok(frame) => frame.into_data().ok().map(Ok), + Err(e) => Some(Err(Error::Receive(e))), + }) + }) + } +} + +#[async_trait] +pub trait RequestExecutor: Send + Sync { + async fn execute_unary(&self, request: Request) -> Result; + async fn init_streaming(&self, request: Request) -> Result; +} + +pub fn default_connector( + connect_timeout: Option, + insecure_skip_cert_verification: bool, +) -> Result, std::io::Error> { + let mut connector = HttpConnector::new(); + connector.enforce_http(false); + if let Some(timeout) = connect_timeout { + connector.set_connect_timeout(Some(timeout)); + } + + let builder = if insecure_skip_cert_verification { + HttpsConnectorBuilder::new().with_tls_config( + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoVerifier)) + .with_no_client_auth(), + ) + } else { + HttpsConnectorBuilder::new().with_native_roots()? + }; + + Ok(builder + .https_or_http() + .enable_http2() + .wrap_connector(connector)) +} + +#[derive(Debug)] +struct NoVerifier; + +impl rustls::client::danger::ServerCertVerifier for NoVerifier { + fn verify_server_cert( + &self, + _: &rustls::pki_types::CertificateDer<'_>, + _: &[rustls::pki_types::CertificateDer<'_>], + _: &rustls::pki_types::ServerName<'_>, + _: &[u8], + _: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _: &[u8], + _: &rustls::pki_types::CertificateDer<'_>, + _: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _: &[u8], + _: &rustls::pki_types::CertificateDer<'_>, + _: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + rustls::crypto::aws_lc_rs::default_provider() + .signature_verification_algorithms + .supported_schemes() + } +} + +fn build_http_request( + method: Method, + url: &Url, + headers: HeaderMap, + body: BoxBody, + content_encoding: Option, +) -> Result, Error> { + let uri: http::Uri = url + .as_str() + .parse() + .map_err(|e: http::uri::InvalidUri| Error::Http(e.into()))?; + + let mut builder = http::Request::builder().method(method).uri(uri); + + if let Some(req_headers) = builder.headers_mut() { + for (key, value) in headers { + if let Some(key) = key { + req_headers.insert(key, value); + } + } + if let Some(encoding) = content_encoding { + req_headers.insert(CONTENT_ENCODING, encoding); + } + } + + Ok(builder.body(body)?) +} + +async fn execute_unary_with( + client: &HyperClient, + request: Request, +) -> Result +where + C: Connect + Clone + Send + Sync + 'static, +{ + let request_timeout = request.timeout; + + let (body, content_encoding) = compress_body(request.body, request.compression).await?; + + let http_request = build_http_request( + request.method, + &request.url, + request.headers, + body.into_http_body(), + content_encoding, + )?; + + let operation = async { + let response = client.request(http_request).await?; + let (parts, body) = response.into_parts(); + let bytes = body.collect().await?.to_bytes(); + + Ok::<_, Error>((parts.status, parts.headers, bytes)) + }; + + let (status, headers, bytes) = if let Some(timeout_duration) = request_timeout { + timeout(timeout_duration, operation) + .await + .map_err(|_| Error::Timeout)?? + } else { + operation.await? + }; + + let bytes = decompress_body(&headers, bytes).await?; + + Ok(UnaryResponse { + status, + headers, + bytes, + }) +} + +async fn init_streaming_with( + client: &HyperClient, + request: Request, + permit: RequestPermit, +) -> Result +where + C: Connect + Clone + Send + Sync + 'static, +{ + let request_timeout = request.timeout; + + let http_request = build_http_request( + request.method, + &request.url, + request.headers, + request.body.into_http_body(), + None, + )?; + + let operation = async { + let response = client.request(http_request).await?; + let (parts, body) = response.into_parts(); + + Ok::<_, Error>(StreamingResponse::new( + parts.status, + parts.headers, + body, + permit, + )) + }; + + if let Some(duration) = request_timeout { + timeout(duration, operation) + .await + .map_err(|_| Error::Timeout)? + } else { + operation.await + } +} + +async fn compress_body( + body: Body, + compression: Compression, +) -> Result<(Body, Option), Error> { + match compression { + Compression::None => Ok((body, None)), + Compression::Gzip => { + let Some(data) = body.as_bytes() else { + return Err(Error::Compression( + "streaming request bodies cannot be compressed".into(), + )); + }; + let mut encoder = GzipEncoder::with_quality(Vec::new(), Level::Fastest); + encoder + .write_all(data) + .await + .map_err(|e| Error::Compression(e.to_string()))?; + encoder + .shutdown() + .await + .map_err(|e| Error::Compression(e.to_string()))?; + let compressed = encoder.into_inner(); + Ok(( + Body::from(compressed), + Some(HeaderValue::from_static("gzip")), + )) + } + Compression::Zstd => { + let Some(data) = body.as_bytes() else { + return Err(Error::Compression( + "streaming request bodies cannot be compressed".into(), + )); + }; + let mut encoder = ZstdEncoder::with_quality(Vec::new(), Level::Fastest); + encoder + .write_all(data) + .await + .map_err(|e| Error::Compression(e.to_string()))?; + encoder + .shutdown() + .await + .map_err(|e| Error::Compression(e.to_string()))?; + let compressed = encoder.into_inner(); + Ok(( + Body::from(compressed), + Some(HeaderValue::from_static("zstd")), + )) + } + } +} + +async fn decompress_body(headers: &HeaderMap, bytes: Bytes) -> Result { + let content_encoding = headers.get(CONTENT_ENCODING).and_then(|v| v.to_str().ok()); + + match content_encoding { + Some("gzip") => { + let mut decoder = GzipDecoder::new(bytes.as_ref()); + let mut decompressed = Vec::new(); + decoder + .read_to_end(&mut decompressed) + .await + .map_err(|e| Error::Compression(e.to_string()))?; + Ok(Bytes::from(decompressed)) + } + Some("zstd") => { + let mut decoder = ZstdDecoder::new(bytes.as_ref()); + let mut decompressed = Vec::new(); + decoder + .read_to_end(&mut decompressed) + .await + .map_err(|e| Error::Compression(e.to_string()))?; + Ok(Bytes::from(decompressed)) + } + _ => Ok(bytes), + } +} + +struct RequestPermit { + active_requests: Arc, + idle_since: Arc>>, +} + +impl Drop for RequestPermit { + fn drop(&mut self) { + let prev = self.active_requests.fetch_sub(1, Ordering::Relaxed); + if prev == 1 { + *self.idle_since.lock().unwrap() = Some(Instant::now()); + } + } +} + +struct PooledClient { + client: Arc>, + active_requests: Arc, + idle_since: Arc>>, +} + +impl PooledClient { + fn new(client: HyperClient) -> Self { + Self { + client: Arc::new(client), + active_requests: Arc::new(AtomicUsize::new(0)), + idle_since: Arc::new(Mutex::new(Some(Instant::now()))), + } + } + + fn request_permit(&self) -> Option { + self.active_requests + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |ar| { + (ar < MAX_CONCURRENT_REQUESTS_PER_CLIENT).then_some(ar + 1) + }) + .ok()?; + *self.idle_since.lock().unwrap() = None; + Some(RequestPermit { + active_requests: self.active_requests.clone(), + idle_since: self.idle_since.clone(), + }) + } + + fn should_reap(&self, idle_timeout: Duration) -> bool { + if self.active_requests.load(Ordering::Relaxed) != 0 { + return false; + } + if let Some(idle_since) = *self.idle_since.lock().unwrap() { + return idle_since.elapsed() > idle_timeout; + } + false + } +} + +struct HostPool { + clients: RwLock>>, + connector: C, +} + +impl HostPool +where + C: Connect + Clone + Send + Sync + 'static, +{ + fn new(connector: C) -> Self { + Self { + clients: RwLock::new(Vec::new()), + connector, + } + } + + fn create_client(&self) -> PooledClient { + let client = HyperClient::builder(TokioExecutor::new()).build(self.connector.clone()); + PooledClient::new(client) + } + + async fn checkout(&self) -> (Arc>, RequestPermit) { + { + let clients = self.clients.read().await; + for pooled in clients.iter() { + if let Some(permit) = pooled.request_permit() { + return (pooled.client.clone(), permit); + } + } + } + let mut clients = self.clients.write().await; + for pooled in clients.iter() { + if let Some(permit) = pooled.request_permit() { + return (pooled.client.clone(), permit); + } + } + let new_client = self.create_client(); + let permit = new_client + .request_permit() + .expect("new client must have a permit"); + let client = new_client.client.clone(); + clients.push(new_client); + (client, permit) + } + + async fn reap_idle_clients(&self) { + self.clients + .write() + .await + .retain(|pooled| !pooled.should_reap(IDLE_TIMEOUT)); + } + + fn is_empty(&self) -> bool { + self.clients + .try_read() + .map(|clients| clients.is_empty()) + .unwrap_or(false) + } +} + +pub struct Pool { + hosts: Arc>>>>, + connector: C, + _reaper: AbortOnDropHandle<()>, +} + +impl Pool +where + C: Connect + Clone + Send + Sync + 'static, +{ + pub fn new(connector: C) -> Self { + let hosts = Arc::new(RwLock::new(HashMap::new())); + + let _reaper = AbortOnDropHandle::new(tokio::spawn({ + let hosts = hosts.clone(); + async move { + let mut interval = tokio::time::interval(REAPER_INTERVAL); + loop { + interval.tick().await; + reap_idle_clients(&hosts).await; + } + } + })); + + Self { + hosts, + connector, + _reaper, + } + } + + async fn get_or_create_host_pool(&self, host: &str) -> Arc> { + { + let hosts = self.hosts.read().await; + if let Some(pool) = hosts.get(host) { + return pool.clone(); + } + } + let mut hosts = self.hosts.write().await; + hosts + .entry(host.to_owned()) + .or_insert_with(|| Arc::new(HostPool::new(self.connector.clone()))) + .clone() + } + + async fn checkout(&self, host: &str) -> (Arc>, RequestPermit) { + self.get_or_create_host_pool(host).await.checkout().await + } +} + +async fn reap_idle_clients( + hosts: &RwLock>>>, +) { + let pools: Vec>> = { + let hosts = hosts.read().await; + hosts.values().cloned().collect() + }; + + for pool in &pools { + pool.reap_idle_clients().await; + } + + hosts.write().await.retain(|_, pool| !pool.is_empty()); +} + +#[async_trait] +impl RequestExecutor for Pool +where + C: Connect + Clone + Send + Sync + 'static, +{ + async fn execute_unary(&self, request: Request) -> Result { + let (client, _permit) = self.checkout(request.authority()).await; + execute_unary_with(&client, request).await + } + + async fn init_streaming(&self, request: Request) -> Result { + let (client, permit) = self.checkout(request.authority()).await; + init_streaming_with(&client, request, permit).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const TEST_HOST: &str = "localhost:8080"; + + fn test_pool() -> Pool { + Pool::new(HttpConnector::new()) + } + + async fn host_client_count(pool: &Pool, host: &str) -> usize { + let hosts = pool.hosts.read().await; + match hosts.get(host) { + Some(pool) => pool.clients.read().await.len(), + None => 0, + } + } + + #[tokio::test] + async fn checkout_within_capacity() { + let pool = test_pool(); + let mut permits = Vec::new(); + for _ in 0..MAX_CONCURRENT_REQUESTS_PER_CLIENT { + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + } + assert_eq!(host_client_count(&pool, TEST_HOST).await, 1); + } + + #[tokio::test] + async fn overflow_creates_new_client() { + let pool = test_pool(); + let mut permits = Vec::new(); + for _ in 0..MAX_CONCURRENT_REQUESTS_PER_CLIENT { + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + } + assert_eq!(host_client_count(&pool, TEST_HOST).await, 1); + + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + assert_eq!(host_client_count(&pool, TEST_HOST).await, 2); + } + + #[tokio::test] + async fn permit_drop_frees_capacity() { + let pool = test_pool(); + let mut permits = Vec::new(); + for _ in 0..MAX_CONCURRENT_REQUESTS_PER_CLIENT { + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + } + permits.pop(); + + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + assert_eq!(host_client_count(&pool, TEST_HOST).await, 1); + } + + #[tokio::test] + async fn reaper_removes_idle_clients() { + let pool = test_pool(); + let mut permits = Vec::new(); + for _ in 0..MAX_CONCURRENT_REQUESTS_PER_CLIENT { + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + } + let (_client, permit) = pool.checkout(TEST_HOST).await; + permits.push(permit); + assert_eq!(host_client_count(&pool, TEST_HOST).await, 2); + + permits.clear(); + { + let hosts = pool.hosts.read().await; + let pool = hosts.get(TEST_HOST).unwrap(); + let clients = pool.clients.read().await; + for pooled in clients.iter() { + *pooled.idle_since.lock().unwrap() = + Some(Instant::now() - IDLE_TIMEOUT - Duration::from_secs(1)); + } + } + + reap_idle_clients(&pool.hosts).await; + assert_eq!(host_client_count(&pool, TEST_HOST).await, 0); + assert!(pool.hosts.read().await.get(TEST_HOST).is_none()); + } + + #[tokio::test] + async fn different_hosts_get_independent_pools() { + let pool = test_pool(); + let host_a = "host-a:443"; + let host_b = "host-b:443"; + + let mut permits_a = Vec::new(); + for _ in 0..MAX_CONCURRENT_REQUESTS_PER_CLIENT { + let (_client, permit) = pool.checkout(host_a).await; + permits_a.push(permit); + } + assert_eq!(host_client_count(&pool, host_a).await, 1); + + let (_client, permit_b) = pool.checkout(host_b).await; + assert_eq!(host_client_count(&pool, host_b).await, 1); + assert_eq!(host_client_count(&pool, host_a).await, 1); + + drop(permit_b); + drop(permits_a); + } + + #[tokio::test] + async fn reaper_removes_empty_host_entries() { + let pool = test_pool(); + + let (_client, permit_a) = pool.checkout("host-a:443").await; + let (_client, permit_b) = pool.checkout("host-b:443").await; + assert_eq!(pool.hosts.read().await.len(), 2); + + drop(permit_a); + { + let hosts = pool.hosts.read().await; + let pool_a = hosts.get("host-a:443").unwrap(); + let clients = pool_a.clients.read().await; + for pooled in clients.iter() { + *pooled.idle_since.lock().unwrap() = + Some(Instant::now() - IDLE_TIMEOUT - Duration::from_secs(1)); + } + } + + reap_idle_clients(&pool.hosts).await; + + let hosts = pool.hosts.read().await; + assert!(hosts.get("host-a:443").is_none()); + assert!(hosts.get("host-b:443").is_some()); + + drop(permit_b); + } +} diff --git a/src/lib.rs b/src/lib.rs index 35a8113..78d8ad0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ issue. #[rustfmt::skip] mod api; +mod client; mod session; pub mod batching; diff --git a/src/ops.rs b/src/ops.rs index 1a8774c..34dba63 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -1,7 +1,9 @@ use futures::StreamExt; +#[cfg(feature = "_hidden")] +use crate::client::Connect; use crate::{ - api::{AccountClient, BasinClient}, + api::{AccountClient, BaseClient, BasinClient}, producer::{Producer, ProducerConfig}, session::{self, AppendSession, AppendSessionConfig}, types::{ @@ -24,8 +26,21 @@ pub struct S2 { impl S2 { /// Create a new [`S2`]. pub fn new(config: S2Config) -> Result { + let base_client = BaseClient::init(&config)?; Ok(Self { - client: AccountClient::init(config)?, + client: AccountClient::init(config, base_client), + }) + } + + #[doc(hidden)] + #[cfg(feature = "_hidden")] + pub fn new_with_connector(config: S2Config, connector: C) -> Result + where + C: Connect + Clone + Send + Sync + 'static, + { + let base_client = BaseClient::init_with_connector(&config, connector)?; + Ok(Self { + client: AccountClient::init(config, base_client), }) } diff --git a/src/retry.rs b/src/retry.rs index 70faa31..4ec25ee 100644 --- a/src/retry.rs +++ b/src/retry.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use rand::{Rng, rng}; +use rand::{RngExt, rng}; #[derive(Debug, Clone, Copy)] pub struct RetryBackoffBuilder { diff --git a/src/types.rs b/src/types.rs index d535762..0a360f8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -16,7 +16,7 @@ use http::{ header::HeaderValue, uri::{Authority, Scheme}, }; -use rand::Rng; +use rand::RngExt; use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm}; pub use s2_common::caps::RECORD_BATCH_MAX; /// Validation error. @@ -2426,7 +2426,7 @@ pub struct ListStreamsInput { /// /// Defaults to `""`. pub start_after: StreamNameStartAfter, - /// Number of streams to return in a page. Will be clamped to a maximum of `1000`. + /// Number of streams to return in a page. Will be clamped to a maximum of `1000`. /// /// Defaults to `1000`. pub limit: Option, @@ -3218,8 +3218,12 @@ impl ReadLimits { /// When to stop reading. pub struct ReadStop { /// Limits on how much to read. + /// + /// See [`ReadLimits`] for defaults. pub limits: ReadLimits, /// Timestamp at which to stop (exclusive). + /// + /// Defaults to `None`. pub until: Option>, /// Duration in seconds to wait for new records before stopping. Will be clamped to `60` /// seconds for [`read`](crate::S2Stream::read). @@ -3278,8 +3282,12 @@ impl From for api::stream::ReadEnd { /// operations. pub struct ReadInput { /// Where to start reading. + /// + /// See [`ReadStart`] for defaults. pub start: ReadStart, /// When to stop reading. + /// + /// See [`ReadStop`] for defaults. pub stop: ReadStop, /// Whether to filter out command records from the stream when reading. ///