Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ oxen push origin main # Push to remote
- The Python project calls into the Rust project. Whenever changing the Rust code, check to see if the Python code needs to be updated.
- After changing any Rust or Python code, verify that Rust tests pass with `bin/test-rust` and Python tests pass with `bin/test-rust -p`
- When updating a dependency, prefer updating to the latest stable version.
- Any new or changed Rust code that performs IO (file system, network, etc.) should be async. Use the tokio equivalents (e.g. `tokio::fs`, `tokio::io`) instead of `std::fs` or `std::io`. When an external dependency doesn't support async, run it via `tokio::task::spawn_blocking`.

# Testing Rules
- Use the test helpers in `crates/lib/src/test.rs` (e.g., `run_empty_local_repo_test`) for unit tests in the lib code.
Expand Down
8 changes: 7 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@
* Storage Backends
* Local Backend
* S3 Backend
* This would be ridiculous # of files if chunking is turned on...
* This would be ridiculous # of files if chunking is turned on...

# Large File Support

- First phase: S3 backend -- gives us room on the server to store files
- Second phase: Stream version files from disk -- allows manipulating files that don't fit in memory
- Third phase: Move to chunked version file storage(?). We should discuss the pros/cons of this approach. See [Merkle Tree Refactor](#merkle-tree-refactor) above.
11 changes: 6 additions & 5 deletions crates/lib/src/api/client/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,13 @@ pub async fn download_entries_to_repo(

// Save contents to version store
let version_store = local_repo.version_store()?;
let file = std::fs::read(local_path).map_err(|e| {
OxenError::basic_str(format!("Failed to read file '{remote_path:?}': {e}"))
})?;
let hash = util::hasher::hash_buffer(&file);

let file_bytes = tokio::fs::read(local_path).await?;
let hash = util::hasher::hash_buffer(&file_bytes);
let size = file_bytes.len() as u64;
let cursor = std::io::Cursor::new(file_bytes);
version_store
.store_version_from_path(&hash, local_path)
.store_version_from_reader(&hash, Box::new(cursor), size)
.await?;
}
}
Expand Down
13 changes: 10 additions & 3 deletions crates/lib/src/core/v_latest/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,14 @@ pub async fn process_add_dir(
&conflicts,
) {
Ok(Some(node)) => {
let file = tokio::fs::File::open(&path).await?;
let size = file.metadata().await?.len();
let reader = tokio::io::BufReader::new(file);
version_store
.store_version_from_path(
.store_version_from_reader(
&file_status.hash.to_string(),
&path,
Box::new(reader),
size,
)
.await?;

Expand Down Expand Up @@ -657,8 +661,11 @@ async fn add_file_inner(

let file_name = path.file_name().unwrap_or_default().to_string_lossy();
let file_status = determine_file_status(&maybe_dir_node, &file_name, path)?;
let file = tokio::fs::File::open(path).await?;
let size = file.metadata().await?.len();
let reader = tokio::io::BufReader::new(file);
version_store
.store_version_from_path(&file_status.hash.to_string(), path)
.store_version_from_reader(&file_status.hash.to_string(), Box::new(reader), size)
.await?;

let seen_dirs = Arc::new(Mutex::new(HashSet::new()));
Expand Down
5 changes: 4 additions & 1 deletion crates/lib/src/core/v_latest/branches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,11 @@ async fn cleanup_removed_files(
let version_store = repo.version_store()?;
for (hash, full_path) in files_to_store {
log::debug!("Storing hash {hash:?} and path {full_path:?}");
let file = tokio::fs::File::open(&full_path).await?;
let size = file.metadata().await?.len();
let reader = tokio::io::BufReader::new(file);
version_store
.store_version_from_path(&hash.to_string(), &full_path)
.store_version_from_reader(&hash.to_string(), Box::new(reader), size)
.await?;
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/lib/src/core/v_latest/workspaces/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,11 @@ async fn p_add_file(

// Store the file in the version store using the hash as the key
let hash_str = file_status.hash.to_string();
let file = tokio::fs::File::open(&full_path).await?;
let size = file.metadata().await?.len();
let reader = tokio::io::BufReader::new(file);
version_store
.store_version_from_path(&hash_str, &full_path)
.store_version_from_reader(&hash_str, Box::new(reader), size)
.await?;
let conflicts: HashSet<PathBuf> = repositories::merge::list_conflicts(workspace_repo)?
.into_iter()
Expand Down
7 changes: 5 additions & 2 deletions crates/lib/src/repositories/remote_mode/checkout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ pub async fn create_checkout(
let full_path = repo.path.join(&path);

if full_path.exists() {
let file = tokio::fs::File::open(&full_path).await?;
let size = file.metadata().await?.len();
let reader = tokio::io::BufReader::new(file);
version_store
.store_version_from_path(&node.hash.to_string(), &full_path)
.store_version_from_reader(&node.hash.to_string(), Box::new(reader), size)
.await?;
}
}
Expand Down Expand Up @@ -448,7 +451,7 @@ mod tests {
// Regression test: checkout in remote mode should not fail when a file
// exists in the source branch's tree but is not materialized on disk.
// Previously, r_remove_if_not_in_target would push non-existent paths
// into files_to_store, causing store_version_from_path to fail with NotFound.
// into files_to_store, causing the file open that precedes store_version_from_reader to fail with NotFound.
#[tokio::test]
async fn test_remote_mode_checkout_file_in_tree_but_not_on_disk() -> Result<(), OxenError> {
test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
Expand Down
11 changes: 0 additions & 11 deletions crates/lib/src/storage/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,6 @@ impl VersionStore for LocalVersionStore {
Ok(())
}

/// Copies the file at `file_path` into this store's location for `hash`.
///
/// The directory for the hash is created first if needed. When a file is
/// already present at the destination path, the copy is skipped and the
/// existing file is left untouched.
///
/// # Arguments
/// * `hash` - Hash used to derive the destination directory and path
/// * `file_path` - Path of the source file to copy
///
/// # Errors
/// Propagates any error from creating the directory or copying the file.
async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError> {
    // The destination directory must exist before the copy can succeed.
    let target_dir = self.version_dir(hash);
    fs::create_dir_all(&target_dir).await?;

    let destination = self.version_path(hash);
    // Already stored under this hash: nothing left to do.
    if destination.exists() {
        return Ok(());
    }

    fs::copy(file_path, &destination).await?;
    Ok(())
}

async fn store_version_from_reader(
&self,
hash: &str,
Expand Down
28 changes: 0 additions & 28 deletions crates/lib/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use bytes::Bytes;
use futures::StreamExt;
use log;
use std::collections::HashMap;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -155,33 +154,6 @@ impl VersionStore for S3VersionStore {
}
}

async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError> {
// get the client
let client = self.init_client().await?;
// get file content from the path
let mut file = std::fs::File::open(file_path).map_err(|e| {
OxenError::basic_str(format!("Failed to open file {}: {e}", file_path.display()))
})?;

let mut buffer = Vec::new();
file.read_to_end(&mut buffer).map_err(|e| {
OxenError::basic_str(format!("Failed to read file {}: {e}", file_path.display()))
})?;

let key = self.generate_key(hash);
let body = ByteStream::from(buffer);
client
.put_object()
.bucket(&self.bucket)
.key(&key)
.body(body)
.send()
.await
.map_err(|e| OxenError::basic_str(format!("Failed to store version in S3: {e}")))?;

Ok(())
}

/// Streams file content to S3 without writing to disk.
///
/// The caller must provide the file size up front. Files up to 100 MB are uploaded in a
Expand Down
7 changes: 0 additions & 7 deletions crates/lib/src/storage/version_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ pub trait VersionStore: Debug + Send + Sync + 'static {
/// Initialize the storage backend
async fn init(&self) -> Result<(), OxenError>;

/// Store a version file from a file path
///
/// # Arguments
/// * `hash` - The content hash that identifies this version
/// * `file_path` - Path to the file to store
///
/// # Errors
/// Returns an `OxenError` when the backend fails to store the file; the
/// exact failure conditions depend on the implementing backend.
async fn store_version_from_path(&self, hash: &str, file_path: &Path) -> Result<(), OxenError>;

/// Store a version file from an async reader
///
/// # Arguments
Expand Down
Loading