diff --git a/benches/compress.rs b/benches/compress.rs
index 69b5670..04df37d 100644
--- a/benches/compress.rs
+++ b/benches/compress.rs
@@ -79,12 +79,12 @@ fn bench_dbtext(c: &mut Criterion) {
     let mut buffer = Vec::with_capacity(200 * 1024 * 1024);
     group.throughput(Throughput::Bytes(buf.len() as u64));
     group.bench_function("compress-only", |b| {
-        b.iter(|| unsafe { compressor.compress_into(&buf, &mut buffer) });
+        b.iter(|| compressor.compress_into_uninit(&buf, buffer.spare_capacity_mut()));
     });
 
-    unsafe {
-        compressor.compress_into(&buf, &mut buffer);
-    };
+    let initialized = compressor.compress_into_uninit(&buf, buffer.spare_capacity_mut());
+    unsafe { buffer.set_len(initialized) };
+
     let decompressor = compressor.decompressor();
     group.bench_function("decompress", |b| {
         b.iter_with_large_drop(|| decompressor.decompress(&buffer));
diff --git a/benches/micro.rs b/benches/micro.rs
index 89c54d2..e68f6ae 100644
--- a/benches/micro.rs
+++ b/benches/micro.rs
@@ -142,9 +142,7 @@ fn bench_compress(c: &mut Criterion) {
         assert!(compressor.insert(Symbol::from_u8(b'a'), 1));
         let compressor = compressor.build();
 
-        b.iter(|| unsafe {
-            compressor.compress_into(&test_string, &mut output_buf);
-        })
+        b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
     });
     group.finish();
 
@@ -159,9 +157,7 @@ fn bench_compress(c: &mut Criterion) {
         assert!(compressor.insert(Symbol::from_slice(&[b'b', b'a', b'b', 0, 0, 0, 0, 0]), 3));
         let compressor = compressor.build();
 
-        b.iter(|| unsafe {
-            compressor.compress_into(&test_string, &mut output_buf);
-        })
+        b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
     });
     group.finish();
 
@@ -173,9 +169,7 @@ fn bench_compress(c: &mut Criterion) {
         assert!(compressor.insert(Symbol::from_slice(&[b'a', b'b', b'c', b'd', 0, 0, 0, 0]), 4));
         let compressor = compressor.build();
 
-        b.iter(|| unsafe {
-            compressor.compress_into(&test_string, &mut output_buf);
-        })
+        b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
     });
     group.finish();
 
