diff --git a/Cargo.lock b/Cargo.lock index 3b6fc9aa..e860a9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,6 +605,7 @@ dependencies = [ "tokio-util", "tracing", "utils", + "zstd", ] [[package]] @@ -5179,3 +5180,31 @@ dependencies = [ "quote", "syn 2.0.101", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index a6fef7f3..a7499a79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ uuid = "1" walkdir = "2" web-time = "1.1" whoami = "1" +zstd = "0.13" # windows winapi = { version = "0.3", features = [ diff --git a/cas_object/Cargo.toml b/cas_object/Cargo.toml index dcfc7dca..548e6f51 100644 --- a/cas_object/Cargo.toml +++ b/cas_object/Cargo.toml @@ -30,6 +30,7 @@ futures = { workspace = true } half = { workspace = true } lz4_flex = { workspace = true } more-asserts = { workspace = true } +zstd = { workspace = true } rand = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/cas_object/src/cas_chunk_format/deserialize_async.rs b/cas_object/src/cas_chunk_format/deserialize_async.rs index 2ac58b3e..dc9dbd27 100644 --- a/cas_object/src/cas_chunk_format/deserialize_async.rs +++ b/cas_object/src/cas_chunk_format/deserialize_async.rs @@ -154,9 +154,12 @@ mod tests { (1, CompressionScheme::None), (3, 
CompressionScheme::None), (5, CompressionScheme::LZ4), + (5, CompressionScheme::Zstd), (100, CompressionScheme::None), (100, CompressionScheme::LZ4), + (100, CompressionScheme::Zstd), (1000, CompressionScheme::LZ4), + (1000, CompressionScheme::Zstd), ]; let rng = &mut rng(); for (num_chunks, compression_scheme) in cases { diff --git a/cas_object/src/compression_scheme.rs b/cas_object/src/compression_scheme.rs index f42da819..f00dc174 100644 --- a/cas_object/src/compression_scheme.rs +++ b/cas_object/src/compression_scheme.rs @@ -5,6 +5,7 @@ use std::time::Instant; use anyhow::anyhow; use lz4_flex::frame::{FrameDecoder, FrameEncoder}; +use zstd::stream::{Decoder, Encoder}; use crate::byte_grouping::BG4Predictor; use crate::byte_grouping::bg4::{bg4_regroup, bg4_split}; @@ -14,6 +15,8 @@ pub static mut BG4_SPLIT_RUNTIME: f64 = 0.; pub static mut BG4_REGROUP_RUNTIME: f64 = 0.; pub static mut BG4_LZ4_COMPRESS_RUNTIME: f64 = 0.; pub static mut BG4_LZ4_DECOMPRESS_RUNTIME: f64 = 0.; +pub static mut BG4_ZSTD_COMPRESS_RUNTIME: f64 = 0.; +pub static mut BG4_ZSTD_DECOMPRESS_RUNTIME: f64 = 0.; /// Dis-allow the value of ascii capital letters as valid CompressionScheme, 65-90 #[repr(u8)] @@ -23,8 +26,10 @@ pub enum CompressionScheme { None = 0, LZ4 = 1, ByteGrouping4LZ4 = 2, // 4 byte groups + Zstd = 3, + ByteGrouping4Zstd = 4, // 4 byte groups with zstd } -pub const NUM_COMPRESSION_SCHEMES: usize = 3; +pub const NUM_COMPRESSION_SCHEMES: usize = 5; impl Display for CompressionScheme { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -37,6 +42,8 @@ impl From<&CompressionScheme> for &'static str { CompressionScheme::None => "none", CompressionScheme::LZ4 => "lz4", CompressionScheme::ByteGrouping4LZ4 => "bg4-lz4", + CompressionScheme::Zstd => "zstd", + CompressionScheme::ByteGrouping4Zstd => "bg4-zstd", } } } @@ -55,6 +62,8 @@ impl TryFrom<u8> for CompressionScheme { 0 => Ok(CompressionScheme::None), 1 => Ok(CompressionScheme::LZ4), 2 =>
Ok(CompressionScheme::ByteGrouping4LZ4), + 3 => Ok(CompressionScheme::Zstd), + 4 => Ok(CompressionScheme::ByteGrouping4Zstd), _ => Err(CasObjectError::FormatError(anyhow!("cannot convert value {value} to CompressionScheme"))), } } @@ -66,6 +75,8 @@ impl CompressionScheme { CompressionScheme::None => data.into(), CompressionScheme::LZ4 => lz4_compress_from_slice(data).map(Cow::from)?, CompressionScheme::ByteGrouping4LZ4 => bg4_lz4_compress_from_slice(data).map(Cow::from)?, + CompressionScheme::Zstd => zstd_compress_from_slice(data).map(Cow::from)?, + CompressionScheme::ByteGrouping4Zstd => bg4_zstd_compress_from_slice(data).map(Cow::from)?, }) } @@ -74,6 +85,8 @@ CompressionScheme::None => data.into(), CompressionScheme::LZ4 => lz4_decompress_from_slice(data).map(Cow::from)?, CompressionScheme::ByteGrouping4LZ4 => bg4_lz4_decompress_from_slice(data).map(Cow::from)?, + CompressionScheme::Zstd => zstd_decompress_from_slice(data).map(Cow::from)?, + CompressionScheme::ByteGrouping4Zstd => bg4_zstd_decompress_from_slice(data).map(Cow::from)?, }) } @@ -82,6 +95,8 @@ CompressionScheme::None => copy(reader, writer)?, CompressionScheme::LZ4 => lz4_decompress_from_reader(reader, writer)?, CompressionScheme::ByteGrouping4LZ4 => bg4_lz4_decompress_from_reader(reader, writer)?, + CompressionScheme::Zstd => zstd_decompress_from_reader(reader, writer)?, + CompressionScheme::ByteGrouping4Zstd => bg4_zstd_decompress_from_reader(reader, writer)?, }) } @@ -160,6 +175,67 @@ fn bg4_lz4_decompress_from_reader<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<u64> { Ok(regrouped.len() as u64) } +pub fn zstd_compress_from_slice(data: &[u8]) -> Result<Vec<u8>> { + let mut enc = Encoder::new(Vec::new(), 3)?; + enc.write_all(data)?; + Ok(enc.finish()?)
+} + +pub fn zstd_decompress_from_slice(data: &[u8]) -> Result<Vec<u8>> { + let mut dest = vec![]; + zstd_decompress_from_reader(&mut Cursor::new(data), &mut dest)?; + Ok(dest) +} + +fn zstd_decompress_from_reader<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<u64> { + let mut dec = Decoder::new(reader)?; + Ok(copy(&mut dec, writer)?) +} + +pub fn bg4_zstd_compress_from_slice(data: &[u8]) -> Result<Vec<u8>> { + let s = Instant::now(); + let groups = bg4_split(data); + unsafe { + BG4_SPLIT_RUNTIME += s.elapsed().as_secs_f64(); + } + + let s = Instant::now(); + let mut dest = vec![]; + let mut enc = Encoder::new(&mut dest, 3)?; + enc.write_all(&groups)?; + enc.finish()?; + unsafe { + BG4_ZSTD_COMPRESS_RUNTIME += s.elapsed().as_secs_f64(); + } + + Ok(dest) +} + +pub fn bg4_zstd_decompress_from_slice(data: &[u8]) -> Result<Vec<u8>> { + let mut dest = vec![]; + bg4_zstd_decompress_from_reader(&mut Cursor::new(data), &mut dest)?; + Ok(dest) +} + +fn bg4_zstd_decompress_from_reader<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<u64> { + let s = Instant::now(); + let mut g = vec![]; + Decoder::new(reader)?.read_to_end(&mut g)?; + unsafe { + BG4_ZSTD_DECOMPRESS_RUNTIME += s.elapsed().as_secs_f64(); + } + + let s = Instant::now(); + let regrouped = bg4_regroup(&g); + unsafe { + BG4_REGROUP_RUNTIME += s.elapsed().as_secs_f64(); + } + + writer.write_all(&regrouped)?; + + Ok(regrouped.len() as u64) +} + #[cfg(test)] mod tests { use std::mem::size_of; @@ -174,6 +250,8 @@ mod tests { assert_eq!(Into::<&str>::into(CompressionScheme::None), "none"); assert_eq!(Into::<&str>::into(CompressionScheme::LZ4), "lz4"); assert_eq!(Into::<&str>::into(CompressionScheme::ByteGrouping4LZ4), "bg4-lz4"); + assert_eq!(Into::<&str>::into(CompressionScheme::Zstd), "zstd"); + assert_eq!(Into::<&str>::into(CompressionScheme::ByteGrouping4Zstd), "bg4-zstd"); } #[test] @@ -181,7 +259,9 @@ assert_eq!(CompressionScheme::try_from(0u8), Ok(CompressionScheme::None)); assert_eq!(CompressionScheme::try_from(1u8), Ok(CompressionScheme::LZ4));
assert_eq!(CompressionScheme::try_from(2u8), Ok(CompressionScheme::ByteGrouping4LZ4)); - assert!(CompressionScheme::try_from(3u8).is_err()); + assert_eq!(CompressionScheme::try_from(3u8), Ok(CompressionScheme::Zstd)); + assert_eq!(CompressionScheme::try_from(4u8), Ok(CompressionScheme::ByteGrouping4Zstd)); + assert!(CompressionScheme::try_from(5u8).is_err()); } #[test] @@ -263,4 +343,28 @@ } } } + + #[test] + fn test_zstd() { + let mut rng = rand::rng(); + + for i in 0..4 { + let n = 64 * 1024 + i * 23; + let all_zeros = vec![0u8; n]; + let random_u8s: Vec<_> = (0..n).map(|_| rng.random_range(0..255)).collect(); + + let dataset = [all_zeros, random_u8s]; + + for data in dataset { + let zstd_compressed = zstd_compress_from_slice(&data).unwrap(); + let zstd_uncompressed = zstd_decompress_from_slice(&zstd_compressed).unwrap(); + assert_eq!(data, zstd_uncompressed); + + let bg4_zstd_compressed = bg4_zstd_compress_from_slice(&data).unwrap(); + let bg4_zstd_uncompressed = bg4_zstd_decompress_from_slice(&bg4_zstd_compressed).unwrap(); + assert_eq!(data.len(), bg4_zstd_uncompressed.len()); + assert_eq!(data, bg4_zstd_uncompressed); + } + } + } } diff --git a/data/src/bin/xtool.rs b/data/src/bin/xtool.rs index 801c9020..949e5108 100644 --- a/data/src/bin/xtool.rs +++ b/data/src/bin/xtool.rs @@ -98,7 +98,9 @@ struct DedupArg { /// The compression scheme to use on XORB upload. Choices are /// 0: no compression; /// 1: LZ4 compression; - /// 2: 4 byte groups with LZ4 compression. + /// 2: 4 byte groups with LZ4 compression; + /// 3: Zstd compression; + /// 4: 4 byte groups with Zstd compression. /// If not specified, this will be determined by the repo type. #[clap(short, long)] compression: Option<u8>,