diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index ffcf3dc7d..38560f93b 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -112,6 +112,7 @@ oxen push origin main # Push to remote - The Python project calls into the Rust project. Whenever changing the Rust code, check to see if the Python code needs to be updated. - After changing any Rust or Python code, verify that Rust tests pass with `bin/test-rust` and Python tests pass with `bin/test-rust -p` - When updating a dependency, prefer updating to the latest stable version. +- Any new or changed Rust code that touches IO (file system, network, etc.) should be async code. Instead of std::io or std::fs, use equivalents from tokio. When an external dependency doesn't support async, use tokio's spawn_blocking functionality. # Testing Rules - Use the test helpers in `crates/lib/src/test.rs` (e.g., `run_empty_local_repo_test`) for unit tests in the lib code. diff --git a/TODO.md b/TODO.md index cc4f05785..70126d2da 100644 --- a/TODO.md +++ b/TODO.md @@ -35,4 +35,10 @@ * Storage Backends * Local Backend * S3 Backend - * This would be ridiculous # of files if chunking is turned on... \ No newline at end of file + * This would be ridiculous # of files if chunking is turned on... + +# Large File Support + +- First phase: S3 backend -- gives us room on the server to store files +- Second phase: Stream version files from disk -- allows manipulating files that don't fit in memory +- Third phase: Move to chunked version file storage(?). We should discuss the pros/cons of this approach. See [Merkle Tree Refactor](#merkle-tree-refactor) above. 
diff --git a/crates/lib/src/api/client/entries.rs b/crates/lib/src/api/client/entries.rs index d39023337..4a859cc20 100644 --- a/crates/lib/src/api/client/entries.rs +++ b/crates/lib/src/api/client/entries.rs @@ -239,12 +239,13 @@ pub async fn download_entries_to_repo( // Save contents to version store let version_store = local_repo.version_store()?; - let file = std::fs::read(local_path).map_err(|e| { - OxenError::basic_str(format!("Failed to read file '{remote_path:?}': {e}")) - })?; - let hash = util::hasher::hash_buffer(&file); + + let file_bytes = tokio::fs::read(local_path).await?; + let hash = util::hasher::hash_buffer(&file_bytes); + let size = file_bytes.len() as u64; + let cursor = std::io::Cursor::new(file_bytes); version_store - .store_version_from_path(&hash, local_path) + .store_version_from_reader(&hash, Box::new(cursor), size) .await?; } } diff --git a/crates/lib/src/core/v_latest/add.rs b/crates/lib/src/core/v_latest/add.rs index 91cbc4469..fe368abe1 100644 --- a/crates/lib/src/core/v_latest/add.rs +++ b/crates/lib/src/core/v_latest/add.rs @@ -494,10 +494,14 @@ pub async fn process_add_dir( &conflicts, ) { Ok(Some(node)) => { + let file = tokio::fs::File::open(&path).await?; + let size = file.metadata().await?.len(); + let reader = tokio::io::BufReader::new(file); version_store - .store_version_from_path( + .store_version_from_reader( &file_status.hash.to_string(), - &path, + Box::new(reader), + size, ) .await?; @@ -657,8 +661,11 @@ async fn add_file_inner( let file_name = path.file_name().unwrap_or_default().to_string_lossy(); let file_status = determine_file_status(&maybe_dir_node, &file_name, path)?; + let file = tokio::fs::File::open(path).await?; + let size = file.metadata().await?.len(); + let reader = tokio::io::BufReader::new(file); version_store - .store_version_from_path(&file_status.hash.to_string(), path) + .store_version_from_reader(&file_status.hash.to_string(), Box::new(reader), size) .await?; let seen_dirs = 
Arc::new(Mutex::new(HashSet::new())); diff --git a/crates/lib/src/core/v_latest/branches.rs b/crates/lib/src/core/v_latest/branches.rs index fd55c41ab..dc5781eeb 100644 --- a/crates/lib/src/core/v_latest/branches.rs +++ b/crates/lib/src/core/v_latest/branches.rs @@ -416,8 +416,11 @@ async fn cleanup_removed_files( let version_store = repo.version_store()?; for (hash, full_path) in files_to_store { log::debug!("Storing hash {hash:?} and path {full_path:?}"); + let file = tokio::fs::File::open(&full_path).await?; + let size = file.metadata().await?.len(); + let reader = tokio::io::BufReader::new(file); version_store - .store_version_from_path(&hash.to_string(), &full_path) + .store_version_from_reader(&hash.to_string(), Box::new(reader), size) .await?; } } diff --git a/crates/lib/src/core/v_latest/workspaces/files.rs b/crates/lib/src/core/v_latest/workspaces/files.rs index cda1f0cc3..a42ce98a9 100644 --- a/crates/lib/src/core/v_latest/workspaces/files.rs +++ b/crates/lib/src/core/v_latest/workspaces/files.rs @@ -899,8 +899,11 @@ async fn p_add_file( // Store the file in the version store using the hash as the key let hash_str = file_status.hash.to_string(); + let file = tokio::fs::File::open(&full_path).await?; + let size = file.metadata().await?.len(); + let reader = tokio::io::BufReader::new(file); version_store - .store_version_from_path(&hash_str, &full_path) + .store_version_from_reader(&hash_str, Box::new(reader), size) .await?; let conflicts: HashSet = repositories::merge::list_conflicts(workspace_repo)? 
.into_iter() diff --git a/crates/lib/src/repositories/remote_mode/checkout.rs b/crates/lib/src/repositories/remote_mode/checkout.rs index a19e19c69..a6431f361 100644 --- a/crates/lib/src/repositories/remote_mode/checkout.rs +++ b/crates/lib/src/repositories/remote_mode/checkout.rs @@ -55,8 +55,11 @@ pub async fn create_checkout( let full_path = repo.path.join(&path); if full_path.exists() { + let file = tokio::fs::File::open(&full_path).await?; + let size = file.metadata().await?.len(); + let reader = tokio::io::BufReader::new(file); version_store - .store_version_from_path(&node.hash.to_string(), &full_path) + .store_version_from_reader(&node.hash.to_string(), Box::new(reader), size) .await?; } } @@ -448,7 +451,7 @@ mod tests { // Regression test: checkout in remote mode should not fail when a file // exists in the source branch's tree but is not materialized on disk. // Previously, r_remove_if_not_in_target would push non-existent paths - // into files_to_store, causing store_version_from_path to fail with NotFound. + // into files_to_store, causing the attempt to store them in the version store to fail with NotFound. 
#[tokio::test] async fn test_remote_mode_checkout_file_in_tree_but_not_on_disk() -> Result<(), OxenError> { test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move { diff --git a/crates/lib/src/storage/local.rs b/crates/lib/src/storage/local.rs index 3fd43e8c5..41e3857e0 100644 --- a/crates/lib/src/storage/local.rs +++ b/crates/lib/src/storage/local.rs @@ -79,17 +79,6 @@ impl VersionStore for LocalVersionStore { Ok(()) } - async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError> { - let version_dir = self.version_dir(hash); - fs::create_dir_all(&version_dir).await?; - - let version_path = self.version_path(hash); - if !version_path.exists() { - fs::copy(file_path, &version_path).await?; - } - Ok(()) - } - async fn store_version_from_reader( &self, hash: &str, diff --git a/crates/lib/src/storage/s3.rs b/crates/lib/src/storage/s3.rs index de986115c..809210a7e 100644 --- a/crates/lib/src/storage/s3.rs +++ b/crates/lib/src/storage/s3.rs @@ -10,7 +10,6 @@ use bytes::Bytes; use futures::StreamExt; use log; use std::collections::HashMap; -use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -155,33 +154,6 @@ impl VersionStore for S3VersionStore { } } - async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError> { - // get the client - let client = self.init_client().await?; - // get file content from the path - let mut file = std::fs::File::open(file_path).map_err(|e| { - OxenError::basic_str(format!("Failed to open file {}: {e}", file_path.display())) - })?; - - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer).map_err(|e| { - OxenError::basic_str(format!("Failed to read file {}: {e}", file_path.display())) - })?; - - let key = self.generate_key(hash); - let body = ByteStream::from(buffer); - client - .put_object() - .bucket(&self.bucket) - .key(&key) - .body(body) - .send() - .await - 
.map_err(|e| OxenError::basic_str(format!("Failed to store version in S3: {e}")))?; - - Ok(()) - } - /// Streams file content to S3 without writing to disk. /// /// The caller must provide the file size up front. Files up to 100 MB are uploaded in a diff --git a/crates/lib/src/storage/version_store.rs b/crates/lib/src/storage/version_store.rs index e9781e70f..a415809a0 100644 --- a/crates/lib/src/storage/version_store.rs +++ b/crates/lib/src/storage/version_store.rs @@ -91,13 +91,6 @@ pub trait VersionStore: Debug + Send + Sync + 'static { /// Initialize the storage backend async fn init(&self) -> Result<(), OxenError>; - /// Store a version file from a file path - /// - /// # Arguments - /// * `hash` - The content hash that identifies this version - /// * `file_path` - Path to the file to store - async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError>; - /// Store a version file from an async reader /// /// # Arguments