diff --git a/cas_client/src/interface.rs b/cas_client/src/interface.rs
index 3250175b..ce7cb95f 100644
--- a/cas_client/src/interface.rs
+++ b/cas_client/src/interface.rs
@@ -13,6 +13,12 @@ use utils::progress::ProgressUpdater;
 use crate::error::Result;
 use crate::CasClientError;
 
+/// Metrics regarding a XORB upload.
+#[derive(Debug)]
+pub struct UploadMetrics {
+    pub n_bytes: usize,
+}
+
 /// A Client to the CAS (Content Addressed Storage) service to allow storage and
 /// management of XORBs (Xet Object Remote Block). A XORB represents a collection
 /// of arbitrary bytes. These bytes are hashed according to a Xet Merkle Hash
@@ -35,7 +41,7 @@ pub trait UploadClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<()>;
+    ) -> Result<UploadMetrics>;
 
     /// Check if a XORB already exists.
     async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool>;
diff --git a/cas_client/src/lib.rs b/cas_client/src/lib.rs
index 4c2edf29..7a44fa17 100644
--- a/cas_client/src/lib.rs
+++ b/cas_client/src/lib.rs
@@ -4,7 +4,7 @@ pub use chunk_cache::CacheConfig;
 
 pub use http_client::{build_auth_http_client, build_http_client};
 use interface::RegistrationClient;
-pub use interface::{Client, ReconstructionClient, UploadClient};
+pub use interface::{Client, ReconstructionClient, UploadClient, UploadMetrics};
 pub use local_client::{tests_utils, LocalClient};
 pub use remote_client::RemoteClient;
 
diff --git a/cas_client/src/local_client.rs b/cas_client/src/local_client.rs
index 1aa26928..4a9bc03e 100644
--- a/cas_client/src/local_client.rs
+++ b/cas_client/src/local_client.rs
@@ -12,6 +12,7 @@ use tracing::{debug, info};
 
 use crate::error::{CasClientError, Result};
 use crate::interface::UploadClient;