@@ -187,9 +181,7 @@ fn bench_compress(c: &mut Criterion) {
         assert!(compressor.insert(Symbol::from_slice(b"abcdefgh"), 8));
         let compressor = compressor.build();
 
-        b.iter(|| unsafe {
-            compressor.compress_into(&test_string, &mut output_buf);
-        })
+        b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
     });
 
     group.bench_function("decompress", |b| {
diff --git a/src/builder.rs b/src/builder.rs
index 31cace5..3a351bc 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -705,7 +705,7 @@ impl CompressorBuilder {
         );
         let remaining_bytes = remaining_bytes as usize;
 
-        // Load the last `remaining_byte`s of data into a final world. We then replicate the loop above,
+        // Load the last `remaining_byte`s of data into a final word. We then replicate the loop above,
         // but shift data out of this word rather than advancing an input pointer and potentially reading
         // unowned memory
         let mut bytes = [0u8; 8];
diff --git a/src/lib.rs b/src/lib.rs
index 4a7ba7b..2832de4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -650,6 +650,65 @@ impl Compressor {
         }
     }
 
+    /// Compress the leading symbol of `word` into `out_ptr` using bounds-checked writes.
+    ///
+    /// Returns `(bytes of input consumed, bytes of output written)`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `out_ptr` has fewer than two slots, since an escape needs two bytes.
+    #[inline(always)]
+    fn compress_word_safe(&self, word: u64, out_ptr: &mut [MaybeUninit<u8>]) -> (usize, usize) {
+        assert!(out_ptr.len() >= 2);
+
+        // Speculatively write the first byte of `word` at offset 1. This is necessary if it is an escape, and
+        // if it isn't, it will be overwritten anyway.
+        let first_byte = word as u8;
+        out_ptr[1].write(first_byte);
+
+        // First, check the two_bytes table
+        let code_twobyte = self.codes_two_byte[word as u16 as usize];
+
+        if code_twobyte.code() < self.has_suffix_code {
+            // 2 byte code without having to worry about longer matches.
+            out_ptr[0].write(code_twobyte.code());
+
+            // Advance input by symbol length (2) and output by a single code byte
+            (2, 1)
+        } else {
+            // Probe the hash table
+            let entry = self.lossy_pht.lookup(word);
+
+            // Now, downshift the `word` and the `entry` to see if they align.
+            let ignored_bits = entry.ignored_bits;
+            if entry.code != Code::UNUSED
+                && compare_masked(word, entry.symbol.to_u64(), ignored_bits)
+            {
+                // Advance the input by the symbol length (variable) and the output by one code byte
+                // Index 0 is in bounds: the assert above guarantees at least two slots.
+                out_ptr[0].write(entry.code.code());
+                (entry.code.len() as usize, 1)
+            } else {
+                // No hash table match: fall back to the two-byte table's code (may be an escape).
+                out_ptr[0].write(code_twobyte.code());
+
+                // Advance the input by the symbol length (variable) and the output by either 1
+                // byte (if was one-byte code) or two bytes (escape).
+                (
+                    code_twobyte.len() as usize,
+                    // Predicated version of:
+                    //
+                    // if entry.code >= 256 {
+                    //      2
+                    // } else {
+                    //      1
+                    // }
+                    1 + (code_twobyte.extended_code() >> 8) as usize,
+                )
+            }
+        }
+    }
+
     /// Compress many lines in bulk.
     pub fn compress_bulk(&self, lines: &Vec<&[u8]>) -> Vec<Vec<u8>> {
         let mut res = Vec::new();
@@ -721,7 +780,7 @@ impl Compressor {
 
         let remaining_bytes = remaining_bytes as usize;
 
-        // Load the last `remaining_byte`s of data into a final world. We then replicate the loop above,
+        // Load the last `remaining_byte`s of data into a final word. We then replicate the loop above,
         // but shift data out of this word rather than advancing an input pointer and potentially reading
         // unowned memory.
         let mut bytes = [0u8; 8];
@@ -761,16 +820,83 @@ impl Compressor {
         unsafe { values.set_len(bytes_written as usize) };
     }
 
+    /// Compress a plain value into a block of uninitialized memory.
+    ///
+    /// Returns the number of bytes of memory that were initialized.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `into` is too small for the compressed output. `compress` sizes its buffer at
+    /// `2 * plaintext.len()`, the all-escapes worst case.
+    pub fn compress_into_uninit(&self, plaintext: &[u8], into: &mut [MaybeUninit<u8>]) -> usize {
+        let mut in_ptr = 0;
+        let mut out_ptr = 0;
+
+        loop {
+            let input = &plaintext[in_ptr..];
+            let output = &mut into[out_ptr..];
+
+            if input.len() < 8 || output.len() < 8 {
+                break;
+            }
+
+            let word = u64::from_le_bytes(input[..8].try_into().unwrap());
+            let (advance_in, advance_out) = self.compress_word_safe(word, output);
+            in_ptr += advance_in;
+            out_ptr += advance_out;
+        }
+
+        let remaining_bytes = plaintext.len() - in_ptr;
+        assert!(
+            remaining_bytes <= 8 && (out_ptr < into.len() || remaining_bytes == 0),
+            "output buffer sized too small"
+        );
+
+        // Load the last `remaining_byte`s of data into a final word. We then replicate the loop above,
+        // but shift data out of this word rather than advancing an input pointer and potentially reading
+        // unowned memory.
+        let mut bytes = [0u8; 8];
+        bytes[0..remaining_bytes].copy_from_slice(&plaintext[in_ptr..]);
+        let mut last_word = u64::from_le_bytes(bytes);
+
+        loop {
+            let output = &mut into[out_ptr..];
+
+            // `compress_word_safe` requires at least two output slots (escape + literal).
+            if in_ptr >= plaintext.len() || output.len() < 2 {
+                break;
+            }
+
+            let (advance_in, advance_out) = self.compress_word_safe(last_word, output);
+
+            in_ptr += advance_in;
+            out_ptr += advance_out;
+            last_word = advance_8byte_word(last_word, advance_in);
+        }
+
+        assert!(
+            in_ptr >= plaintext.len(),
+            "exhausted output buffer before exhausting input, there is a bug in SymbolTable::compress()"
+        );
+
+        out_ptr
+    }
+
     /// Use the symbol table to compress the plaintext into a sequence of codes and escapes.
     pub fn compress(&self, plaintext: &[u8]) -> Vec<u8> {
         if plaintext.is_empty() {
             return Vec::new();
         }
 
+        // Initialize a buffer sufficiently large to handle all plaintext.
         let mut buffer = Vec::with_capacity(plaintext.len() * 2);
 
         // SAFETY: the largest compressed size would be all escapes == 2*plaintext_len
-        unsafe { self.compress_into(plaintext, &mut buffer) };
+        let initialized = self.compress_into_uninit(plaintext, buffer.spare_capacity_mut());
+
+        // SAFETY: `initialized` elements were initialized in the call above.
+        // TODO(aduffy): shrink_to_fit?
+        unsafe { buffer.set_len(initialized) };
 
         buffer
     }