diff --git a/chunk_cache/Cargo.toml b/chunk_cache/Cargo.toml
index 77021375..cb7ac172 100644
--- a/chunk_cache/Cargo.toml
+++ b/chunk_cache/Cargo.toml
@@ -32,3 +32,4 @@ required-features = ["analysis"]
 
 [features]
 analysis = ["dep:clap"]
+no-default-cache = []
diff --git a/chunk_cache/src/disk.rs b/chunk_cache/src/disk.rs
index 3f3de4d9..d9cbb136 100644
--- a/chunk_cache/src/disk.rs
+++ b/chunk_cache/src/disk.rs
@@ -28,8 +28,10 @@ pub mod test_utils;
 
 // consistently use URL_SAFE (also file path safe) base64 codec
 pub(crate) const BASE64_ENGINE: GeneralPurpose = URL_SAFE;
+#[cfg(not(feature = "no-default-cache"))]
 pub const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 10_000_000_000; // 10 GB
-const MAX_CACHE_FILE_SIZE: u64 = 10_000_000_000; // 10 GB - max size for a single cache file
+#[cfg(feature = "no-default-cache")]
+pub const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 0;
 const PREFIX_DIR_NAME_LEN: usize = 2;
 
 type OptionResult<T, E> = Result<Option<T>, E>;
@@ -688,10 +690,10 @@ fn try_parse_cache_file(file_result: io::Result<DirEntry>, capacity: u64) -> Opt
     if !md.is_file() {
         return Ok(None);
     }
-    if md.len() > MAX_CACHE_FILE_SIZE {
+    if md.len() > DEFAULT_CHUNK_CACHE_CAPACITY {
         return Err(ChunkCacheError::general(format!(
             "Cache directory contains a file larger than {} GB, cache directory state is invalid",
-            (MAX_CACHE_FILE_SIZE as f64 / (1 << 30) as f64)
+            (DEFAULT_CHUNK_CACHE_CAPACITY as f64 / (1 << 30) as f64)
         )));
     }
 
@@ -821,9 +823,10 @@ mod tests {
     use tempdir::TempDir;
     use utils::output_bytes;
 
+    use super::{DEFAULT_CHUNK_CACHE_CAPACITY, DiskCache};
     use crate::disk::test_utils::*;
     use crate::disk::try_parse_key;
-    use crate::{CacheConfig, ChunkCache, DEFAULT_CHUNK_CACHE_CAPACITY, DiskCache};
+    use crate::{CacheConfig, ChunkCache};
 
     const RANDOM_SEED: u64 = 9089 << 20 | 120043;
 
@@ -1259,7 +1262,9 @@ mod tests {
 mod concurrency_tests {
     use tempdir::TempDir;
 
-    use crate::{CacheConfig, ChunkCache, DEFAULT_CHUNK_CACHE_CAPACITY, DiskCache, RANGE_LEN, RandomEntryIterator};
+    use super::DiskCache;
+    use crate::disk::DEFAULT_CHUNK_CACHE_CAPACITY;
+    use crate::{CacheConfig, ChunkCache, RANGE_LEN, RandomEntryIterator};
 
     const NUM_ITEMS_PER_TASK: usize = 20;
     const RANDOM_SEED: u64 = 878987298749287;
diff --git a/data/src/configurations.rs b/data/src/configurations.rs
index fb8d4324..5287e0e6 100644
--- a/data/src/configurations.rs
+++ b/data/src/configurations.rs
@@ -123,19 +123,6 @@ impl TranslatorConfig {
         }
     }
 
-    pub fn with_cache_size(self, cache_size: u64) -> Self {
-        Self {
-            data_config: DataConfig {
-                cache_config: CacheConfig {
-                    cache_size,
-                    ..self.data_config.cache_config
-                },
-                ..self.data_config
-            },
-            ..self
-        }
-    }
-
     pub fn with_session_id(self, session_id: &str) -> Self {
         if session_id.is_empty() {
             return self;
diff --git a/data/src/data_client.rs b/data/src/data_client.rs
index 64a3e48a..cfe133ce 100644
--- a/data/src/data_client.rs
+++ b/data/src/data_client.rs
@@ -106,19 +106,16 @@ pub async fn upload_bytes_async(
     token_info: Option<(String, u64)>,
     token_refresher: Option<Arc<dyn TokenRefresher>>,
     progress_updater: Option<Arc<dyn ProgressUpdater>>,
-    cache_size: Option<u64>,
     user_agent: String,
 ) -> errors::Result<Vec<XetFileInfo>> {
-    let mut config = default_config(
+    let config = default_config(
         endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.clone()),
         None,
         token_info,
         token_refresher,
         user_agent,
     )?;
-    if let Some(size) = cache_size {
-        config = config.with_cache_size(size);
-    }
+
     Span::current().record("session_id", &config.session_id);
 
     let semaphore = XetRuntime::current().global_semaphore(*CONCURRENT_FILE_INGESTION_LIMITER);
@@ -152,23 +149,19 @@ pub async fn upload_async(
     token_info: Option<(String, u64)>,
     token_refresher: Option<Arc<dyn TokenRefresher>>,
     progress_updater: Option<Arc<dyn ProgressUpdater>>,
-    cache_size: Option<u64>,
     user_agent: String,
 ) -> errors::Result<Vec<XetFileInfo>> {
     // chunk files
     // produce Xorbs + Shards
     // upload shards and xorbs
     // for each file, return the filehash
-    let mut config = default_config(
+    let config = default_config(
         endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.clone()),
         None,
         token_info,
         token_refresher,
         user_agent,
     )?;
-    if let Some(size) = cache_size {
-        config = config.with_cache_size(size);
-    }
 
     let span = Span::current();
 
@@ -199,7 +192,6 @@ pub async fn download_async(
     token_info: Option<(String, u64)>,
     token_refresher: Option<Arc<dyn TokenRefresher>>,
     progress_updaters: Option<Vec<Arc<dyn ProgressUpdater>>>,
-    cache_size: Option<u64>,
     user_agent: String,
 ) -> errors::Result<Vec<String>> {
     lazy_static! {
@@ -212,16 +204,13 @@ pub async fn download_async(
     {
         return Err(DataProcessingError::ParameterError("updaters are not same length as pointer_files".to_string()));
     }
-    let mut config = default_config(
+    let config = default_config(
         endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()),
         None,
         token_info,
         token_refresher,
         user_agent,
     )?;
-    if let Some(size) = cache_size {
-        config = config.with_cache_size(size);
-    }
     Span::current().record("session_id", &config.session_id);
 
     let processor = Arc::new(FileDownloader::new(config.into()).await?);
diff --git a/hf_xet/Cargo.lock b/hf_xet/Cargo.lock
index b89ced9a..6848d429 100644
--- a/hf_xet/Cargo.lock
+++ b/hf_xet/Cargo.lock
@@ -1296,6 +1296,7 @@ dependencies = [
  "bipbuffer",
  "cas_client",
  "chrono",
+ "chunk_cache",
  "ctrlc",
  "data",
  "error_printer",
diff --git a/hf_xet/Cargo.toml b/hf_xet/Cargo.toml
index 45c3559a..daccb80e 100644
--- a/hf_xet/Cargo.toml
+++ b/hf_xet/Cargo.toml
@@ -11,6 +11,7 @@ crate-type = ["cdylib"]
 
 [dependencies]
 cas_client = { path = "../cas_client" }
+chunk_cache = { path = "../chunk_cache" }
 data = { path = "../data" }
 error_printer = { path = "../error_printer" }
 progress_tracking = { path = "../progress_tracking" }
@@ -47,8 +48,10 @@ signal-hook = "0.3"
 ctrlc = "3.4"
 
 [features]
+default = ["no-default-cache"] # By default, hf_xet disables the disk cache.
 native-tls = ["cas_client/native-tls-vendored"]
 native-tls-vendored = ["cas_client/native-tls-vendored"]
+no-default-cache = ["chunk_cache/no-default-cache"]
 profiling = ["pprof"]
 tokio-console = ["xet_logging/tokio-console"]
diff --git a/hf_xet/src/lib.rs b/hf_xet/src/lib.rs
index 52fc9763..0cd68437 100644
--- a/hf_xet/src/lib.rs
+++ b/hf_xet/src/lib.rs
@@ -65,7 +65,6 @@ pub fn upload_bytes(
             token_info,
             refresher.map(|v| v as Arc<_>),
             updater.map(|v| v as Arc<_>),
-            Some(0), // Disable DiskCache for hf_xet
             USER_AGENT.to_string(),
         )
         .await
@@ -112,7 +111,6 @@ pub fn upload_files(
             token_info,
             refresher.map(|v| v as Arc<_>),
             updater.map(|v| v as Arc<_>),
-            Some(0), // Disable DiskCache for hf_xet
             USER_AGENT.to_string(),
         )
         .await
@@ -157,7 +155,6 @@ pub fn download_files(
             token_info,
             refresher.map(|v| v as Arc<_>),
             updaters,
-            Some(0), // Disable DiskCache for hf_xet
             USER_AGENT.to_string(),
        )
        .await
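
The whole change hinges on one compile-time pattern: `DEFAULT_CHUNK_CACHE_CAPACITY` is now `cfg`-gated on the `no-default-cache` feature, so a build like `hf_xet` (which enables the feature by default) gets a zero-capacity, i.e. disabled, disk cache without threading a runtime `cache_size: Option<u64>` through every public entry point. Below is a minimal sketch of that idea; the `CacheConfig` stand-in and the `maybe_disk_cache` helper are illustrative assumptions for this sketch, not the crates' actual API.

```rust
// Sketch only: the cfg-gated default-capacity pattern from this diff.
// `CacheConfig` here is a stand-in struct and `maybe_disk_cache` is a
// hypothetical helper, not chunk_cache's real API.

#[cfg(not(feature = "no-default-cache"))]
pub const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 10_000_000_000; // 10 GB

#[cfg(feature = "no-default-cache")]
pub const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 0; // cache compiled out

// Stand-in for the cache configuration consumed by the data layer.
pub struct CacheConfig {
    pub cache_directory: std::path::PathBuf,
    pub cache_size: u64,
}

/// Hypothetical helper: yields a cache config only when a non-zero
/// capacity was compiled in. Callers no longer pass `Some(0)` at
/// runtime (the overrides deleted in hf_xet/src/lib.rs above); the
/// feature flag decides once, at build time.
pub fn maybe_disk_cache(dir: std::path::PathBuf) -> Option<CacheConfig> {
    (DEFAULT_CHUNK_CACHE_CAPACITY > 0).then(|| CacheConfig {
        cache_directory: dir,
        cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
    })
}
```

One design consequence worth noting: `try_parse_cache_file` now reuses `DEFAULT_CHUNK_CACHE_CAPACITY` as the per-file sanity bound in place of the removed `MAX_CACHE_FILE_SIZE`, so under `no-default-cache` that bound also drops to 0; this only stays harmless as long as a zero-capacity build never populates a cache directory to scan.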