Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ uuid = "1"
walkdir = "2"
web-time = "1.1"
whoami = "1"
zstd = "0.13"

# windows
winapi = { version = "0.3", features = [
Expand Down
1 change: 1 addition & 0 deletions cas_object/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ futures = { workspace = true }
half = { workspace = true }
lz4_flex = { workspace = true }
more-asserts = { workspace = true }
zstd = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions cas_object/src/cas_chunk_format/deserialize_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ mod tests {
(1, CompressionScheme::None),
(3, CompressionScheme::None),
(5, CompressionScheme::LZ4),
(5, CompressionScheme::Zstd),
(100, CompressionScheme::None),
(100, CompressionScheme::LZ4),
(100, CompressionScheme::Zstd),
(1000, CompressionScheme::LZ4),
(1000, CompressionScheme::Zstd),
];
let rng = &mut rng();
for (num_chunks, compression_scheme) in cases {
Expand Down
108 changes: 106 additions & 2 deletions cas_object/src/compression_scheme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Instant;

use anyhow::anyhow;
use lz4_flex::frame::{FrameDecoder, FrameEncoder};
use zstd::stream::{Decoder, Encoder};

use crate::byte_grouping::BG4Predictor;
use crate::byte_grouping::bg4::{bg4_regroup, bg4_split};
Expand All @@ -14,6 +15,8 @@ pub static mut BG4_SPLIT_RUNTIME: f64 = 0.;
pub static mut BG4_REGROUP_RUNTIME: f64 = 0.;
pub static mut BG4_LZ4_COMPRESS_RUNTIME: f64 = 0.;
pub static mut BG4_LZ4_DECOMPRESS_RUNTIME: f64 = 0.;
pub static mut BG4_ZSTD_COMPRESS_RUNTIME: f64 = 0.;
pub static mut BG4_ZSTD_DECOMPRESS_RUNTIME: f64 = 0.;

/// Dis-allow the value of ascii capital letters as valid CompressionScheme, 65-90
#[repr(u8)]
Expand All @@ -23,8 +26,10 @@ pub enum CompressionScheme {
None = 0,
LZ4 = 1,
ByteGrouping4LZ4 = 2, // 4 byte groups
Zstd = 3,
ByteGrouping4Zstd = 4, // 4 byte groups with zstd
}
pub const NUM_COMPRESSION_SCHEMES: usize = 3;
pub const NUM_COMPRESSION_SCHEMES: usize = 5;

impl Display for CompressionScheme {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -37,6 +42,8 @@ impl From<&CompressionScheme> for &'static str {
CompressionScheme::None => "none",
CompressionScheme::LZ4 => "lz4",
CompressionScheme::ByteGrouping4LZ4 => "bg4-lz4",
CompressionScheme::Zstd => "zstd",
CompressionScheme::ByteGrouping4Zstd => "bg4-zstd",
}
}
}
Expand All @@ -55,6 +62,8 @@ impl TryFrom<u8> for CompressionScheme {
0 => Ok(CompressionScheme::None),
1 => Ok(CompressionScheme::LZ4),
2 => Ok(CompressionScheme::ByteGrouping4LZ4),
3 => Ok(CompressionScheme::Zstd),
4 => Ok(CompressionScheme::ByteGrouping4Zstd),
_ => Err(CasObjectError::FormatError(anyhow!("cannot convert value {value} to CompressionScheme"))),
}
}
Expand All @@ -66,6 +75,8 @@ impl CompressionScheme {
CompressionScheme::None => data.into(),
CompressionScheme::LZ4 => lz4_compress_from_slice(data).map(Cow::from)?,
CompressionScheme::ByteGrouping4LZ4 => bg4_lz4_compress_from_slice(data).map(Cow::from)?,
CompressionScheme::Zstd => zstd_compress_from_slice(data).map(Cow::from)?,
CompressionScheme::ByteGrouping4Zstd => bg4_zstd_compress_from_slice(data).map(Cow::from)?,
})
}

Expand All @@ -74,6 +85,8 @@ impl CompressionScheme {
CompressionScheme::None => data.into(),
CompressionScheme::LZ4 => lz4_decompress_from_slice(data).map(Cow::from)?,
CompressionScheme::ByteGrouping4LZ4 => bg4_lz4_decompress_from_slice(data).map(Cow::from)?,
CompressionScheme::Zstd => zstd_decompress_from_slice(data).map(Cow::from)?,
CompressionScheme::ByteGrouping4Zstd => bg4_zstd_decompress_from_slice(data).map(Cow::from)?,
})
}

Expand All @@ -82,6 +95,8 @@ impl CompressionScheme {
CompressionScheme::None => copy(reader, writer)?,
CompressionScheme::LZ4 => lz4_decompress_from_reader(reader, writer)?,
CompressionScheme::ByteGrouping4LZ4 => bg4_lz4_decompress_from_reader(reader, writer)?,
CompressionScheme::Zstd => zstd_decompress_from_reader(reader, writer)?,
CompressionScheme::ByteGrouping4Zstd => bg4_zstd_decompress_from_reader(reader, writer)?,
})
}

