Skip to content

Commit 9392e59

Browse files
authored
Add LZ4 compression usage to CasObject (#17)
* Consolidate the CompressionScheme enum
  - removed from the utils crate
  - moved from the cas_types crate to cas_object
* Add LZ4 compression, with unit tests
1 parent 037d267 commit 9392e59

File tree

10 files changed

+182
-149
lines changed

10 files changed

+182
-149
lines changed

cas_client/src/data_transport.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@ use std::time::Duration;
44

55
use crate::cas_connection_pool::CasConnectionConfig;
66
use anyhow::{anyhow, Result};
7-
use cas::common::CompressionScheme;
8-
use cas::compression::{
9-
multiple_accepted_encoding_header_value, CAS_ACCEPT_ENCODING_HEADER,
10-
CAS_CONTENT_ENCODING_HEADER, CAS_INFLATED_SIZE_HEADER,
11-
};
7+
use cas_object::CompressionScheme;
8+
129
use error_printer::ErrorPrinter;
1310
use http_body_util::{BodyExt, Full};
1411
use hyper::body::Bytes;
@@ -33,16 +30,26 @@ use xet_error::Error;
3330

3431
use merklehash::MerkleHash;
3532

33+
const CAS_CONTENT_ENCODING_HEADER: &str = "xet-cas-content-encoding";
34+
const CAS_ACCEPT_ENCODING_HEADER: &str = "xet-cas-content-encoding"; // NOTE(review): identical to CAS_CONTENT_ENCODING_HEADER — possibly a copy-paste slip; confirm the header name the server expects
35+
const CAS_INFLATED_SIZE_HEADER: &str = "xet-cas-inflated-size";
36+
3637
const HTTP2_POOL_IDLE_TIMEOUT_SECS: u64 = 30;
3738
const HTTP2_KEEPALIVE_MILLIS: u64 = 500;
3839
const HTTP2_WINDOW_SIZE: u32 = 2147418112;
3940
const NUM_RETRIES: usize = 5;
4041
const BASE_RETRY_DELAY_MS: u64 = 3000;
4142

43+
// Builds the accepted-encodings header value: each scheme is converted to its &str form and the pieces are joined with ';' in list order (e.g. "lz4;none").
44+
fn multiple_accepted_encoding_header_value(list: Vec<CompressionScheme>) -> String {
45+
let as_strs: Vec<&str> = list.iter().map(Into::into).collect();
46+
as_strs.join(";").to_string() // NOTE(review): join on a Vec<&str> already returns a String, so the trailing to_string() is redundant
47+
}
48+
4249
lazy_static! {
4350
static ref ACCEPTED_ENCODINGS_HEADER_VALUE: HeaderValue = HeaderValue::from_str(
4451
multiple_accepted_encoding_header_value(vec![
45-
CompressionScheme::Lz4,
52+
CompressionScheme::LZ4,
4653
CompressionScheme::None
4754
])
4855
.as_str()
@@ -284,7 +291,7 @@ impl DataTransport {
284291
let bytes = maybe_decode(bytes.as_slice(), encoding, uncompressed_size)?;
285292
debug!(
286293
"GET; encoding: ({}), uncompressed size: ({}), payload ({}) prefix: ({}), hash: ({})",
287-
encoding.as_str_name(),
294+
encoding,
288295
uncompressed_size.unwrap_or_default(),
289296
payload_size,
290297
prefix,
@@ -344,7 +351,7 @@ impl DataTransport {
344351
.to_vec();
345352
let payload_size = bytes.len();
346353
let bytes = maybe_decode(bytes.as_slice(), encoding, uncompressed_size)?;
347-
debug!("GET RANGE; encoding: ({}), uncompressed size: ({}), payload ({}) prefix: ({}), hash: ({})", encoding.as_str_name(), uncompressed_size.unwrap_or_default(), payload_size, prefix, hash);
354+
debug!("GET RANGE; encoding: ({}), uncompressed size: ({}), payload ({}) prefix: ({}), hash: ({})", encoding, uncompressed_size.unwrap_or_default(), payload_size, prefix, hash);
348355
Ok(bytes.to_vec())
349356
},
350357
is_status_retriable_and_print,
@@ -367,7 +374,7 @@ impl DataTransport {
367374
let data = maybe_encode(data, encoding)?;
368375
debug!(
369376
"PUT; encoding: ({}), uncompressed size: ({}), payload: ({}), prefix: ({}), hash: ({})",
370-
encoding.as_str_name(),
377+
encoding,
371378
full_size,
372379
data.len(),
373380
prefix,
@@ -423,7 +430,7 @@ fn maybe_decode<'a, T: Into<&'a [u8]>>(
423430
encoding: CompressionScheme,
424431
uncompressed_size: Option<i32>,
425432
) -> Result<Vec<u8>> {
426-
if let CompressionScheme::Lz4 = encoding {
433+
if let CompressionScheme::LZ4 = encoding {
427434
if uncompressed_size.is_none() {
428435
return Err(anyhow!(
429436
"Missing uncompressed size when attempting to decompress LZ4"
@@ -447,7 +454,7 @@ fn get_encoding_info<T>(response: &Response<T>) -> Option<(CompressionScheme, Op
447454
}
448455

449456
fn maybe_encode<'a, T: Into<&'a [u8]>>(data: T, encoding: CompressionScheme) -> Result<Vec<u8>> {
450-
if let CompressionScheme::Lz4 = encoding {
457+
if let CompressionScheme::LZ4 = encoding {
451458
lz4::block::compress(data.into(), Some(CompressionMode::DEFAULT), false)
452459
.log_error("LZ4 compression error")
453460
.map_err(|e| anyhow!(e))
@@ -580,4 +587,20 @@ mod tests {
580587
assert_eq!(get_header_value(GIT_XET_VERSION_HEADER), git_xet_version);
581588
assert_eq!(get_header_value(USER_ID_HEADER), user_id);
582589
}
590+
591+
// Verifies that schemes are rendered as "lz4"/"none" and joined with ';' in list order, with no separator for a single entry.
#[test]
592+
fn test_multiple_accepted_encoding_header_value() {
593+
let multi = vec![CompressionScheme::LZ4, CompressionScheme::None];
594+
assert_eq!(
595+
multiple_accepted_encoding_header_value(multi),
596+
"lz4;none".to_string()
597+
);
598+
599+
let singular = vec![CompressionScheme::LZ4];
600+
assert_eq!(
601+
multiple_accepted_encoding_header_value(singular),
602+
"lz4".to_string()
603+
);
604+
}
605+
583606
}

cas_client/src/local_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ impl Client for LocalClient {
226226
hash,
227227
&data,
228228
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
229+
cas_object::CompressionScheme::None
229230
)?;
230231
// flush before persisting
231232
writer.flush()?;

cas_client/src/remote_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ impl CASAPIClient {
168168
&key.hash,
169169
contents,
170170
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
171+
cas_object::CompressionScheme::LZ4
171172
)?;
172173

173174
debug!("Upload: POST to {url:?} for {key:?}");

cas_object/src/cas_chunk_format.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use crate::error::CasObjectError;
88
use anyhow::anyhow;
9-
use cas_types::compression_scheme::CompressionScheme;
9+
use crate::CompressionScheme;
1010
use lz4_flex::frame::{FrameDecoder, FrameEncoder};
1111

1212
pub const CAS_CHUNK_HEADER_LENGTH: u8 = 8;
@@ -191,7 +191,7 @@ mod tests {
191191
use std::io::Cursor;
192192

193193
use super::*;
194-
use cas_types::compression_scheme::CompressionScheme;
194+
use CompressionScheme;
195195
use rand::Rng;
196196

197197
const COMP_LEN: u32 = 0x010203;

cas_object/src/cas_object_format.rs

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use std::{
77
};
88

99
use crate::{
10-
cas_chunk_format::{deserialize_chunk, serialize_chunk},
11-
error::CasObjectError,
10+
cas_chunk_format::{deserialize_chunk, serialize_chunk}, error::CasObjectError, CompressionScheme
1211
};
1312
use anyhow::anyhow;
1413

@@ -453,6 +452,7 @@ impl CasObject {
453452
hash: &MerkleHash,
454453
data: &[u8],
455454
chunk_boundaries: &Vec<u32>,
455+
compression_scheme: CompressionScheme,
456456
) -> Result<(Self, usize), CasObjectError> {
457457
let mut cas = CasObject::default();
458458
cas.info.cashash.copy_from_slice(hash.as_slice());
@@ -474,11 +474,8 @@ impl CasObject {
474474

475475
// now serialize chunk directly to writer (since chunks come first!)
476476
// TODO: add compression scheme to this call
477-
let chunk_written_bytes = serialize_chunk(
478-
&chunk_raw_bytes,
479-
writer,
480-
cas_types::compression_scheme::CompressionScheme::None,
481-
)?;
477+
let chunk_written_bytes =
478+
serialize_chunk(&chunk_raw_bytes, writer, compression_scheme)?;
482479
total_written_bytes += chunk_written_bytes;
483480

484481
let chunk_meta = CasChunkInfo {
@@ -554,7 +551,7 @@ mod tests {
554551
#[test]
555552
fn test_chunk_boundaries_chunk_size_info() {
556553
// Arrange
557-
let (c, _cas_data, _raw_data) = build_cas_object(3, 100, false);
554+
let (c, _cas_data, _raw_data) = build_cas_object(3, 100, false, false);
558555
// Act & Assert
559556
assert_eq!(c.get_chunk_boundaries().len(), 3);
560557
assert_eq!(c.get_chunk_boundaries(), [100, 200, 300]);
@@ -579,6 +576,7 @@ mod tests {
579576
num_chunks: u32,
580577
uncompressed_chunk_size: u32,
581578
use_random_chunk_size: bool,
579+
use_lz4_compression: bool
582580
) -> (CasObject, Vec<u8>, Vec<u8>) {
583581
let mut c = CasObject::default();
584582

@@ -594,7 +592,7 @@ mod tests {
594592
for _idx in 0..num_chunks {
595593
let chunk_size: u32 = if use_random_chunk_size {
596594
let mut rng = rand::thread_rng();
597-
rng.gen_range(1024..=uncompressed_chunk_size)
595+
rng.gen_range(512..=uncompressed_chunk_size)
598596
} else {
599597
uncompressed_chunk_size
600598
};
@@ -606,10 +604,15 @@ mod tests {
606604

607605
// build chunk, create ChunkInfo and keep going
608606

607+
let compression_scheme = match use_lz4_compression {
608+
true => CompressionScheme::LZ4,
609+
false => CompressionScheme::None
610+
};
611+
609612
let bytes_written = serialize_chunk(
610613
&bytes,
611614
&mut writer,
612-
cas_types::compression_scheme::CompressionScheme::None,
615+
compression_scheme,
613616
)
614617
.unwrap();
615618

@@ -646,14 +649,15 @@ mod tests {
646649
#[test]
647650
fn test_basic_serialization_mem() {
648651
// Arrange
649-
let (c, _cas_data, raw_data) = build_cas_object(3, 100, false);
652+
let (c, _cas_data, raw_data) = build_cas_object(3, 100, false, false);
650653
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
651654
// Act & Assert
652655
assert!(CasObject::serialize(
653656
&mut writer,
654657
&c.info.cashash,
655658
&raw_data,
656659
&c.get_chunk_boundaries(),
660+
CompressionScheme::None
657661
)
658662
.is_ok());
659663

@@ -670,14 +674,15 @@ mod tests {
670674
#[test]
671675
fn test_serialization_deserialization_mem_medium() {
672676
// Arrange
673-
let (c, _cas_data, raw_data) = build_cas_object(32, 16384, false);
677+
let (c, _cas_data, raw_data) = build_cas_object(32, 16384, false, false);
674678
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
675679
// Act & Assert
676680
assert!(CasObject::serialize(
677681
&mut writer,
678682
&c.info.cashash,
679683
&raw_data,
680684
&c.get_chunk_boundaries(),
685+
CompressionScheme::None
681686
)
682687
.is_ok());
683688

@@ -697,14 +702,15 @@ mod tests {
697702
#[test]
698703
fn test_serialization_deserialization_mem_large_random() {
699704
// Arrange
700-
let (c, _cas_data, raw_data) = build_cas_object(32, 65536, true);
705+
let (c, _cas_data, raw_data) = build_cas_object(32, 65536, true, false);
701706
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
702707
// Act & Assert
703708
assert!(CasObject::serialize(
704709
&mut writer,
705710
&c.info.cashash,
706711
&raw_data,
707712
&c.get_chunk_boundaries(),
713+
CompressionScheme::None
708714
)
709715
.is_ok());
710716

@@ -723,14 +729,125 @@ mod tests {
723729
#[test]
724730
fn test_serialization_deserialization_file_large_random() {
725731
// Arrange
726-
let (c, _cas_data, raw_data) = build_cas_object(256, 65536, true);
732+
let (c, _cas_data, raw_data) = build_cas_object(256, 65536, true, false);
733+
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
734+
// Act & Assert
735+
assert!(CasObject::serialize(
736+
&mut writer,
737+
&c.info.cashash,
738+
&raw_data,
739+
&c.get_chunk_boundaries(),
740+
CompressionScheme::None
741+
)
742+
.is_ok());
743+
744+
let mut reader = writer.clone();
745+
reader.set_position(0);
746+
let res = CasObject::deserialize(&mut reader);
747+
assert!(res.is_ok());
748+
749+
let c2 = res.unwrap();
750+
assert_eq!(c, c2);
751+
752+
assert_eq!(c.info.num_chunks, c2.info.num_chunks);
753+
assert_eq!(raw_data, c2.get_all_bytes(&mut reader).unwrap());
754+
}
755+
756+
#[test]
757+
fn test_basic_mem_lz4() {
758+
// Arrange
759+
let (c, _cas_data, raw_data) = build_cas_object(1, 8, false, true);
760+
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
761+
// Act & Assert
762+
assert!(CasObject::serialize(
763+
&mut writer,
764+
&c.info.cashash,
765+
&raw_data,
766+
&c.get_chunk_boundaries(),
767+
CompressionScheme::LZ4
768+
)
769+
.is_ok());
770+
771+
let mut reader = writer.clone();
772+
reader.set_position(0);
773+
let res = CasObject::deserialize(&mut reader);
774+
assert!(res.is_ok());
775+
776+
let c2 = res.unwrap();
777+
assert_eq!(c, c2);
778+
779+
let bytes_read = c2.get_all_bytes(&mut reader).unwrap();
780+
assert_eq!(c.info.num_chunks, c2.info.num_chunks);
781+
assert_eq!(raw_data, bytes_read);
782+
}
783+
784+
#[test]
785+
fn test_serialization_deserialization_mem_medium_lz4() {
786+
// Arrange
787+
let (c, _cas_data, raw_data) = build_cas_object(32, 16384, false, true);
788+
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
789+
// Act & Assert
790+
assert!(CasObject::serialize(
791+
&mut writer,
792+
&c.info.cashash,
793+
&raw_data,
794+
&c.get_chunk_boundaries(),
795+
CompressionScheme::LZ4
796+
)
797+
.is_ok());
798+
799+
let mut reader = writer.clone();
800+
reader.set_position(0);
801+
let res = CasObject::deserialize(&mut reader);
802+
assert!(res.is_ok());
803+
804+
let c2 = res.unwrap();
805+
assert_eq!(c, c2);
806+
807+
let bytes_read = c2.get_all_bytes(&mut reader).unwrap();
808+
assert_eq!(c.info.num_chunks, c2.info.num_chunks);
809+
assert_eq!(raw_data, bytes_read);
810+
}
811+
812+
#[test]
813+
fn test_serialization_deserialization_mem_large_random_lz4() {
814+
// Arrange
815+
let (c, _cas_data, raw_data) = build_cas_object(32, 65536, true, true);
816+
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
817+
// Act & Assert
818+
assert!(CasObject::serialize(
819+
&mut writer,
820+
&c.info.cashash,
821+
&raw_data,
822+
&c.get_chunk_boundaries(),
823+
CompressionScheme::LZ4
824+
)
825+
.is_ok());
826+
827+
let mut reader = writer.clone();
828+
reader.set_position(0);
829+
let res = CasObject::deserialize(&mut reader);
830+
assert!(res.is_ok());
831+
832+
let c2 = res.unwrap();
833+
assert_eq!(c, c2);
834+
835+
assert_eq!(c.info.num_chunks, c2.info.num_chunks);
836+
assert_eq!(raw_data, c2.get_all_bytes(&mut reader).unwrap());
837+
}
838+
839+
#[test]
840+
fn test_serialization_deserialization_file_large_random_lz4() {
841+
// Arrange
842+
let (c, _cas_data, raw_data) = build_cas_object(256, 65536, true, true);
727843
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
728844
// Act & Assert
729845
assert!(CasObject::serialize(
730846
&mut writer,
731847
&c.info.cashash,
732848
&raw_data,
733849
&c.get_chunk_boundaries(),
850+
CompressionScheme::LZ4
734851
)
735852
.is_ok());
736853

0 commit comments

Comments
 (0)