Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ fn bench_dbtext(c: &mut Criterion) {
let mut buffer = Vec::with_capacity(200 * 1024 * 1024);
group.throughput(Throughput::Bytes(buf.len() as u64));
group.bench_function("compress-only", |b| {
b.iter(|| unsafe { compressor.compress_into(&buf, &mut buffer) });
b.iter(|| compressor.compress_into_uninit(&buf, buffer.spare_capacity_mut()));
});

unsafe {
compressor.compress_into(&buf, &mut buffer);
};
let initialized = compressor.compress_into_uninit(&buf, buffer.spare_capacity_mut());
unsafe { buffer.set_len(initialized) };

let decompressor = compressor.decompressor();
group.bench_function("decompress", |b| {
b.iter_with_large_drop(|| decompressor.decompress(&buffer));
Expand Down
16 changes: 4 additions & 12 deletions benches/micro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ fn bench_compress(c: &mut Criterion) {
assert!(compressor.insert(Symbol::from_u8(b'a'), 1));
let compressor = compressor.build();

b.iter(|| unsafe {
compressor.compress_into(&test_string, &mut output_buf);
})
b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
});
group.finish();

Expand All @@ -159,9 +157,7 @@ fn bench_compress(c: &mut Criterion) {
assert!(compressor.insert(Symbol::from_slice(&[b'b', b'a', b'b', 0, 0, 0, 0, 0]), 3));
let compressor = compressor.build();

b.iter(|| unsafe {
compressor.compress_into(&test_string, &mut output_buf);
})
b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
});
group.finish();

Expand All @@ -173,9 +169,7 @@ fn bench_compress(c: &mut Criterion) {
assert!(compressor.insert(Symbol::from_slice(&[b'a', b'b', b'c', b'd', 0, 0, 0, 0]), 4));
let compressor = compressor.build();

b.iter(|| unsafe {
compressor.compress_into(&test_string, &mut output_buf);
})
b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
});
group.finish();

Expand All @@ -187,9 +181,7 @@ fn bench_compress(c: &mut Criterion) {
assert!(compressor.insert(Symbol::from_slice(b"abcdefgh"), 8));
let compressor = compressor.build();

b.iter(|| unsafe {
compressor.compress_into(&test_string, &mut output_buf);
})
b.iter(|| compressor.compress_into_uninit(&test_string, output_buf.spare_capacity_mut()))
});

