Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ fn _bench_copy<T: Columnar+Eq>(bencher: &mut Bencher, record: T) where T::Contai
for _ in 0 .. 1024 {
arena.push(&record);
}
use columnar::{AsBytes, Container};
bencher.bytes = Sequence::length_in_bytes(arena.borrow().as_bytes()) as u64;
use columnar::Container;
bencher.bytes = Sequence::length_in_bytes(&arena.borrow()) as u64;
arena.clear();

bencher.iter(|| {
Expand Down
10 changes: 5 additions & 5 deletions benches/serde.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bencher::{benchmark_group, benchmark_main, Bencher};
use columnar::{Columnar, Container, Clear, AsBytes, FromBytes};
use columnar::{Columnar, Container, Clear, FromBytes};
use columnar::bytes::{EncodeDecode, Sequence};
use serde::{Serialize, Deserialize};

Expand All @@ -19,7 +19,7 @@ fn goser_push(b: &mut Bencher) {
container.push(&log);
}
let mut words = vec![];
Sequence::encode(&mut words, container.borrow().as_bytes());
Sequence::encode(&mut words, &container.borrow());
b.bytes = 8 * words.len() as u64;
b.iter(|| {
container.clear();
Expand Down Expand Up @@ -50,11 +50,11 @@ fn goser_encode(b: &mut Bencher) {
container.push(&log);
}
let mut words = vec![];
Sequence::encode(&mut words, container.borrow().as_bytes());
Sequence::encode(&mut words, &container.borrow());
b.bytes = 8 * words.len() as u64;
b.iter(|| {
words.clear();
Sequence::encode(&mut words, container.borrow().as_bytes());
Sequence::encode(&mut words, &container.borrow());
bencher::black_box(&words);
});
}
Expand All @@ -67,7 +67,7 @@ fn goser_decode(b: &mut Bencher) {
for _ in 0..1024 {
container.push(&log);
}
Sequence::encode(&mut words, container.borrow().as_bytes());
Sequence::encode(&mut words, &container.borrow());
b.bytes = 8 * words.len() as u64;
b.iter(|| {
let mut slices = Sequence::decode(&mut words);
Expand Down
185 changes: 174 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,18 +476,20 @@ pub mod common {
/// The methods here line up with the `AsBytes` and `FromBytes` traits.
pub mod bytes {

use crate::AsBytes;

/// A coupled encode/decode pair for byte sequences.
pub trait EncodeDecode {
/// Encoded length in number of `u64` words required.
fn length_in_words<'a, I>(bytes: I) -> usize where I : Iterator<Item=(u64, &'a [u8])>;
fn length_in_words<'a, A>(bytes: &A) -> usize where A : AsBytes<'a>;
/// Encoded length in number of `u8` bytes required.
///
/// This method should always be eight times `Self::length_in_words`, and is provided for convenience and clarity.
fn length_in_bytes<'a, I>(bytes: I) -> usize where I : Iterator<Item=(u64, &'a [u8])> { 8 * Self::length_in_words(bytes) }
fn length_in_bytes<'a, A>(bytes: &A) -> usize where A : AsBytes<'a> { 8 * Self::length_in_words(bytes) }
/// Encodes `bytes` into a sequence of `u64`.
fn encode<'a, I>(store: &mut Vec<u64>, bytes: I) where I : Iterator<Item=(u64, &'a [u8])>;
fn encode<'a, A>(store: &mut Vec<u64>, bytes: &A) where A : AsBytes<'a>;
/// Writes `bytes` in the encoded format to an arbitrary writer.
fn write<'a, I, W: std::io::Write>(writer: W, bytes: I) -> std::io::Result<()> where I : Iterator<Item=(u64, &'a [u8])>;
fn write<'a, A, W: std::io::Write>(writer: W, bytes: &A) -> std::io::Result<()> where A : AsBytes<'a>;
/// Decodes bytes from a sequence of `u64`.
fn decode<'a>(store: &'a [u64]) -> impl Iterator<Item=&'a [u8]>;
}
Expand All @@ -499,18 +501,20 @@ pub mod bytes {
pub use serialization::Sequence;
mod serialization {

use crate::AsBytes;

/// Encodes and decodes byte sequences, by prepending each length and appending all the sequences.
pub struct Sequence;
impl super::EncodeDecode for Sequence {
fn length_in_words<'a, I>(bytes: I) -> usize where I : Iterator<Item=(u64, &'a [u8])> {
fn length_in_words<'a, A>(bytes: &A) -> usize where A : AsBytes<'a> {
// Each byte slice has one `u64` for the length, and then as many `u64`s as needed to hold all bytes.
bytes.map(|(_align, bytes)| 1 + (bytes.len() + 7)/8).sum()
bytes.as_bytes().map(|(_align, bytes)| 1 + (bytes.len() + 7)/8).sum()
}
fn encode<'a, I>(store: &mut Vec<u64>, bytes: I) where I : Iterator<Item=(u64, &'a [u8])> {
encode(store, bytes)
fn encode<'a, A>(store: &mut Vec<u64>, bytes: &A) where A : AsBytes<'a> {
encode(store, bytes.as_bytes())
}
fn write<'a, I, W: std::io::Write>(writer: W, bytes: I) -> std::io::Result<()> where I : Iterator<Item=(u64, &'a [u8])> {
write(writer, bytes)
fn write<'a, A, W: std::io::Write>(writer: W, bytes: &A) -> std::io::Result<()> where A : AsBytes<'a> {
write(writer, bytes.as_bytes())
}
fn decode<'a>(store: &'a [u64]) -> impl Iterator<Item=&'a [u8]> {
decode(store)
Expand Down Expand Up @@ -598,6 +602,166 @@ pub mod bytes {
}
}

/// A binary encoding of sequences of byte slices.
///
/// The encoding starts with a sequence of n+1 offsets describing where to find the n slices in the bytes that follow.
/// Treating the offsets as a byte slice too, each offset indicates the location (in bytes) of the end of its slice.
/// Each byte slice can be found from a pair of adjacent offsets, where the first is rounded up to a multiple of eight.
pub use serialization_neu::Indexed;
pub mod serialization_neu {

use crate::AsBytes;

/// Encodes and decodes bytes sequences, using an index of offsets.
pub struct Indexed;
impl super::EncodeDecode for Indexed {
fn length_in_words<'a, A>(bytes: &A) -> usize where A : AsBytes<'a> {
1 + bytes.as_bytes().map(|(_align, bytes)| 1 + (bytes.len() + 7)/8).sum::<usize>()
}
fn encode<'a, A>(store: &mut Vec<u64>, bytes: &A) where A : AsBytes<'a> {
encode(store, bytes)
}
fn write<'a, A, W: std::io::Write>(writer: W, bytes: &A) -> std::io::Result<()> where A : AsBytes<'a> {
write(writer, bytes)
}
fn decode<'a>(store: &'a [u64]) -> impl Iterator<Item=&'a [u8]> {
decode(store)
}
}

/// Encodes `item` into `u64` aligned words.
///
/// The sequence of byte slices are appended, with padding to have each slice start `u64` aligned.
/// The sequence is then pre-pended with as many byte offsets as there are slices in `item`, plus one.
/// The byte offsets indicate where each slice ends, and by rounding up to `u64` alignment where the next slice begins.
/// The first offset indicates where the list of offsets itself ends, and where the first slice begins.
///
/// We will need to visit `as_bytes` three times to extract this information, so the method should be efficient and inlined.
/// The first read sizes `store` and writes the first offset, the second writes each other offset, and the third writes the bytes themselves.
///
/// The offsets are zero-based, rather than based on `store.len()`.
/// If you call the method with a non-empty `store` be careful decoding.
pub fn encode<'a, A>(store: &mut Vec<u64>, iter: &A)
where A : AsBytes<'a>,
{
    // Read 1: Count the slices and the words their word-rounded payloads will occupy,
    // so that `store` can be right-sized before the first `push`.
    let (slices, payload_words) = iter.as_bytes().fold((0usize, 0usize), |(count, words), (_align, bytes)| {
        (count + 1, words + (bytes.len() + 7) / 8)
    });
    // Number of offsets we will record, equal to the number of slices plus one.
    let offsets = 1 + slices;
    // Reserve in one shot: the offset list plus all padded payloads.
    store.reserve(offsets + payload_words);
    let offsets_end: u64 = TryInto::<u64>::try_into(offsets * std::mem::size_of::<u64>()).unwrap();
    store.push(offsets_end);
    // Read 2: Establish each of the offsets based on lengths of byte slices.
    let mut position_bytes = offsets_end;
    for (align, bytes) in iter.as_bytes() {
        // Alignments above eight would not be preserved by `u64` packing.
        assert!(align <= 8);
        // Write length in bytes, but round up to words before updating `position_bytes`.
        let to_push: u64 = position_bytes + TryInto::<u64>::try_into(bytes.len()).unwrap();
        store.push(to_push);
        let round_len: u64 = ((bytes.len() + 7) & !7).try_into().unwrap();
        position_bytes += round_len;
    }
    // Read 3: Append each byte slice, with padding to align starts to `u64`.
    for (_align, bytes) in iter.as_bytes() {
        let whole_words = 8 * (bytes.len() / 8);
        // We want to extend `store` by `bytes`, but `bytes` may not be `u64` aligned.
        // In the latter case, init `store` and cast and copy onto it as a byte slice.
        if let Ok(words) = bytemuck::try_cast_slice(&bytes[.. whole_words]) {
            store.extend_from_slice(words);
        }
        else {
            let store_len = store.len();
            store.resize(store_len + whole_words/8, 0);
            let slice = bytemuck::try_cast_slice_mut(&mut store[store_len..]).expect("&[u64] should convert to &[u8]");
            slice.copy_from_slice(&bytes[.. whole_words]);
        }
        // Any trailing partial word is zero-padded into a single `u64`.
        let remaining_bytes = &bytes[whole_words..];
        if !remaining_bytes.is_empty() {
            let mut remainder = 0u64;
            let transmute: &mut [u8] = bytemuck::try_cast_slice_mut(std::slice::from_mut(&mut remainder)).expect("&[u64] should convert to &[u8]");
            for (i, byte) in remaining_bytes.iter().enumerate() {
                transmute[i] = *byte;
            }
            store.push(remainder);
        }
    }
}

pub fn write<'a, A, W>(mut writer: W, iter: &A) -> std::io::Result<()>
where
A: AsBytes<'a>,
W: std::io::Write,
{
// Read 1: Number of offsets we will record, equal to the number of slices plus one.
let offsets = 1 + iter.as_bytes().count();
let offsets_end: u64 = TryInto::<u64>::try_into((offsets) * std::mem::size_of::<u64>()).unwrap();
writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&offsets_end)))?;
// Read 2: Establish each of the offsets based on lengths of byte slices.
let mut position_bytes = offsets_end;
for (align, bytes) in iter.as_bytes() {
assert!(align <= 8);
// Write length in bytes, but round up to words before updating `position_bytes`.
let to_push: u64 = position_bytes + TryInto::<u64>::try_into(bytes.len()).unwrap();
writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&to_push)))?;
let round_len: u64 = ((bytes.len() + 7) & !7).try_into().unwrap();
position_bytes += round_len;
}
// Read 3: Append each byte slice, with padding to align starts to `u64`.
for (_align, bytes) in iter.as_bytes() {
writer.write_all(bytes)?;
let padding = ((bytes.len() + 7) & !7) - bytes.len();
if padding > 0 {
writer.write_all(&[0u8;8][..padding])?;
}
}

