Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Rust 1.89
uses: dtolnay/rust-toolchain@1.89.0
- name: Install Rust 1.90
uses: dtolnay/rust-toolchain@1.90.0
with:
components: clippy
- uses: ./.github/actions/cache-rust-build
Expand All @@ -56,8 +56,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Rust 1.89
uses: dtolnay/rust-toolchain@1.89.0
- name: Install Rust 1.90
uses: dtolnay/rust-toolchain@1.90.0
with:
components: clippy
- uses: ./.github/actions/cache-rust-build
Expand All @@ -69,8 +69,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Rust 1.89
uses: dtolnay/rust-toolchain@1.89.0
- name: Install Rust 1.90
uses: dtolnay/rust-toolchain@1.90.0
with:
components: clippy
- name: Set up Git LFS
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/git-xet-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
target: aarch64
steps:
- uses: actions/checkout@v4
- name: Install Rust 1.89
uses: dtolnay/rust-toolchain@1.89.0
- name: Install Rust 1.90
uses: dtolnay/rust-toolchain@1.90.0
- uses: ./.github/actions/cache-rust-build
- name: Build
run: |
Expand All @@ -50,8 +50,8 @@ jobs:
target: aarch64
steps:
- uses: actions/checkout@v4
- name: Install Rust 1.89
uses: dtolnay/rust-toolchain@1.89.0
- name: Install Rust 1.90
uses: dtolnay/rust-toolchain@1.90.0
- uses: ./.github/actions/cache-rust-build
- name: Build
run: |
Expand Down Expand Up @@ -85,8 +85,8 @@ jobs:
target: x86_64
steps:
- uses: actions/checkout@v4
- name: Install Rust 1.89
uses: dtolnay/rust-toolchain@1.89.0
- name: Install Rust 1.90
uses: dtolnay/rust-toolchain@1.90.0
- uses: ./.github/actions/cache-rust-build
- name: Install WiX
run: |
Expand Down
61 changes: 61 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"merklehash",
"progress_tracking",
"utils",
"xet-mount",
"xet_runtime",
]

Expand Down
4 changes: 2 additions & 2 deletions cas_client/src/download_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use utils::singleflight::Group;

use crate::error::{CasClientError, Result};
use crate::http_client::Api;
use crate::output_provider::OutputProvider;
use crate::output_provider::SeekingOutputProvider;
use crate::remote_client::{PREFIX_DEFAULT, get_reconstruction_with_endpoint_and_client};
use crate::retry_wrapper::{RetryWrapper, RetryableReqwestError};

Expand Down Expand Up @@ -296,7 +296,7 @@ pub(crate) struct ChunkRangeWrite {
pub(crate) struct FetchTermDownloadOnceAndWriteEverywhereUsed {
pub download: FetchTermDownload,
// pub write_offset: u64, // start position of the writer to write to
pub output: OutputProvider,
pub output: SeekingOutputProvider,
pub writes: Vec<ChunkRangeWrite>,
}

Expand Down
26 changes: 13 additions & 13 deletions cas_client/src/interface.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -9,9 +8,9 @@ use merklehash::MerkleHash;
use progress_tracking::item_tracking::SingleItemProgressUpdater;
use progress_tracking::upload_tracking::CompletionTracker;

#[cfg(not(target_family = "wasm"))]
use crate::OutputProvider;
use crate::error::Result;
#[cfg(not(target_family = "wasm"))]
use crate::{SeekingOutputProvider, SequentialOutput};

/// A Client to the Shard service. The shard service
/// provides for
Expand All @@ -25,24 +24,25 @@ pub trait Client {
///
/// The http_client passed in is a non-authenticated client. This is used to directly communicate
/// with the backing store (S3) to retrieve xorbs.
///
/// Content is written in-order to the provided SequentialOutput
#[cfg(not(target_family = "wasm"))]
async fn get_file(
async fn get_file_with_sequential_writer(
&self,
hash: &MerkleHash,
byte_range: Option<FileRange>,
output_provider: &OutputProvider,
output_provider: SequentialOutput,
progress_updater: Option<Arc<SingleItemProgressUpdater>>,
) -> Result<u64>;

#[cfg(not(target_family = "wasm"))]
async fn batch_get_file(&self, files: HashMap<MerkleHash, &OutputProvider>) -> Result<u64> {
let mut n_bytes = 0;
// Provide the basic naive implementation as a default.
for (h, w) in files {
n_bytes += self.get_file(&h, None, w, None).await?;
}
Ok(n_bytes)
}
async fn get_file_with_parallel_writer(
&self,
hash: &MerkleHash,
byte_range: Option<FileRange>,
output_provider: SeekingOutputProvider,
progress_updater: Option<Arc<SingleItemProgressUpdater>>,
) -> Result<u64>;

async fn get_file_reconstruction_info(
&self,
Expand Down
2 changes: 1 addition & 1 deletion cas_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use interface::Client;
#[cfg(not(target_family = "wasm"))]
pub use local_client::LocalClient;
#[cfg(not(target_family = "wasm"))]
pub use output_provider::{FileProvider, OutputProvider};
pub use output_provider::*;
pub use remote_client::RemoteClient;

pub use crate::error::CasClientError;
Expand Down
25 changes: 18 additions & 7 deletions cas_client/src/local_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use merklehash::MerkleHash;
use progress_tracking::item_tracking::SingleItemProgressUpdater;
use progress_tracking::upload_tracking::CompletionTracker;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tokio::runtime::Handle;
use tracing::{debug, error, info, warn};

use crate::Client;
use crate::error::{CasClientError, Result};
use crate::output_provider::OutputProvider;
use crate::{Client, SeekingOutputProvider, SequentialOutput};

pub struct LocalClient {
tmp_dir: Option<TempDir>, // To hold directory to use for local testing
Expand Down Expand Up @@ -232,14 +232,14 @@ impl LocalClient {
}
}

/// LocalClient is responsible for writing/reading Xorbs on local disk.
/// LocalClient is responsible for writing/reading Xorbs on the local disk.
#[async_trait]
impl Client for LocalClient {
async fn get_file(
async fn get_file_with_sequential_writer(
&self,
hash: &MerkleHash,
byte_range: Option<FileRange>,
output_provider: &OutputProvider,
mut output_provider: SequentialOutput,
_progress_updater: Option<Arc<SingleItemProgressUpdater>>,
) -> Result<u64> {
let Some((file_info, _)) = self
Expand All @@ -250,7 +250,6 @@ impl Client for LocalClient {
else {
return Err(CasClientError::FileNotFound(*hash));
};
let mut writer = output_provider.get_writer_at(0)?;

// This is just used for testing, so inefficient is fine.
let mut file_vec = Vec::new();
Expand All @@ -269,11 +268,23 @@ impl Client for LocalClient {
.unwrap_or(file_vec.len())
.min(file_vec.len());

writer.write_all(&file_vec[start..end])?;
output_provider.write_all(&file_vec[start..end]).await?;

Ok((end - start) as u64)
}

async fn get_file_with_parallel_writer(
&self,
hash: &MerkleHash,
byte_range: Option<FileRange>,
output_provider: SeekingOutputProvider,
progress_updater: Option<Arc<SingleItemProgressUpdater>>,
) -> Result<u64> {
let sequential = output_provider.try_into()?;
self.get_file_with_sequential_writer(hash, byte_range, sequential, progress_updater)
.await
}

/// Query the shard server for the file reconstruction info.
/// Returns the FileInfo for reconstructing the file and the shard ID that
/// defines the file info.
Expand Down
Loading