Expand Down Expand Up @@ -160,6 +175,67 @@ fn bg4_lz4_decompress_from_reader<R: Read, W: Write>(reader: &mut R, writer: &mu
Ok(regrouped.len() as u64)
}

pub fn zstd_compress_from_slice(data: &[u8]) -> Result<Vec<u8>> {
let mut enc = Encoder::new(Vec::new(), 3)?;
enc.write_all(data)?;
Ok(enc.finish()?)
}

pub fn zstd_decompress_from_slice(data: &[u8]) -> Result<Vec<u8>> {
let mut dest = vec![];
zstd_decompress_from_reader(&mut Cursor::new(data), &mut dest)?;
Ok(dest)
}

fn zstd_decompress_from_reader<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<u64> {
let mut dec = Decoder::new(reader)?;
Ok(copy(&mut dec, writer)?)
}

pub fn bg4_zstd_compress_from_slice(data: &[u8]) -> Result<Vec<u8>> {
let s = Instant::now();
let groups = bg4_split(data);
unsafe {
BG4_SPLIT_RUNTIME += s.elapsed().as_secs_f64();
}

let s = Instant::now();
let mut dest = vec![];
let mut enc = Encoder::new(&mut dest, 3)?;
enc.write_all(&groups)?;
enc.finish()?;
unsafe {
BG4_ZSTD_COMPRESS_RUNTIME += s.elapsed().as_secs_f64();
}

Ok(dest)
}

pub fn bg4_zstd_decompress_from_slice(data: &[u8]) -> Result<Vec<u8>> {
let mut dest = vec![];
bg4_zstd_decompress_from_reader(&mut Cursor::new(data), &mut dest)?;
Ok(dest)
}

fn bg4_zstd_decompress_from_reader<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<u64> {
let s = Instant::now();
let mut g = vec![];
Decoder::new(reader)?.read_to_end(&mut g)?;
unsafe {
BG4_ZSTD_DECOMPRESS_RUNTIME += s.elapsed().as_secs_f64();
}

let s = Instant::now();
let regrouped = bg4_regroup(&g);
unsafe {
BG4_REGROUP_RUNTIME += s.elapsed().as_secs_f64();
}

writer.write_all(&regrouped)?;

Ok(regrouped.len() as u64)
}

#[cfg(test)]
mod tests {
use std::mem::size_of;
Expand All @@ -174,14 +250,18 @@ mod tests {
assert_eq!(Into::<&str>::into(CompressionScheme::None), "none");
assert_eq!(Into::<&str>::into(CompressionScheme::LZ4), "lz4");
assert_eq!(Into::<&str>::into(CompressionScheme::ByteGrouping4LZ4), "bg4-lz4");
assert_eq!(Into::<&str>::into(CompressionScheme::Zstd), "zstd");
assert_eq!(Into::<&str>::into(CompressionScheme::ByteGrouping4Zstd), "bg4-zstd");
}

#[test]
fn test_from_u8() {
assert_eq!(CompressionScheme::try_from(0u8), Ok(CompressionScheme::None));
assert_eq!(CompressionScheme::try_from(1u8), Ok(CompressionScheme::LZ4));
assert_eq!(CompressionScheme::try_from(2u8), Ok(CompressionScheme::ByteGrouping4LZ4));
assert!(CompressionScheme::try_from(3u8).is_err());
assert_eq!(CompressionScheme::try_from(3u8), Ok(CompressionScheme::Zstd));
assert_eq!(CompressionScheme::try_from(4u8), Ok(CompressionScheme::ByteGrouping4Zstd));
assert!(CompressionScheme::try_from(5u8).is_err());
}

#[test]
Expand Down Expand Up @@ -263,4 +343,28 @@ mod tests {
}
}
}

#[test]
fn test_zstd() {
let mut rng = rand::rng();

for i in 0..4 {
let n = 64 * 1024 + i * 23;
let all_zeros = vec![0u8; n];
let random_u8s: Vec<_> = (0..n).map(|_| rng.random_range(0..255)).collect();

let dataset = [all_zeros, random_u8s];

for data in dataset {
let zstd_compressed = zstd_compress_from_slice(&data).unwrap();
let zstd_uncompressed = zstd_decompress_from_slice(&zstd_compressed).unwrap();
assert_eq!(data, zstd_uncompressed);

let bg4_zstd_compressed = bg4_zstd_compress_from_slice(&data).unwrap();
let bg4_zstd_uncompressed = bg4_zstd_decompress_from_slice(&bg4_zstd_compressed).unwrap();
assert_eq!(data.len(), bg4_zstd_uncompressed.len());
assert_eq!(data, bg4_zstd_uncompressed);
}
}
}
}
4 changes: 3 additions & 1 deletion data/src/bin/xtool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ struct DedupArg {
/// The compression scheme to use on XORB upload. Choices are
/// 0: no compression;
/// 1: LZ4 compression;
/// 2: 4 byte groups with LZ4 compression.
/// 2: 4 byte groups with LZ4 compression;
/// 3: Zstd compression;
/// 4: 4 byte groups with Zstd compression.
/// If not specified, this will be determined by the repo type.
#[clap(short, long)]
compression: Option<u8>,
Expand Down
Loading