diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index ff9aeba93..50157ad4f 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -107,6 +107,8 @@ oxen push origin main # Push to remote - When calling `get_staged_db_manager`, follow the doc comment on that function: drop the returned `StagedDBManager` as soon as possible (via a block scope or explicit `drop()`) to avoid holding the shared database handle longer than necessary. - When altering the `OxenError` enum, consider whether a hint needs to be added or updated in the `hint` method. - After changing any Rust code, verify that tests pass with the `bin/test-rust` script (not `cargo`). The script is documented in a comment at the top of its file. +- Prefer using inline code over creating a new function when the function would only be called once and the function body would be less than 15 lines. +- Preserve comments whenever possible. Comments that were written by someone other than Claude should always be preserved or updated if possible. - The Python project calls into the Rust project. Whenever changing the Rust code, check to see if the Python code needs to be updated. - After changing any Rust or Python code, verify that Rust tests pass with `bin/test-rust` and Python tests pass with `bin/test-rust -p` diff --git a/Cargo.lock b/Cargo.lock index f3abd3999..98aff3a4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ dependencies = [ "derive_more 2.1.1", "encoding_rs", "flate2", - "foldhash", + "foldhash 0.1.5", "futures-core", "h2 0.3.27", "http 0.2.12", @@ -219,7 +219,7 @@ dependencies = [ "cookie", "derive_more 2.1.1", "encoding_rs", - "foldhash", + "foldhash 0.1.5", "futures-core", "futures-util", "impl-more", @@ -1078,9 +1078,9 @@ dependencies = [ [[package]] name = "aws-config" -version = "1.8.13" +version = "1.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c456581cb3c77fafcc8c67204a70680d40b61112d6da78c77bd31d945b65f1b5" +checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1088,8 +1088,8 @@ dependencies = [ "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1098,7 +1098,7 @@ dependencies = [ "fastrand", "hex", "http 1.4.0", - "ring 0.17.14", + "sha1", "time", "tokio", "tracing", @@ -1108,9 +1108,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.11" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +checksum = "8f20799b373a1be121fe3005fba0c2090af9411573878f224df44b42727fcaf7" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -1142,20 +1142,21 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.6.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c635c2dc792cb4a11ce1a4f392a925340d1bdf499289b5ec1ec6810954eb43f5" +checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", - "aws-smithy-http 0.63.3", + "aws-smithy-http", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", "bytes", + "bytes-utils", "fastrand", "http 0.2.12", "http 1.4.0", @@ -1169,9 +1170,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.119.0" +version = "1.127.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c" +checksum = "151783f64e0dcddeb4965d08e36c276b4400a46caa88805a2e36d497deaf031a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1179,8 +1180,9 @@ dependencies = [ "aws-smithy-async", "aws-smithy-checksums", "aws-smithy-eventstream", - "aws-smithy-http 0.62.6", - "aws-smithy-json 0.61.9", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1192,8 +1194,8 @@ dependencies = [ "hmac", "http 0.2.12", "http 1.4.0", - "http-body 0.4.6", - "lru 0.12.5", + "http-body 1.0.1", + "lru 0.16.3", "percent-encoding", "regex-lite", "sha2", @@ -1203,15 +1205,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.93.0" +version = "1.97.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcb38bb33fc0a11f1ffc3e3e85669e0a11a37690b86f77e75306d8f369146a0" +checksum = "9aadc669e184501caaa6beafb28c6267fc1baef0810fb58f9b205485ca3f2567" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -1227,15 +1229,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.95.0" +version = "1.99.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ada8ffbea7bd1be1f53df1dadb0f8fdb04badb13185b3321b929d1ee3caad09" +checksum = "1342a7db8f358d3de0aed2007a0b54e875458e39848d54cc1d46700b2bfcb0a8" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -1251,15 +1253,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.97.0" +version = "1.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6443ccadc777095d5ed13e21f5c364878c9f5bad4e35187a6cdbd863b0afcad" +checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-observability", "aws-smithy-query", "aws-smithy-runtime", @@ -1276,13 +1278,13 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.8" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa49f3c607b92daae0c078d48a4571f599f966dce3caee5f1ea55c4d9073f99" +checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", - "aws-smithy-http 0.63.3", + "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -1304,9 +1306,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.11" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" +checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" dependencies = [ "futures-util", "pin-project-lite", @@ -1315,17 +1317,18 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.63.12" +version = "0.64.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87294a084b43d649d967efe58aa1f9e0adc260e13a6938eb904c0ae9b45824ae" +checksum = "6750f3dd509b0694a4377f0293ed2f9630d710b1cebe281fa8bac8f099f88bc6" dependencies = [ - "aws-smithy-http 0.62.6", + "aws-smithy-http", "aws-smithy-types", "bytes", "crc-fast", "hex", - "http 0.2.12", - "http-body 0.4.6", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", "md-5", "pin-project-lite", "sha1", @@ -1335,9 +1338,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.18" +version = "0.60.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b9c7354a3b13c66f60fe4616d6d1969c9fd36b1b5333a5dfb3ee716b33c588" +checksum = "faf09d74e5e32f76b8762da505a3cd59303e367a664ca67295387baa8c1d7548" dependencies = [ "aws-smithy-types", "bytes", @@ -1346,32 +1349,11 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.6" +version = "0.63.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b" +checksum = "ba1ab2dc1c2c3749ead27180d333c42f11be8b0e934058fb4b2258ee8dbe5231" dependencies = [ "aws-smithy-eventstream", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes", - "bytes-utils", - "futures-core", - "futures-util", - "http 0.2.12", - "http 1.4.0", - "http-body 0.4.6", - "percent-encoding", - "pin-project-lite", - "pin-utils", - "tracing", -] - -[[package]] -name = "aws-smithy-http" -version = "0.63.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" -dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -1389,9 +1371,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.9" +version = "1.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" +checksum = "6a2f165a7feee6f263028b899d0a181987f4fa7179a6411a32a439fba7c5f769" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -1419,36 +1401,27 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" -dependencies = [ - "aws-smithy-types", -] - -[[package]] -name = "aws-smithy-json" -version = "0.62.3" +version = "0.62.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb96aa208d62ee94104645f7b2ecaf77bf27edf161590b6224bfbac2832f979" +checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-observability" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" +checksum = "a06c2315d173edbf1920da8ba3a7189695827002e4c0fc961973ab1c54abca9c" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.13" +version = "0.60.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" +checksum = "1a56d79744fb3edb5d722ef79d86081e121d3b9422cb209eb03aea6aa4f21ebd" dependencies = [ "aws-smithy-types", "urlencoding", @@ -1456,12 +1429,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.0" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" +checksum = "028999056d2d2fd58a697232f9eec4a643cf73a71cf327690a7edad1d2af2110" dependencies = [ "aws-smithy-async", - "aws-smithy-http 0.63.3", + "aws-smithy-http", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", @@ -1481,9 +1454,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.3" +version = "1.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" +checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1498,9 +1471,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.3" +version = "1.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" +checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" dependencies = [ "base64-simd", "bytes", @@ -1524,18 +1497,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.13" +version = "0.60.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +checksum = "0ce02add1aa3677d022f8adf81dcbe3046a95f17a1b1e8979c145cd21d3d22b3" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.3.11" +version = "1.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -2404,9 +2377,9 @@ dependencies = [ [[package]] name = "crc" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" dependencies = [ "crc-catalog", ] @@ -2419,15 +2392,14 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" [[package]] name = "crc-fast" -version = "1.6.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3" +checksum = "2fd92aca2c6001b1bf5ba0ff84ee74ec8501b52bbef0cac80bf25a6c1d87a83d" dependencies = [ "crc", "digest", - "rand 0.9.2", - "regex", "rustversion", + "spin 0.10.0", ] [[package]] @@ -3261,6 +3233,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -3590,7 +3568,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", "rayon", "serde", ] @@ -3600,6 +3578,11 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "hashlink" @@ -4448,6 +4431,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "aws-smithy-runtime-api", "bincode", "bytecount", "bytes", @@ -4670,20 +4654,20 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.5" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +checksum = "9f8cc7106155f10bdf99a6f379688f543ad6596a415375b36a59a054ceda1198" dependencies = [ "hashbrown 0.15.5", ] [[package]] name = "lru" -version = "0.14.0" +version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f8cc7106155f10bdf99a6f379688f543ad6596a415375b36a59a054ceda1198" +checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" dependencies = [ - "hashbrown 0.15.5", + "hashbrown 0.16.1", ] [[package]] @@ -6063,7 +6047,7 @@ dependencies = [ "bytes", "compact_str", "flate2", - "foldhash", + "foldhash 0.1.5", "hashbrown 0.15.5", "indexmap 2.13.0", "libc", @@ -7652,6 +7636,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" + [[package]] name = "spki" version = "0.6.0" diff --git a/Cargo.toml b/Cargo.toml index ff473a9b9..c1214465c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,8 +40,9 @@ async-tar = "0.5.0" async-tempfile = "0.7.0" async-trait = "0.1.80" async_zip = { version = "0.0.18", features = ["full"] } -aws-config = "1.8.11" -aws-sdk-s3 = "1.118.0" +aws-config = "1.8.15" +aws-sdk-s3 = "1.127.0" +aws-smithy-runtime-api = "1.11.6" bincode = "1.3.3" bytecount = "0.6.3" bytes = "1.5.0" diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml index 18fe71850..41508c89c 100644 --- a/crates/lib/Cargo.toml +++ b/crates/lib/Cargo.toml @@ -40,6 +40,7 @@ async-tempfile = { workspace = true } async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-s3 = { workspace = true } +aws-smithy-runtime-api = { workspace = true } bincode = { workspace = true } bytecount = { workspace = true } bytes = { workspace = true } diff --git a/crates/lib/src/api/client/versions.rs b/crates/lib/src/api/client/versions.rs index 1eee5b482..93c8cd7b0 100644 --- a/crates/lib/src/api/client/versions.rs +++ b/crates/lib/src/api/client/versions.rs @@ -264,7 +264,7 @@ pub async fn try_download_data_from_version_paths( // Iterate over archive entries and stream them to version store let mut entries = archive.entries()?; while let Some(file) = entries.next().await { - let mut file = match file { + let file = match file { Ok(file) => file, Err(err) => { let err = format!("Could not unwrap file -> {err:?}"); @@ -284,7 +284,7 @@ pub async fn try_download_data_from_version_paths( // Stream the file content directly to version store without loading into memory match version_store - .store_version_from_reader(&file_hash, &mut file) + .store_version_from_reader(&file_hash, Box::new(file), file_size) .await { Ok(_) => { diff --git a/crates/lib/src/core/v_latest/workspaces/commit.rs b/crates/lib/src/core/v_latest/workspaces/commit.rs index 4a142ffe9..3f73a48cb 100644 --- a/crates/lib/src/core/v_latest/workspaces/commit.rs +++ b/crates/lib/src/core/v_latest/workspaces/commit.rs @@ -361,11 +361,12 @@ async fn compute_staged_merkle_tree_node( // Copy file to the version store log::debug!("compute_staged_merkle_tree_node writing file to version store"); + let file_size = tokio::fs::metadata(path).await?.len(); let file = File::open(path).await?; - let mut reader = BufReader::new(file); + let reader = BufReader::new(file); let version_store = workspace.base_repo.version_store()?; version_store - .store_version_from_reader(&hash.to_string(), &mut reader) + .store_version_from_reader(&hash.to_string(), Box::new(reader), file_size) .await?; let file_extension = path.extension().unwrap_or_default().to_string_lossy(); diff --git a/crates/lib/src/error.rs b/crates/lib/src/error.rs index ca3de92a3..e400b54d0 100644 --- a/crates/lib/src/error.rs +++ b/crates/lib/src/error.rs @@ -3,6 +3,8 @@ //! Enumeration for all errors that can occur in the oxen library //! +use aws_smithy_runtime_api::client::orchestrator::HttpResponse; +use aws_smithy_runtime_api::client::result::SdkError; use duckdb::arrow::error::ArrowError; use std::io; use std::num::ParseIntError; @@ -139,6 +141,12 @@ pub enum OxenError { #[error("Invalid version: {0}")] InvalidVersion(StringError), + // Version Store + /// An error uploading a file to the version store + #[error("{0}")] + Upload(StringError), + + // Entry /// A commit entry is not present in the repository. #[error("{0}")] CommitEntryNotFound(StringError), @@ -204,6 +212,10 @@ pub enum OxenError { // Wrappers // // + /// An error encountered dealing with AWS S3 + #[error("AWS S3 error: {0}")] + AwsS3Error(Box, HttpResponse>>), + /// Wraps the error from std::path::strip_prefix. #[error("Error stripping prefix: {0}")] StripPrefixError(#[from] std::path::StripPrefixError), @@ -416,6 +428,11 @@ impl OxenError { OxenError::InvalidVersion(StringError::from(s.as_ref())) } + /// Makes an OxenError::Upload error. + pub fn upload(s: &str) -> Self { + OxenError::Upload(StringError::from(s)) + } + /// Make a new OxenError::OxenUpdateRequired error. pub fn oxen_update_required(s: impl AsRef) -> Self { OxenError::OxenUpdateRequired(StringError::from(s.as_ref())) @@ -431,6 +448,15 @@ impl OxenError { OxenError::ImportFileError(StringError::from(s.as_ref())) } + /// Make a new OxenError::AwsS3Error error. + pub fn aws_s3_error( + e: SdkError, + ) -> Self { + OxenError::AwsS3Error(Box::new(e.map_service_error(|e| { + Box::new(e) as Box + }))) + } + /// Makes a new OxenError::ResourceNotFound error. pub fn resource_not_found(value: impl AsRef) -> Self { OxenError::ResourceNotFound(StringError::from(value.as_ref())) diff --git a/crates/lib/src/storage/local.rs b/crates/lib/src/storage/local.rs index 8d0268af5..3fd43e8c5 100644 --- a/crates/lib/src/storage/local.rs +++ b/crates/lib/src/storage/local.rs @@ -93,7 +93,8 @@ impl VersionStore for LocalVersionStore { async fn store_version_from_reader( &self, hash: &str, - reader: &mut (dyn tokio::io::AsyncRead + Send + Unpin), + mut reader: Box, + _size: u64, ) -> Result<(), OxenError> { let version_dir = self.version_dir(hash); fs::create_dir_all(&version_dir).await?; @@ -102,7 +103,7 @@ impl VersionStore for LocalVersionStore { if !version_path.exists() { let mut file = File::create(&version_path).await?; - tokio::io::copy(reader, &mut file).await?; + tokio::io::copy(&mut *reader, &mut file).await?; } Ok(()) @@ -627,11 +628,11 @@ mod tests { let data = b"test data from reader"; // Create a cursor with the test data - let mut cursor = Cursor::new(data.to_vec()); + let cursor = Cursor::new(data.to_vec()); // Store using the reader store - .store_version_from_reader(hash, &mut cursor) + .store_version_from_reader(hash, Box::new(cursor), data.len() as u64) .await .unwrap(); diff --git a/crates/lib/src/storage/s3.rs b/crates/lib/src/storage/s3.rs index 478adbe2c..6fa73e78f 100644 --- a/crates/lib/src/storage/s3.rs +++ b/crates/lib/src/storage/s3.rs @@ -4,14 +4,16 @@ use async_trait::async_trait; use aws_config::meta::region::RegionProviderChain; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::head_object::HeadObjectError; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use aws_sdk_s3::{Client, config::Region, primitives::ByteStream}; use bytes::Bytes; +use futures::StreamExt; use log; use std::collections::HashMap; use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::OnceCell; use tokio_stream::Stream; @@ -169,15 +171,151 @@ impl VersionStore for S3VersionStore { Ok(()) } + /// Streams file content to S3 without writing to disk. + /// + /// The caller must provide the file size up front. Files up to 100 MB are uploaded in a + /// single PUT request (per AWS best-practice guidelines). Larger files are uploaded via + /// multipart upload with a dynamically chosen part size and up to 16 concurrent part uploads. + /// If any part fails, the multipart upload is cancelled so no orphaned parts are left behind. async fn store_version_from_reader( &self, - _hash: &str, - _reader: &mut (dyn tokio::io::AsyncRead + Send + Unpin), + hash: &str, + reader: Box, + size: u64, ) -> Result<(), OxenError> { - // TODO: Implement S3 version storage from reader - Err(OxenError::basic_str( - "S3VersionStore store_version_from_reader not yet implemented", - )) + let client = self.init_client().await?; + let key = self.generate_key(hash); + + const ONESHOT_SIZE: u64 = 100 * 1024 * 1024; // 100 MB + const MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5 MB, S3 minimum + const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5 GB, S3 maximum + const MAX_PARTS: usize = 10_000; // S3 maximum parts per upload + const MAX_CONCURRENT_UPLOADS: usize = 16; + + let mut reader = tokio::io::BufReader::new(reader); + + // Files up to 100 MB: single put_object + if size <= ONESHOT_SIZE { + let mut buf = Vec::with_capacity(size as usize); + tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf) + .await + .map_err(|e| OxenError::upload(&format!("Failed to read: {e}")))?; + client + .put_object() // AWS recommends switching to multipart uploads for > 100 MB + .bucket(&self.bucket) + .key(&key) + .body(ByteStream::from(buf)) + .send() + .await + .map_err(OxenError::aws_s3_error)?; + return Ok(()); + } + + // Scale part size to fit within S3's 10,000 part limit for the file size. + let part_size = ((size as usize).div_ceil(MAX_PARTS)).clamp(MIN_PART_SIZE, MAX_PART_SIZE); + + // Large file: multipart upload + let upload = client + .create_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .send() + .await + .map_err(OxenError::aws_s3_error)?; + + let upload_id = upload + .upload_id() + .ok_or_else(|| OxenError::upload("S3 multipart upload missing upload_id"))? + .to_string(); + + // Read parts sequentially and upload them concurrently via spawned tasks. Each upload + // starts running immediately on the tokio runtime, so uploads proceed in the background + // while we read the next part. FuturesUnordered holds the JoinHandles for collecting + // results and enforcing backpressure. + let mut uploads = futures::stream::FuturesUnordered::new(); + let mut completed_parts = Vec::new(); + let mut part_num = 1; + + let result: Result<(), OxenError> = async { + loop { + let mut buf = vec![0u8; part_size]; + let n = read_full(&mut reader, &mut buf).await?; + if n == 0 { + break; + } + buf.truncate(n); + uploads.push(tokio::spawn(upload_part( + client.clone(), + self.bucket.clone(), + key.clone(), + upload_id.clone(), + part_num, + buf, + ))); + part_num += 1; + + // Stop reading until we have less than MAX_CONCURRENT_UPLOADS in flight + while uploads.len() >= MAX_CONCURRENT_UPLOADS { + match uploads.next().await { + Some(Ok(result)) => completed_parts.push(result?), + Some(Err(e)) => { + return Err(OxenError::upload(&format!("Upload task panicked: {e}"))); + } + None => break, // Shouldn't be possible since we check uploads.len() first + } + } + } + + // Wait for remaining uploads + while let Some(join_result) = uploads.next().await { + match join_result { + Ok(result) => completed_parts.push(result?), + Err(e) => return Err(OxenError::upload(&format!("Upload task panicked: {e}"))), + } + } + Ok(()) + } + .await; + + match result { + // All parts uploaded successfully + Ok(()) => { + // Complete the multipart upload with the required special request + completed_parts.sort_by_key(|p| p.part_number); + + let completed = CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(); + + client + .complete_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .upload_id(&upload_id) + .multipart_upload(completed) + .send() + .await + .map_err(OxenError::aws_s3_error)?; + Ok(()) + } + // Upload failed + Err(e) => { + // cancel in-flight tasks + for handle in uploads.iter() { + // we don't need to await aborted handles--they stop at the next await point automatically + handle.abort(); + } + // abort the multipart upload + let _ = client + .abort_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .upload_id(&upload_id) + .send() + .await; + Err(e) + } + } } async fn store_version(&self, hash: &str, data: &[u8]) -> Result<(), OxenError> { @@ -500,6 +638,58 @@ impl VersionStore for S3VersionStore { } } +/// Uploads a single part in an ongoing multipart S3 upload operation. +async fn upload_part( + client: Arc, + bucket: String, + key: String, + upload_id: String, + part_num: i32, + data: Vec, +) -> Result { + let resp = client + .upload_part() + .bucket(bucket) + .key(key) + .upload_id(upload_id) + .part_number(part_num) + .body(ByteStream::from(data)) + .send() + .await + .map_err(OxenError::aws_s3_error)?; + + let etag = resp + .e_tag() + .map(|s| s.to_string()) + .ok_or_else(|| OxenError::upload("S3 upload_part response missing ETag"))?; + + Ok(CompletedPart::builder() + .part_number(part_num) + .e_tag(etag) + .build()) +} + +/// Read from `reader` until `buf` is full or EOF, returning the number of +/// bytes read. Unlike a single `read()` call this won't return a short read +/// unless EOF is reached. +async fn read_full( + reader: &mut (dyn tokio::io::AsyncRead + Send + Unpin), + buf: &mut [u8], +) -> Result { + let mut offset = 0; + while offset < buf.len() { + let n = reader + .read(&mut buf[offset..]) + .await + .map_err(|e| OxenError::upload(&format!("Failed to read from reader: {e}")))?; + if n == 0 { + break; + } + offset += n; + } + Ok(offset) +} + use std::io; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/crates/lib/src/storage/version_store.rs b/crates/lib/src/storage/version_store.rs index 67ad48d61..e9781e70f 100644 --- a/crates/lib/src/storage/version_store.rs +++ b/crates/lib/src/storage/version_store.rs @@ -102,11 +102,13 @@ pub trait VersionStore: Debug + Send + Sync + 'static { /// /// # Arguments /// * `hash` - The content hash that identifies this version - /// * `reader` - Any type that implements Read trait + /// * `reader` - An owned async reader + /// * `size` - Total size in bytes, used to tune upload chunk sizes async fn store_version_from_reader( &self, hash: &str, - reader: &mut (dyn AsyncRead + Send + Unpin), + reader: Box, + size: u64, ) -> Result<(), OxenError>; /// Store a version file from bytes diff --git a/crates/server/src/controllers/commits.rs b/crates/server/src/controllers/commits.rs index 07f9c7666..90ef23686 100644 --- a/crates/server/src/controllers/commits.rs +++ b/crates/server/src/controllers/commits.rs @@ -990,7 +990,7 @@ async fn unpack_compressed_data( } // Unpack tarball to our hidden dir using async streaming - unpack_entry_tarball_async(repo, &buffer).await?; + unpack_entry_tarball_async(repo, buffer).await?; Ok(()) } @@ -1025,7 +1025,7 @@ pub async fn upload( let repo = get_repo(&app_data.path, &namespace, &name)?; // Read bytes from body - let mut bytes = web::BytesMut::new(); + let mut bytes = Vec::new(); while let Some(item) = body.next().await { bytes.extend_from_slice(&item.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?); } @@ -1039,8 +1039,6 @@ pub async fn upload( ByteSize::b(total_size) ); - // Unpack in background thread because could take awhile - // std::thread::spawn(move || { // Get tar.gz bytes for history/COMMIT_ID data log::debug!( "Decompressing {} bytes to repo at {}", @@ -1048,8 +1046,7 @@ pub async fn upload( repo.path.display() ); // Unpack tarball to repo using async streaming - unpack_entry_tarball_async(&repo, &bytes).await?; - // }); + unpack_entry_tarball_async(&repo, bytes).await?; Ok(HttpResponse::Ok().json(StatusMessage::resource_created())) } @@ -1245,7 +1242,7 @@ async fn unpack_tree_tarball(tmp_dir: &Path, data: &[u8]) -> Result<(), OxenErro async fn unpack_entry_tarball_async( repo: &LocalRepository, - compressed_data: &[u8], + compressed_data: Vec, ) -> Result<(), OxenError> { let hidden_dir = util::fs::oxen_hidden_dir(&repo.path); let version_store = repo.version_store()?; @@ -1267,13 +1264,10 @@ async fn unpack_entry_tarball_async( if path.starts_with("versions") && path.to_string_lossy().contains("files") { // Handle version files with streaming let hash = extract_hash_from_path(&path)?; + let entry_size = file.header().size()?; - // Convert futures::io::AsyncRead to tokio::io::AsyncRead using compat - // let mut tokio_reader = file.compat(); - - // Use streaming storage - no memory buffering needed! version_store - .store_version_from_reader(&hash, &mut file) + .store_version_from_reader(&hash, Box::new(file), entry_size) .await?; } else { // For non-version files, unpack to hidden dir diff --git a/crates/server/src/controllers/versions.rs b/crates/server/src/controllers/versions.rs index 565dc7f95..0488f91a1 100644 --- a/crates/server/src/controllers/versions.rs +++ b/crates/server/src/controllers/versions.rs @@ -541,7 +541,8 @@ pub async fn save_multiparts( field_bytes.extend_from_slice(&chunk); } - let mut reader: Box = if is_gzipped { + let field_size = field_bytes.len() as u64; + let reader: Box = if is_gzipped { // async decompression let cursor = std::io::Cursor::new(field_bytes); let buf_reader = BufReader::new(cursor); @@ -552,7 +553,7 @@ pub async fn save_multiparts( }; match version_store - .store_version_from_reader(&upload_filehash, &mut reader) + .store_version_from_reader(&upload_filehash, reader, field_size) .await { Ok(_) => {