Skip to content

Commit 510937a

Browse files
committed
remove VersionStore::store_version_from_path and its implementations in favor of using VersionStore::store_version_from_reader
1 parent 16d3337 commit 510937a

File tree

10 files changed

+37
-59
lines changed

10 files changed

+37
-59
lines changed

.claude/CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ oxen push origin main # Push to remote
112112
- The Python project calls into the Rust project. Whenever changing the Rust code, check to see if the Python code needs to be updated.
113113
- After changing any Rust or Python code, verify that Rust tests pass with `bin/test-rust` and Python tests pass with `bin/test-rust -p`
114114
- When updating a dependency, prefer updating to the latest stable version.
115+
- Any new or changed Rust code that touches IO (file system, network, etc.) should be async code. Instead of std::io or std::fs, use equivalents from tokio. When an external dependency doesn't support async, use tokio's spawn_blocking functionality.
115116

116117
# Testing Rules
117118
- Use the test helpers in `crates/lib/src/test.rs` (e.g., `run_empty_local_repo_test`) for unit tests in the lib code.

TODO.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,10 @@
3535
* Storage Backends
3636
* Local Backend
3737
* S3 Backend
38-
* This would be ridiculous # of files if chunking is turned on...
38+
* This would be ridiculous # of files if chunking is turned on...
39+
40+
# Large File Support
41+
42+
- First phase: S3 backend -- gives us room on the server to store files
43+
- Second phase: Stream version files from disk -- allows manipulating files that don't fit in memory
44+
- Third phase: Move to chunked version file storage(?). We should discuss the pros/cons of this approach. See [Merkle Tree Refactor](#merkle-tree-refactor) above.

crates/lib/src/api/client/entries.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,13 @@ pub async fn download_entries_to_repo(
239239

240240
// Save contents to version store
241241
let version_store = local_repo.version_store()?;
242-
let file = std::fs::read(local_path).map_err(|e| {
243-
OxenError::basic_str(format!("Failed to read file '{remote_path:?}': {e}"))
244-
})?;
245-
let hash = util::hasher::hash_buffer(&file);
242+
243+
let file_bytes = tokio::fs::read(local_path).await?;
244+
let hash = util::hasher::hash_buffer(&file_bytes);
245+
let size = file_bytes.len() as u64;
246+
let cursor = std::io::Cursor::new(file_bytes);
246247
version_store
247-
.store_version_from_path(&hash, local_path)
248+
.store_version_from_reader(&hash, Box::new(cursor), size)
248249
.await?;
249250
}
250251
}

