Skip to content

Commit 0f05b62

Browse files
author
Ajit Banerjee
committed
Collect telemetry for upload path
Propagate the telemetry data (compressed size and latency) to the Python layer. It will be sent to the telemetry endpoint in a later change. Related to STO-7.
1 parent 71c6b62 commit 0f05b62

File tree

7 files changed

+96
-17
lines changed

7 files changed

+96
-17
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ debug/
1717
.vscode
1818
venv
1919
**/*.env
20+
**/uv.lock

data/src/clean.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::mem::take;
2525
use std::ops::DerefMut;
2626
use std::path::{Path, PathBuf};
2727
use std::sync::Arc;
28+
use std::time::Instant;
2829
use tokio::sync::mpsc::error::TryRecvError;
2930
use tokio::sync::mpsc::{channel, Receiver, Sender};
3031
use tokio::sync::Mutex;
@@ -80,6 +81,9 @@ pub struct Cleaner {
8081

8182
// Auxiliary info
8283
file_name: Option<PathBuf>,
84+
85+
// Telemetry
86+
start: Instant,
8387
}
8488

8589
impl Cleaner {
@@ -117,6 +121,7 @@ impl Cleaner {
117121
tracking_info: Mutex::new(Default::default()),
118122
small_file_buffer: Mutex::new(Some(Vec::with_capacity(small_file_threshold))),
119123
file_name: file_name.map(|f| f.to_owned()),
124+
start: Instant::now(),
120125
});
121126

122127
Self::run(cleaner.clone(), chunk_c).await;
@@ -242,8 +247,9 @@ impl Cleaner {
242247
Ok(false)
243248
}
244249