group.bench_function("decompress", |b| {
Expand Down
2 changes: 1 addition & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ impl CompressorBuilder {
);
let remaining_bytes = remaining_bytes as usize;

// Load the last `remaining_byte`s of data into a final world. We then replicate the loop above,
// Load the last `remaining_byte`s of data into a final word. We then replicate the loop above,
// but shift data out of this word rather than advancing an input pointer and potentially reading
// unowned memory
let mut bytes = [0u8; 8];
Expand Down
117 changes: 115 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,58 @@ impl Compressor {
}
}

#[inline(always)]
fn compress_word_safe(&self, word: u64, out_ptr: &mut [MaybeUninit<u8>]) -> (usize, usize) {
assert!(out_ptr.len() >= 2);

// Speculatively write the first byte of `word` at offset 1. This is necessary if it is an escape, and
// if it isn't, it will be overwritten anyway.
let first_byte = word as u8;
out_ptr[1].write(first_byte);

// First, check the two_bytes table
let code_twobyte = self.codes_two_byte[word as u16 as usize];

if code_twobyte.code() < self.has_suffix_code {
// 2 byte code without having to worry about longer matches.
out_ptr[0].write(code_twobyte.code());

// Advance input by symbol length (2) and output by a single code byte
(2, 1)
} else {
// Probe the hash table
let entry = self.lossy_pht.lookup(word);

// Now, downshift the `word` and the `entry` to see if they align.
let ignored_bits = entry.ignored_bits;
if entry.code != Code::UNUSED
&& compare_masked(word, entry.symbol.to_u64(), ignored_bits)
{
// Advance the input by the symbol length (variable) and the output by one code byte
// SAFETY: out_ptr is not null.
out_ptr[0].write(entry.code.code());
(entry.code.len() as usize, 1)
} else {
// SAFETY: out_ptr is not null
out_ptr[0].write(code_twobyte.code());

// Advance the input by the symbol length (variable) and the output by either 1
// byte (if was one-byte code) or two bytes (escape).
(
code_twobyte.len() as usize,
// Predicated version of:
//
// if entry.code >= 256 {
// 2
// } else {
// 1
// }
1 + (code_twobyte.extended_code() >> 8) as usize,
)
}
}
}

/// Compress many lines in bulk.
pub fn compress_bulk(&self, lines: &Vec<&[u8]>) -> Vec<Vec<u8>> {
let mut res = Vec::new();
Expand Down Expand Up @@ -721,7 +773,7 @@ impl Compressor {

let remaining_bytes = remaining_bytes as usize;

// Load the last `remaining_byte`s of data into a final world. We then replicate the loop above,
// Load the last `remaining_byte`s of data into a final word. We then replicate the loop above,
// but shift data out of this word rather than advancing an input pointer and potentially reading
// unowned memory.
let mut bytes = [0u8; 8];
Expand Down Expand Up @@ -761,16 +813,77 @@ impl Compressor {
unsafe { values.set_len(bytes_written as usize) };
}

/// Compress a plain value into a block of uninitialized memory.
///
/// Returns the number of bytes of memory that were initialized.
pub fn compress_into_uninit(&self, plaintext: &[u8], into: &mut [MaybeUninit<u8>]) -> usize {
let mut in_ptr = 0;
let mut out_ptr = 0;

loop {
let input = &plaintext[in_ptr..];
let output = &mut into[out_ptr..];

if input.len() < 8 || output.len() < 8 {
break;
}

let word = u64::from_le_bytes(input[..8].try_into().unwrap());
let (advance_in, advance_out) = self.compress_word_safe(word, output);
in_ptr += advance_in;
out_ptr += advance_out;
}

let remaining_bytes = plaintext.len() - in_ptr;
assert!(
out_ptr < into.len() || remaining_bytes == 0,
"output buffer sized too small"
);

// Load the last `remaining_byte`s of data into a final word. We then replicate the loop above,
// but shift data out of this word rather than advancing an input pointer and potentially reading
// unowned memory.
let mut bytes = [0u8; 8];
bytes[0..remaining_bytes].copy_from_slice(&plaintext[in_ptr..]);
let mut last_word = u64::from_le_bytes(bytes);

loop {
let output = &mut into[out_ptr..];

if in_ptr >= plaintext.len() || output.is_empty() {
break;
}

let (advance_in, advance_out) = self.compress_word_safe(last_word, output);

in_ptr += advance_in;
out_ptr += advance_out;
last_word = advance_8byte_word(last_word, advance_in);
}

assert!(
in_ptr >= plaintext.len(),
"exhausted output buffer before exhausting input, there is a bug in SymbolTable::compress()"
);

out_ptr
}

/// Use the symbol table to compress the plaintext into a sequence of codes and escapes.
pub fn compress(&self, plaintext: &[u8]) -> Vec<u8> {
if plaintext.is_empty() {
return Vec::new();
}

// Initialize a buffer sufficiently large to handle all plaintext.
let mut buffer = Vec::with_capacity(plaintext.len() * 2);

// SAFETY: the largest compressed size would be all escapes == 2*plaintext_len
Comment thread
a10y marked this conversation as resolved.
unsafe { self.compress_into(plaintext, &mut buffer) };
let initialized = self.compress_into_uninit(plaintext, buffer.spare_capacity_mut());

// SAFETY: `initialized` elements were initialized in the call above.
// TODO(aduffy): shrink_to_fit?
unsafe { buffer.set_len(initialized) };

buffer
}
Expand Down
Loading