+use crate::UploadMetrics;
 
 #[derive(Debug)]
 pub struct LocalClient {
@@ -105,7 +106,7 @@ impl UploadClient for LocalClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<()> {
+    ) -> Result<UploadMetrics> {
         // no empty writes
         if chunk_and_boundaries.is_empty() || data.is_empty() {
             return Err(CasClientError::InvalidArguments);
@@ -120,7 +121,7 @@ impl UploadClient for LocalClient {
 
         if self.exists(prefix, hash).await? {
             info!("{prefix:?}/{hash:?} already exists in Local CAS; returning.");
-            return Ok(());
+            return Ok(UploadMetrics { n_bytes: data.len() });
         }
 
         let file_path = self.get_path_for_entry(prefix, hash);
@@ -136,8 +137,7 @@ impl UploadClient for LocalClient {
             CasClientError::InternalError(anyhow!("Unable to create temporary file for staging Xorbs, got {e:?}"))
         })?;
 
-        let total_bytes_written;
-        {
+        let bytes_written = {
             let mut writer = BufWriter::new(&tempfile);
             let (_, bytes_written) = CasObject::serialize(
                 &mut writer,
@@ -148,8 +148,8 @@ impl UploadClient for LocalClient {
             )?;
             // flush before persisting
             writer.flush()?;
-            total_bytes_written = bytes_written;
-        }
+            bytes_written
+        };
 
         tempfile.persist(&file_path).map_err(|e| e.error)?;
 
@@ -161,9 +161,9 @@ impl UploadClient for LocalClient {
             let _ = std::fs::set_permissions(&file_path, permissions);
         }
 
-        info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");
+        info!("{file_path:?} successfully written with {bytes_written:?} bytes.");
 
-        Ok(())
+        Ok(UploadMetrics { n_bytes: bytes_written })
     }
 
     async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
diff --git a/cas_client/src/remote_client.rs b/cas_client/src/remote_client.rs
index ea448fb8..5de93876 100644
--- a/cas_client/src/remote_client.rs
+++ b/cas_client/src/remote_client.rs
@@ -83,13 +83,13 @@ impl UploadClient for RemoteClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<()> {
+    ) -> Result<UploadMetrics> {
         let key = Key {
             prefix: prefix.to_string(),
             hash: *hash,
         };
 
-        let was_uploaded = self.upload(&key, &data, chunk_and_boundaries).await?;
+        let (was_uploaded, metrics) = self.upload(&key, &data, chunk_and_boundaries).await?;
 
         if !was_uploaded {
             debug!("{key:?} not inserted into CAS.");
@@ -97,7 +97,7 @@ impl UploadClient for RemoteClient {
             debug!("{key:?} inserted into CAS.");
         }
 
-        Ok(())
+        Ok(metrics)
     }
 
     async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
@@ -245,12 +245,12 @@ impl RemoteClient {
         key: &Key,
         contents: &[u8],
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<bool> {
+    ) -> Result<(bool, UploadMetrics)> {
         let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
 
         let mut writer = Cursor::new(Vec::new());
-        let (_, _) = CasObject::serialize(
+        let (_, n_bytes) = CasObject::serialize(
             &mut writer,
             &key.hash,
             contents,
@@ -265,7 +265,9 @@ impl RemoteClient {
         let response = self.http_auth_client.post(url).body(data).send().await?;
         let response_parsed: UploadXorbResponse = response.json().await?;
 
-        Ok(response_parsed.was_inserted)
+        let metrics = UploadMetrics { n_bytes };
+
+        Ok((response_parsed.was_inserted, metrics))
     }
 
     /// use the reconstruction response from CAS to re-create the described file for any calls
diff --git a/cas_object/src/cas_object_format.rs b/cas_object/src/cas_object_format.rs
index f45bfc06..cce67f48 100644
--- a/cas_object/src/cas_object_format.rs
+++ b/cas_object/src/cas_object_format.rs
@@ -328,6 +328,7 @@ impl CasObject {
 
     /// Serialize into Cas Object from uncompressed data and chunk boundaries.
     /// Assumes correctness from caller: it's the receiver's responsibility to validate a cas object.
+    /// Returns a tuple of CasObject and number of bytes in the serialized XORB.
     pub fn serialize<W: Write + Seek>(
         writer: &mut W,
         hash: &MerkleHash,
diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs
index 02ab050a..43c55fc7 100644
--- a/data/src/parallel_xorb_uploader.rs
+++ b/data/src/parallel_xorb_uploader.rs
@@ -1,13 +1,15 @@
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
-use cas_client::Client;
+use cas_client::{Client, UploadMetrics};
 use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
 use mdb_shard::ShardFileManager;
 use merkledb::aggregate_hashes::cas_node_hash;
 use merklehash::MerkleHash;
 use tokio::sync::{Mutex, Semaphore};
 use tokio::task::JoinSet;
+use tracing::info;
 use utils::progress::ProgressUpdater;
 use utils::ThreadPool;
 
@@ -15,6 +17,8 @@ use crate::data_processing::CASDataAggregator;
 use crate::errors::DataProcessingError::*;
 use crate::errors::*;
 
+const DEFAULT_NETWORK_STAT_REPORT_INTERVAL_SEC: u32 = 2; // 2 s
+
 #[async_trait]
 pub(crate) trait XorbUpload {
     /// Register a block of data ready for upload and dedup, return the hash of the produced xorb.
@@ -23,7 +27,75 @@ pub(crate) trait XorbUpload {
     async fn flush(&self) -> Result<()>;
 }
 
-type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>);
+struct NetworkStatCheckPoint {
+    n_bytes: u64,
+    start: Instant,
+}
+
+impl Default for NetworkStatCheckPoint {
+    fn default() -> Self {
+        Self {
+            n_bytes: 0,
+            start: Instant::now(),
+        }
+    }
+}
+
+struct NetworkStat {
+    accumulated: NetworkStatCheckPoint,
+    last_check_point: NetworkStatCheckPoint,
+    report_interval: Duration,
+}
+
+impl NetworkStat {
+    fn new(report_interval_sec: u32) -> Self {
+        Self {
+            accumulated: Default::default(),
+            last_check_point: Default::default(),
+            report_interval: Duration::from_secs(report_interval_sec.into()),
+        }
+    }
+
+    fn update_and_report(&mut self, metrics: &UploadMetrics, what: &str) {
+        self.accumulated.n_bytes += metrics.n_bytes as u64;
+        let now = Instant::now();
+        if now.duration_since(self.last_check_point.start) >= self.report_interval {
+            Self::report_rate(&format!("{what} accumulated"), self.accumulated.n_bytes, self.accumulated.start, now);
+            Self::report_rate(
+                &format!("{what} instantaneous"),
+                self.accumulated.n_bytes - self.last_check_point.n_bytes,
+                self.last_check_point.start,
+                now,
+            );
+            self.last_check_point = NetworkStatCheckPoint {
+                n_bytes: self.accumulated.n_bytes,
+                start: Instant::now(),
+            };
+        }
+    }
+
+    fn report_rate(what: &str, n_bytes: u64, start: Instant, end: Instant) {
+        const RATE_UNIT: [(f64, &str); 3] = [(1e9, "Gbps"), (1e6, "Mbps"), (1e3, "Kbps")];
+
+        let duration = end.duration_since(start);
+
+        if n_bytes == 0 {
+            info!("{what} rate: 0 bps"); return;
+        }
+
+        let bps = n_bytes as f64 * 8. / duration.as_secs_f64();
+
+        for (base, unit) in RATE_UNIT {
+            let curr = bps / base;
+            if curr > 1. {
+                info!("{what} rate: {curr:.2} {unit}");
+                return;
+            }
+        }
+
+        info!("{what} rate: {bps:.2} bps");
+    }
+}
 
 /// Helper to parallelize xorb upload and registration.
 /// Calls to registering xorbs return immediately after computing a xorb hash so callers
@@ -40,7 +112,7 @@ pub(crate) struct ParallelXorbUploader {
     cas: Arc<dyn Client + Send + Sync>,
 
     // Internal worker
-    upload_tasks: Mutex<JoinSet<Result<()>>>,
+    upload_tasks: Mutex<JoinSet<Result<UploadMetrics>>>,
 
     // Rate limiter
     rate_limiter: Arc<Semaphore>,
@@ -48,6 +120,9 @@ pub(crate) struct ParallelXorbUploader {
     // Theadpool
     threadpool: Arc<ThreadPool>,
 
+    // Network metrics
+    egress_stat: Mutex<NetworkStat>,
+
     // Upload Progress
     upload_progress_updater: Option<Arc<dyn ProgressUpdater>>,
 }
@@ -68,6 +143,7 @@ impl ParallelXorbUploader {
             upload_tasks: Mutex::new(JoinSet::new()),
             rate_limiter,
             threadpool,
+            egress_stat: Mutex::new(NetworkStat::new(DEFAULT_NETWORK_STAT_REPORT_INTERVAL_SEC)), // report every 2 s
             upload_progress_updater,
         })
     }
@@ -75,7 +151,9 @@ impl ParallelXorbUploader {
     async fn status_is_ok(&self) -> Result<()> {
         let mut upload_tasks = self.upload_tasks.lock().await;
         while let Some(result) = upload_tasks.try_join_next() {
-            result??;
+            let metrics = result??;
+            let mut egress_rate = self.egress_stat.lock().await;
+            egress_rate.update_and_report(&metrics, "Xorb upload");
         }
 
         Ok(())
@@ -139,24 +217,28 @@ impl XorbUpload for ParallelXorbUploader {
 
         let mut upload_tasks = self.upload_tasks.lock().await;
         while let Some(result) = upload_tasks.join_next().await {
-            result??;
+            let metrics = result??;
+            let mut egress_rate = self.egress_stat.lock().await;
+            egress_rate.update_and_report(&metrics, "Xorb upload");
         }
 
         Ok(())
     }
 }
 
+type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>);
+
 async fn upload_and_register_xorb(
     item: XorbUploadValueType,
     shard_manager: Arc<ShardFileManager>,
     cas: Arc<dyn Client + Send + Sync>,
     cas_prefix: String,
-) -> Result<()> {
+) -> Result<UploadMetrics> {
     let (cas_hash, data, chunks) = item;
 
     let raw_bytes_len = data.len();
     // upload xorb
-    {
+    let metrics = {
         let mut pos = 0;
         let chunk_and_boundaries = chunks
             .iter()
@@ -165,8 +247,8 @@ async fn upload_and_register_xorb(
                 (*hash, pos as u32)
             })
             .collect();
-        cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?;
-    }
+        cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?
+    };
 
     // register for dedup
     // This should happen after uploading xorb above succeeded so not to
@@ -188,5 +270,5 @@ async fn upload_and_register_xorb(
         shard_manager.add_cas_block(cas_info).await?;
     }
 
-    Ok(())
+    Ok(metrics)
 }
diff --git a/data/src/test_utils/local_test_client.rs b/data/src/test_utils/local_test_client.rs
index a1d6aaba..b88c1283 100644
--- a/data/src/test_utils/local_test_client.rs
+++ b/data/src/test_utils/local_test_client.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use cas_client::tests_utils::*;
-use cas_client::{CasClientError, Client, LocalClient, ReconstructionClient, UploadClient};
+use cas_client::{CasClientError, Client, LocalClient, ReconstructionClient, UploadClient, UploadMetrics};
 use cas_types::FileRange;
 use mdb_shard::shard_file_reconstructor::FileReconstructor;
 use mdb_shard::ShardFileManager;
@@ -41,7 +41,7 @@ impl UploadClient for LocalTestClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<(), CasClientError> {
+    ) -> Result<UploadMetrics, CasClientError> {
         self.cas.put(prefix, hash, data, chunk_and_boundaries).await
     }