245-
async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<()> {
250+
async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<u64> {
246251
info!("Dedup {} chunks", chunks.len());
252+
let mut total_compressed_bytes = 0;
247253
let mut tracking_info = self.tracking_info.lock().await;
248254

249255
let enable_global_dedup = self.enable_global_dedup_queries;
@@ -472,13 +478,14 @@ impl Cleaner {
472478
tracking_info.cas_data.data.extend(bytes);
473479

474480
if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE {
475-
let cas_hash = register_new_cas_block(
481+
let (cas_hash, compressed_bytes) = register_new_cas_block(
476482
&mut tracking_info.cas_data,
477483
&self.shard_manager,
478484
&self.cas,
479485
&self.cas_prefix,
480486
)
481487
.await?;
488+
total_compressed_bytes += compressed_bytes;
482489

483490
for i in take(&mut tracking_info.current_cas_file_info_indices) {
484491
tracking_info.file_info[i].cas_hash = cas_hash;
@@ -492,7 +499,7 @@ impl Cleaner {
492499
}
493500
}
494501

495-
Ok(())
502+
Ok(total_compressed_bytes)
496503
}
497504

498505
async fn finish(&self) -> Result<()> {
@@ -525,7 +532,8 @@ impl Cleaner {
525532
Ok(())
526533
}
527534

528-
async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64)> {
535+
async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64, u64)> {
536+
let mut total_compressed_bytes = 0;
529537
let mut tracking_info = self.tracking_info.lock().await;
530538

531539
let file_hash = file_node_hash(
@@ -585,13 +593,14 @@ impl Cleaner {
585593
if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE {
586594
let mut new_cas_data = take(cas_data_accumulator.deref_mut());
587595
drop(cas_data_accumulator); // Release the lock.
588-
register_new_cas_block(
596+
let (_cas_hash, compressed_bytes) = register_new_cas_block(
589597
&mut new_cas_data,
590598
&self.shard_manager,
591599
&self.cas,
592600
&self.cas_prefix,
593601
)
594602
.await?;
603+
total_compressed_bytes += compressed_bytes;
595604
} else {
596605
drop(cas_data_accumulator);
597606
}
@@ -601,11 +610,11 @@ impl Cleaner {
601610

602611
*tracking_info = Default::default();
603612

604-
Ok((file_hash, file_size))
613+
Ok((file_hash, file_size, total_compressed_bytes))
605614
}
606615

607616
async fn to_pointer_file(&self) -> Result<String> {
608-
let (hash, filesize) = self.summarize_dedup_info().await?;
617+
let (hash, filesize, compressed_size) = self.summarize_dedup_info().await?;
609618
let pointer_file = PointerFile::init_from_info(
610619
&self
611620
.file_name
@@ -614,6 +623,8 @@ impl Cleaner {
614623
.unwrap_or_default(),
615624
&hash.hex(),
616625
filesize,
626+
compressed_size,
627+
self.start.elapsed(),
617628
);
618629
Ok(pointer_file.to_string())
619630
}

data/src/data_processing.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::mem::take;
1919
use std::ops::DerefMut;
2020
use std::path::Path;
2121
use std::sync::Arc;
22+
use std::time::Instant;
2223
use tokio::sync::Mutex;
2324
use tracing::error;
2425

@@ -59,6 +60,9 @@ pub struct PointerFileTranslator {
5960

6061
/* ----- Deduped data shared across files ----- */
6162
global_cas_data: Arc<Mutex<CASDataAggregator>>,
63+
// Telemetry
64+
/* ----- Telemetry ----- */
65+
pub start: Instant,
6266
}
6367

6468
// Constructors
@@ -93,6 +97,7 @@ impl PointerFileTranslator {
9397
remote_shards,
9498
cas: cas_client,
9599
global_cas_data: Default::default(),
100+
start: Instant::now(),
96101
})
97102
}
98103
}
@@ -207,7 +212,7 @@ pub(crate) async fn register_new_cas_block(
207212
shard_manager: &Arc<ShardFileManager>,
208213
cas: &Arc<dyn Staging + Send + Sync>,
209214
cas_prefix: &str,
210-
) -> Result<MerkleHash> {
215+
) -> Result<(MerkleHash, u64)> {
211216
let cas_hash = cas_node_hash(&cas_data.chunks[..]);
212217

213218
let raw_bytes_len = cas_data.data.len();
@@ -282,7 +287,7 @@ pub(crate) async fn register_new_cas_block(
282287
cas_data.chunks.clear();
283288
cas_data.pending_file_info.clear();
284289

285-
Ok(cas_hash)
290+
Ok((cas_hash, compressed_bytes_len as u64))
286291
}
287292

288293
/// Smudge operations

data/src/pointer_file.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::constants::POINTER_FILE_LIMIT;
33
use merklehash::{DataHashHexParseError, MerkleHash};
44
use static_assertions::const_assert;
55
use std::{collections::BTreeMap, fs, path::Path};
6+
use std::time::Duration;
67
use toml::Value;
78
use tracing::{debug, error, warn};
89

@@ -33,6 +34,12 @@ pub struct PointerFile {
3334

3435
/// The size of the file pointed to by this pointer file
3536
filesize: u64,
37+
38+
/// The addition to CAS bytes due to this file
39+
compressed_size: Option<u64>,
40+
41+
/// The duration for cleaning or smudging this file
42+
pub latency: Option<Duration>,
3643
}
3744

3845
impl PointerFile {
@@ -59,6 +66,8 @@ impl PointerFile {
5966
is_valid,
6067
hash,
6168
filesize,
69+
compressed_size: None,
70+
latency: None,
6271
};
6372
}
6473

@@ -73,6 +82,8 @@ impl PointerFile {
7382
is_valid,
7483
hash,
7584
filesize,
85+
compressed_size: None,
86+
latency: None,
7687
};
7788
}
7889

@@ -117,6 +128,8 @@ impl PointerFile {
117128
is_valid,
118129
hash,
119130
filesize,
131+
compressed_size: None,
132+
latency: None,
120133
}
121134
}
122135

@@ -134,6 +147,8 @@ impl PointerFile {
134147
is_valid: false,
135148
hash: empty_string,
136149
filesize: 0,
150+
compressed_size: None,
151+
latency: None,
137152
};
138153

139154
let Ok(file_meta) = fs::metadata(path).map_err(|e| {
@@ -156,13 +171,15 @@ impl PointerFile {
156171
PointerFile::init_from_string(&contents, path)
157172
}
158173

159-
pub fn init_from_info(path: &str, hash: &str, filesize: u64) -> Self {
174+
pub fn init_from_info(path: &str, hash: &str, filesize: u64, compressed_size: u64, latency: Duration) -> Self {
160175
Self {
161176
version_string: CURRENT_VERSION.to_string(),
162177
path: path.to_string(),
163178
is_valid: true,
164179
hash: hash.to_string(),
165180
filesize,
181+
compressed_size: Some(compressed_size),
182+
latency: Some(latency),
166183
}
167184
}
168185

@@ -194,6 +211,10 @@ impl PointerFile {
194211
pub fn filesize(&self) -> u64 {
195212
self.filesize
196213
}
214+
215+
pub fn compressed_size(&self) -> u64 {self.compressed_size.unwrap_or(0)}
216+
217+
pub fn latency(&self) -> f64 {self.latency.map(|dur| dur.as_secs_f64()).unwrap_or(0f64)}
197218
}
198219

199220
pub fn is_xet_pointer_file(data: &[u8]) -> bool {
@@ -322,4 +343,6 @@ mod tests {
322343
let test = PointerFile::init_from_string(&test_contents, &empty_string);
323344
assert!(!test.is_valid()); // new version is not valid
324345
}
346+
347+
// todo add init_from_info test
325348
}

hf_xet/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ classifiers = [
1111
"Programming Language :: Python :: Implementation :: PyPy",
1212
]
1313
dynamic = ["version"]
14+
dependencies = [
15+
"maturin>=1.7.4",
16+
]
1417
[project.optional-dependencies]
1518
tests = [
1619
"pytest",

hf_xet/src/data_client.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ pub async fn download_async(
5555
endpoint: Option<String>,
5656
token_info: Option<(String, u64)>,
5757
token_refresher: Option<Arc<dyn TokenRefresher>>,
58-
) -> errors::Result<Vec<String>> {
58+
) -> errors::Result<Vec<PointerFile>> {
5959
let config = default_config(
6060
endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()),
6161
token_info,
6262
token_refresher,
6363
)?;
6464
let processor = Arc::new(PointerFileTranslator::new(config).await?);
6565
let processor = &processor;
66-
let paths = tokio_par_for_each(
66+
let pfs = tokio_par_for_each(
6767
pointer_files,
6868
MAX_CONCURRENT_DOWNLOADS,
6969
|pointer_file, _| async move {
@@ -77,7 +77,7 @@ pub async fn download_async(
7777
ParallelError::TaskError(e) => e,
7878
})?;
7979

80-
Ok(paths)
80+
Ok(pfs)
8181
}
8282

8383
async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Result<PointerFile> {
@@ -95,7 +95,6 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res
9595

9696
handle.add_bytes(read_buf[0..bytes].to_vec()).await?;
9797
}
98-
9998
let pf_str = handle.result().await?;
10099
let pf = PointerFile::init_from_string(&pf_str, path.to_str().unwrap());
101100
Ok(pf)
@@ -104,15 +103,17 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res
104103
async fn smudge_file(
105104
proc: &PointerFileTranslator,
106105
pointer_file: &PointerFile,
107-
) -> errors::Result<String> {
106+
) -> errors::Result<PointerFile> {
108107
let path = PathBuf::from(pointer_file.path());
109108
if let Some(parent_dir) = path.parent() {
110109
fs::create_dir_all(parent_dir)?;
111110
}
112111
let mut f = File::create(&path)?;
113112
proc.smudge_file_from_pointer(&pointer_file, &mut f, None)
114113
.await?;
115-
Ok(pointer_file.path().to_string())
114+
let mut pointer_file_clone = pointer_file.clone();
115+
pointer_file_clone.latency = Some(proc.start.elapsed());
116+
Ok(pointer_file_clone)
116117
}
117118

118119
#[cfg(test)]

hf_xet/src/lib.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,32 @@ use pyo3::prelude::*;
1010
use pyo3::pyfunction;
1111
use std::fmt::Debug;
1212
use std::sync::Arc;
13+
use std::time::Duration;
1314
use token_refresh::WrappedTokenRefresher;
1415

16+
// This will be the information that will finally be sent to the telemetry endpoint
17+
//
18+
// #[pyclass]
19+
// #[derive(Clone, Debug)]
20+
// pub struct UploadTelemetry {
21+
// #[pyo3(get)]
22+
// total_time_ms: u64,
23+
// #[pyo3(get)]
24+
// uploaded_bytes: u64,
25+
// }
26+
//
27+
// #[pyclass]
28+
// #[derive(Clone, Debug)]
29+
// pub struct DownloadTelemetry {
30+
// #[pyo3(get)]
31+
// total_time_ms: u64,
32+
// #[pyo3(get)]
33+
// downloaded_bytes: u64,
34+
// #[pyo3(get)]
35+
// cached_bytes: u32,
36+
// }
37+
38+
1539
#[pyfunction]
1640
#[pyo3(signature = (file_paths, endpoint, token_info, token_refresher), text_signature = "(file_paths: List[str], endpoint: Optional[str], token_info: Optional[(str, int)], token_refresher: Optional[Callable[[], (str, int)]]) -> List[PyPointerFile]")]
1741
pub fn upload_files(
@@ -63,8 +87,10 @@ pub fn download_files(
6387
.block_on(async move {
6488
data_client::download_async(pfs, endpoint, token_info, refresher).await
6589
})
90+
.map(|pfs| pfs.into_iter().map(|pointer_file| pointer_file.path().to_string()).collect())
6691
.map_err(|e| PyException::new_err(format!("{e:?}")))
6792
})
93+
6894
}
6995

7096
// helper to convert the implemented WrappedTokenRefresher into an Arc<dyn TokenRefresher>
@@ -82,21 +108,28 @@ pub struct PyPointerFile {
82108
hash: String,
83109
#[pyo3(get)]
84110
filesize: u64,
111+
#[pyo3(get)]
112+
compressed_size: u64,
113+
#[pyo3(get)]
114+
latency: f64,
85115
}
86116

117+
87118
impl From<PointerFile> for PyPointerFile {
88119
fn from(pf: PointerFile) -> Self {
89120
Self {
90121
path: pf.path().to_string(),
91122
hash: pf.hash_string().to_string(),
92123
filesize: pf.filesize(),
124+
compressed_size: pf.compressed_size(),
125+
latency: pf.latency(),
93126
}
94127
}
95128
}
96129

97130
impl From<PyPointerFile> for PointerFile {
98131
fn from(pf: PyPointerFile) -> Self {
99-
PointerFile::init_from_info(&pf.path, &pf.hash, pf.filesize)
132+
PointerFile::init_from_info(&pf.path, &pf.hash, pf.filesize, pf.compressed_size, Duration::from_secs_f64(pf.latency))
100133
}
101134
}
102135

@@ -108,6 +141,8 @@ impl PyPointerFile {
108141
path,
109142
hash,
110143
filesize,
144+
compressed_size: 0,
145+
latency: 0.0,
111146
}
112147
}
113148

0 commit comments

Comments
 (0)