Ok(())
}

/// Decodes an encoded sequence of byte slices. Each result will be `u64` aligned.
pub fn decode(store: &[u64]) -> impl Iterator<Item=&[u8]> {
    // The first word records where the offset list ends, in bytes; it must be word-aligned.
    assert!(store[0] % 8 == 0);
    // The offset list has `store[0] / 8` entries: one leading word plus one per slice.
    let count = store[0] / 8 - 1;
    (0 .. count).map(move |index| decode_index(store, index))
}

/// Decodes a specific byte slice by index. It will be `u64` aligned.
#[inline(always)]
pub fn decode_index(store: &[u64], index: u64) -> &[u8] {
    // `index + 1` must name a valid offset entry within the `store[0] / 8`-word header.
    debug_assert!(index + 1 < store[0] / 8);
    let index = usize::try_from(index).unwrap();
    // The slice starts where the previous one ended, rounded up to a word boundary,
    // and runs to its own recorded end offset.
    let start = usize::try_from((store[index] + 7) & !7).unwrap();
    let end = usize::try_from(store[index + 1]).unwrap();
    let raw: &[u8] = bytemuck::try_cast_slice(store).expect("&[u64] should convert to &[u8]");
    &raw[start .. end]
}

#[cfg(test)]
mod test {

    use crate::{Columnar, Container};
    use crate::common::Push;
    use crate::AsBytes;

    use super::{encode, decode};

    /// Encodes `item`, then checks that decoding yields the same slices back.
    fn assert_roundtrip<'a, AB: AsBytes<'a>>(item: &AB) {
        let mut store = Vec::new();
        encode(&mut store, item);
        let originals = item.as_bytes().map(|(_align, slice)| slice);
        assert!(originals.eq(decode(&store)));
    }

    #[test]
    fn round_trip() {
        // Populate a container alternating both `Result` variants.
        let mut column: <Result<u64, String> as Columnar>::Container = Default::default();
        for value in 0..10000u64 {
            column.push(&Ok::<u64, String>(value));
            column.push(&Err::<u64, String>(format!("{:?}", value)));
        }

        assert_roundtrip(&column.borrow());
    }
}
}

#[cfg(test)]
mod test {
Expand Down Expand Up @@ -635,7 +799,6 @@ pub mod bytes {
}
}
}

}

/// Types that prefer to be represented by `Vec<T>`.
Expand Down