crates/lib/src/core/v_latest/add.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -494,10 +494,14 @@ pub async fn process_add_dir(
494494
&conflicts,
495495
) {
496496
Ok(Some(node)) => {
497+
let file = tokio::fs::File::open(&path).await?;
498+
let size = file.metadata().await?.len();
499+
let reader = tokio::io::BufReader::new(file);
497500
version_store
498-
.store_version_from_path(
501+
.store_version_from_reader(
499502
&file_status.hash.to_string(),
500-
&path,
503+
Box::new(reader),
504+
size,
501505
)
502506
.await?;
503507

@@ -657,8 +661,11 @@ async fn add_file_inner(
657661

658662
let file_name = path.file_name().unwrap_or_default().to_string_lossy();
659663
let file_status = determine_file_status(&maybe_dir_node, &file_name, path)?;
664+
let file = tokio::fs::File::open(path).await?;
665+
let size = file.metadata().await?.len();
666+
let reader = tokio::io::BufReader::new(file);
660667
version_store
661-
.store_version_from_path(&file_status.hash.to_string(), path)
668+
.store_version_from_reader(&file_status.hash.to_string(), Box::new(reader), size)
662669
.await?;
663670

664671
let seen_dirs = Arc::new(Mutex::new(HashSet::new()));

crates/lib/src/core/v_latest/branches.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,11 @@ async fn cleanup_removed_files(
416416
let version_store = repo.version_store()?;
417417
for (hash, full_path) in files_to_store {
418418
log::debug!("Storing hash {hash:?} and path {full_path:?}");
419+
let file = tokio::fs::File::open(&full_path).await?;
420+
let size = file.metadata().await?.len();
421+
let reader = tokio::io::BufReader::new(file);
419422
version_store
420-
.store_version_from_path(&hash.to_string(), &full_path)
423+
.store_version_from_reader(&hash.to_string(), Box::new(reader), size)
421424
.await?;
422425
}
423426
}

crates/lib/src/core/v_latest/workspaces/files.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -843,8 +843,11 @@ async fn p_add_file(
843843

844844
// Store the file in the version store using the hash as the key
845845
let hash_str = file_status.hash.to_string();
846+
let file = tokio::fs::File::open(&full_path).await?;
847+
let size = file.metadata().await?.len();
848+
let reader = tokio::io::BufReader::new(file);
846849
version_store
847-
.store_version_from_path(&hash_str, &full_path)
850+
.store_version_from_reader(&hash_str, Box::new(reader), size)
848851
.await?;
849852
let conflicts: HashSet<PathBuf> = repositories::merge::list_conflicts(workspace_repo)?
850853
.into_iter()

crates/lib/src/repositories/remote_mode/checkout.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,11 @@ pub async fn create_checkout(
5555
let full_path = repo.path.join(&path);
5656

5757
if full_path.exists() {
58+
let file = tokio::fs::File::open(&full_path).await?;
59+
let size = file.metadata().await?.len();
60+
let reader = tokio::io::BufReader::new(file);
5861
version_store
59-
.store_version_from_path(&node.hash.to_string(), &full_path)
62+
.store_version_from_reader(&node.hash.to_string(), Box::new(reader), size)
6063
.await?;
6164
}
6265
}
@@ -448,7 +451,7 @@ mod tests {
448451
// Regression test: checkout in remote mode should not fail when a file
449452
// exists in the source branch's tree but is not materialized on disk.
450453
// Previously, r_remove_if_not_in_target would push non-existent paths
451-
// into files_to_store, causing store_version_from_path to fail with NotFound.
454+
// into files_to_store, causing store_version_from_reader to fail with NotFound.
452455
#[tokio::test]
453456
async fn test_remote_mode_checkout_file_in_tree_but_not_on_disk() -> Result<(), OxenError> {
454457
test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {

crates/lib/src/storage/local.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,6 @@ impl VersionStore for LocalVersionStore {
7979
Ok(())
8080
}
8181

82-
async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError> {
83-
let version_dir = self.version_dir(hash);
84-
fs::create_dir_all(&version_dir).await?;
85-
86-
let version_path = self.version_path(hash);
87-
if !version_path.exists() {
88-
fs::copy(file_path, &version_path).await?;
89-
}
90-
Ok(())
91-
}
92-
9382
async fn store_version_from_reader(
9483
&self,
9584
hash: &str,

crates/lib/src/storage/s3.rs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use bytes::Bytes;
1010
use futures::StreamExt;
1111
use log;
1212
use std::collections::HashMap;
13-
use std::io::Read;
1413
use std::path::{Path, PathBuf};
1514
use std::sync::Arc;
1615
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -155,33 +154,6 @@ impl VersionStore for S3VersionStore {
155154
}
156155
}
157156

158-
async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError> {
159-
// get the client
160-
let client = self.init_client().await?;
161-
// get file content from the path
162-
let mut file = std::fs::File::open(file_path).map_err(|e| {
163-
OxenError::basic_str(format!("Failed to open file {}: {e}", file_path.display()))
164-
})?;
165-
166-
let mut buffer = Vec::new();
167-
file.read_to_end(&mut buffer).map_err(|e| {
168-
OxenError::basic_str(format!("Failed to read file {}: {e}", file_path.display()))
169-
})?;
170-
171-
let key = self.generate_key(hash);
172-
let body = ByteStream::from(buffer);
173-
client
174-
.put_object()
175-
.bucket(&self.bucket)
176-
.key(&key)
177-
.body(body)
178-
.send()
179-
.await
180-
.map_err(|e| OxenError::basic_str(format!("Failed to store version in S3: {e}")))?;
181-
182-
Ok(())
183-
}
184-
185157
async fn store_version_from_reader(
186158
&self,
187159
hash: &str,

crates/lib/src/storage/version_store.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,6 @@ pub trait VersionStore: Debug + Send + Sync + 'static {
9191
/// Initialize the storage backend
9292
async fn init(&self) -> Result<(), OxenError>;
9393

94-
/// Store a version file from a file path
95-
///
96-
/// # Arguments
97-
/// * `hash` - The content hash that identifies this version
98-
/// * `file_path` - Path to the file to store
99-
async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError>;
100-
10194
/// Store a version file from an async reader
10295
///
10396
/// # Arguments

0 commit comments

Comments
 (0)