Skip to content

Commit 037d267

Browse files
rajatarya, Hoyt Koepke, assafvayner
authored
CasObject v2 (#16)
CasObject v2 includes:
* Major changes to metadata / info kept with each Xorb
* Header -> Footer (now called CasObjectInfo)
* Chunks include efficient 8-byte header, can be compressed
* More unit-tests
* Lots of documentation

---------

Co-authored-by: Hoyt Koepke <[email protected]>
Co-authored-by: Assaf Vayner <[email protected]>
Co-authored-by: Assaf Vayner <[email protected]>
1 parent 2020051 commit 037d267

File tree

9 files changed

+904
-291
lines changed

9 files changed

+904
-291
lines changed

Cargo.lock

Lines changed: 24 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cas_client/src/local_client.rs

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
use std::fs::{metadata, File};
2-
use std::path::{Path, PathBuf};
3-
use std::io::{BufReader, BufWriter, Write};
1+
use crate::error::{CasClientError, Result};
2+
use crate::interface::Client;
43
use cas::key::Key;
5-
use cas_object::cas_object_format::CasObject;
4+
use cas_object::CasObject;
65
use merkledb::prelude::*;
76
use merkledb::{Chunk, MerkleMemDB};
8-
use tempfile::TempDir;
97
use merklehash::MerkleHash;
10-
use crate::interface::Client;
11-
use crate::error::{CasClientError, Result};
8+
use std::fs::{metadata, File};
9+
use std::io::{BufReader, BufWriter, Write};
10+
use std::path::{Path, PathBuf};
11+
use tempfile::TempDir;
1212

1313
use anyhow::anyhow;
14-
use tracing::{debug, error, info};
1514
use async_trait::async_trait;
15+
use tracing::{debug, error, info};
1616

1717
#[derive(Debug)]
1818
pub struct LocalClient {
@@ -130,7 +130,9 @@ impl LocalClient {
130130

131131
fn validate_root_hash(data: &[u8], chunk_boundaries: &[u64], hash: &MerkleHash) -> bool {
132132
// at least 1 chunk, and last entry in chunk boundary must match the length
133-
if chunk_boundaries.is_empty() || chunk_boundaries[chunk_boundaries.len() - 1] as usize != data.len() {
133+
if chunk_boundaries.is_empty()
134+
|| chunk_boundaries[chunk_boundaries.len() - 1] as usize != data.len()
135+
{
134136
return false;
135137
}
136138

@@ -150,7 +152,6 @@ impl LocalClient {
150152
let ret = db.finalize(staging);
151153
*ret.hash() == *hash
152154
}
153-
154155
}
155156

156157
/// LocalClient is responsible for writing/reading Xorbs on local disk.
@@ -167,7 +168,7 @@ impl Client for LocalClient {
167168
if chunk_boundaries.is_empty() || data.is_empty() {
168169
return Err(CasClientError::InvalidArguments);
169170
}
170-
171+
171172
// last boundary must be end of data
172173
if !chunk_boundaries.is_empty()
173174
&& chunk_boundaries[chunk_boundaries.len() - 1] as usize != data.len()
@@ -189,7 +190,7 @@ impl Client for LocalClient {
189190

190191
let file_path = self.get_path_for_entry(prefix, hash);
191192
info!("Writing XORB {prefix}/{hash:?} to local path {file_path:?}");
192-
193+
193194
if let Ok(metadata) = metadata(&file_path) {
194195
return if metadata.is_file() {
195196
info!("{file_path:?} already exists; returning.");
@@ -217,31 +218,31 @@ impl Client for LocalClient {
217218
))
218219
})?;
219220

220-
let total_bytes_written;
221-
{
222-
let mut writer = BufWriter::new(&tempfile);
223-
let (_, bytes_written) = CasObject::serialize(
224-
&mut writer,
225-
hash,
226-
&data,
227-
&chunk_boundaries.into_iter().map(|x| x as u32).collect()
228-
)?;
229-
// flush before persisting
230-
writer.flush()?;
231-
total_bytes_written = bytes_written;
232-
}
233-
234-
tempfile.persist(&file_path)?;
235-
236-
// attempt to set to readonly
237-
// its ok to fail.
238-
if let Ok(metadata) = std::fs::metadata(&file_path) {
239-
let mut permissions = metadata.permissions();
240-
permissions.set_readonly(true);
241-
let _ = std::fs::set_permissions(&file_path, permissions);
242-
}
243-
244-
info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");
221+
let total_bytes_written;
222+
{
223+
let mut writer = BufWriter::new(&tempfile);
224+
let (_, bytes_written) = CasObject::serialize(
225+
&mut writer,
226+
hash,
227+
&data,
228+
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
229+
)?;
230+
// flush before persisting
231+
writer.flush()?;
232+
total_bytes_written = bytes_written;
233+
}
234+
235+
tempfile.persist(&file_path)?;
236+
237+
// attempt to set to readonly
238+
// its ok to fail.
239+
if let Ok(metadata) = std::fs::metadata(&file_path) {
240+
let mut permissions = metadata.permissions();
241+
permissions.set_readonly(true);
242+
let _ = std::fs::set_permissions(&file_path, permissions);
243+
}
244+
245+
info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");
245246

246247
Ok(())
247248
}
@@ -271,7 +272,6 @@ impl Client for LocalClient {
271272
hash: &MerkleHash,
272273
ranges: Vec<(u64, u64)>,
273274
) -> Result<Vec<Vec<u8>>> {
274-
275275
// Handle the case where we aren't asked for any real data.
276276
if ranges.len() == 1 && ranges[0].0 == ranges[0].1 {
277277
return Ok(vec![Vec::<u8>::new()]);
@@ -302,7 +302,8 @@ impl Client for LocalClient {
302302
Ok(file) => {
303303
let mut reader = BufReader::new(file);
304304
let cas = CasObject::deserialize(&mut reader)?;
305-
Ok(cas.header.total_uncompressed_length as u64)
305+
let length = cas.get_contents_length()?;
306+
Ok(length as u64)
306307
}
307308
Err(_) => Err(CasClientError::XORBNotFound(*hash)),
308309
}
@@ -323,11 +324,14 @@ mod tests {
323324
let data = gen_random_bytes(2048);
324325
let hash = compute_data_hash(&data[..]);
325326
let chunk_boundaries = vec![data.len() as u64];
326-
327+
327328
let data_again = data.clone();
328329

329330
// Act & Assert
330-
assert!(client.put("key", &hash, data, chunk_boundaries).await.is_ok());
331+
assert!(client
332+
.put("key", &hash, data, chunk_boundaries)
333+
.await
334+
.is_ok());
331335

332336
let returned_data = client.get("key", &hash).await.unwrap();
333337
assert_eq!(data_again, returned_data);
@@ -357,12 +361,15 @@ mod tests {
357361
// Act & Assert
358362
assert!(client.put("", &hash, data, chunk_boundaries).await.is_ok());
359363

360-
let ranges: Vec<(u64, u64)> = vec![(0, 100),(100, 1500)];
364+
let ranges: Vec<(u64, u64)> = vec![(0, 100), (100, 1500)];
361365
let ranges_again = ranges.clone();
362366
let returned_ranges = client.get_object_range("", &hash, ranges).await.unwrap();
363367

364368
for idx in 0..returned_ranges.len() {
365-
assert_eq!(data_again[ranges_again[idx].0 as usize .. ranges_again[idx].1 as usize], returned_ranges[idx]);
369+
assert_eq!(
370+
data_again[ranges_again[idx].0 as usize..ranges_again[idx].1 as usize],
371+
returned_ranges[idx]
372+
);
366373
}
367374
}
368375

@@ -376,7 +383,7 @@ mod tests {
376383
// Act
377384
client.put("", &hash, data, chunk_boundaries).await.unwrap();
378385
let len = client.get_length("", &hash).await.unwrap();
379-
386+
380387
// Assert
381388
assert_eq!(len as usize, gen_length);
382389
}
@@ -399,7 +406,10 @@ mod tests {
399406

400407
let hello_hash = merklehash::compute_data_hash(&hello[..]);
401408
// write "hello world"
402-
client.put("key", &hello_hash, hello.clone(), vec![hello.len() as u64]).await.unwrap();
409+
client
410+
.put("key", &hello_hash, hello.clone(), vec![hello.len() as u64])
411+
.await
412+
.unwrap();
403413

404414
// put the same value a second time. This should be ok.
405415
client
@@ -505,7 +515,7 @@ mod tests {
505515
client.get("key", &hello_hash).await.unwrap_err()
506516
);
507517
}
508-
518+
509519
#[tokio::test]
510520
async fn test_hashing() {
511521
let client = LocalClient::default();
@@ -532,12 +542,15 @@ mod tests {
532542
.unwrap();
533543
}
534544

535-
fn gen_dummy_xorb(num_chunks: u32, uncompressed_chunk_size: u32, randomize_chunk_sizes: bool) -> (DataHash, Vec<u8>, Vec<u64>) {
545+
fn gen_dummy_xorb(
546+
num_chunks: u32,
547+
uncompressed_chunk_size: u32,
548+
randomize_chunk_sizes: bool,
549+
) -> (DataHash, Vec<u8>, Vec<u64>) {
536550
let mut contents = Vec::new();
537551
let mut chunks: Vec<Chunk> = Vec::new();
538552
let mut chunk_boundaries = Vec::with_capacity(num_chunks as usize);
539553
for _idx in 0..num_chunks {
540-
541554
let chunk_size: u32 = if randomize_chunk_sizes {
542555
let mut rng = rand::thread_rng();
543556
rng.gen_range(1024..=uncompressed_chunk_size)
@@ -547,7 +560,10 @@ mod tests {
547560

548561
let bytes = gen_random_bytes(chunk_size);
549562

550-
chunks.push(Chunk { hash: merklehash::compute_data_hash(&bytes), length: bytes.len() });
563+
chunks.push(Chunk {
564+
hash: merklehash::compute_data_hash(&bytes),
565+
length: bytes.len(),
566+
});
551567

552568
contents.extend(bytes);
553569
chunk_boundaries.push(contents.len() as u64);
@@ -566,7 +582,6 @@ mod tests {
566582
let mut rng = rand::thread_rng();
567583
let mut data = vec![0u8; uncompressed_chunk_size as usize];
568584
rng.fill(&mut data[..]);
569-
data
585+
data
570586
}
571-
572-
}
587+
}

cas_client/src/remote_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use reqwest::{StatusCode, Url};
88
use serde::{de::DeserializeOwned, Serialize};
99

1010
use bytes::Bytes;
11-
use cas_object::cas_object_format::CasObject;
11+
use cas_object::CasObject;
1212
use cas_types::CASReconstructionTerm;
1313
use tracing::warn;
1414

cas_object/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ merklehash = { path = "../merklehash" }
1111
tempfile = "3.12.0"
1212
tracing = "0.1.40"
1313
xet_error = { path = "../xet_error" }
14+
cas_types = { path = "../cas_types" }
15+
lz4_flex = "0.11.3"
16+
bytes = "1.7.2"
1417

1518
[dev-dependencies]
1619
rand = "0.8.5"

0 commit comments

Comments
 (0)