Skip to content

Commit ddd87f2

Browse files
Made decompression work across row boundaries (#5)
1 parent 5aaccd3 commit ddd87f2

File tree

13 files changed

+878
-504
lines changed

13 files changed

+878
-504
lines changed

src/error.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use crate::proto::stream::Kind;
2+
3+
#[derive(Debug, Clone)]
4+
pub enum Error {
5+
OutOfSpec,
6+
RleLiteralTooLarge,
7+
InvalidUtf8,
8+
InvalidColumn(u32),
9+
InvalidKind(u32, Kind),
10+
DecodeFloat,
11+
Decompression,
12+
InvalidProto,
13+
}
14+
15+
impl From<prost::DecodeError> for Error {
16+
fn from(_: prost::DecodeError) -> Self {
17+
Self::InvalidProto
18+
}
19+
}
20+
21+
impl From<std::io::Error> for Error {
22+
fn from(_: std::io::Error) -> Self {
23+
Self::OutOfSpec
24+
}
25+
}

src/lib.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,5 @@
1-
use proto::stream::Kind;
2-
1+
pub mod error;
32
pub mod proto;
43
pub mod read;
54

65
pub use fallible_streaming_iterator;
7-
8-
#[derive(Debug, Clone)]
9-
pub enum Error {
10-
OutOfSpec,
11-
RleLiteralTooLarge,
12-
InvalidUtf8,
13-
InvalidColumn(u32, Kind),
14-
}
15-
16-
impl From<prost::DecodeError> for Error {
17-
fn from(_: prost::DecodeError) -> Self {
18-
Self::OutOfSpec
19-
}
20-
}
21-
22-
impl From<std::io::Error> for Error {
23-
fn from(_: std::io::Error) -> Self {
24-
Self::OutOfSpec
25-
}
26-
}

src/read/decode/boolean_rle.rs

Lines changed: 102 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,84 @@
1-
use crate::Error;
1+
use std::io::Read;
2+
3+
use crate::error::Error;
4+
5+
use super::read_u8;
26

37
#[derive(Debug, Copy, Clone, PartialEq)]
4-
pub enum BooleanRun<'a> {
8+
#[allow(clippy::large_enum_variant)]
9+
pub enum BooleanRun {
510
Run(u8, u16),
6-
Literals(&'a [u8]),
11+
Literals([u8; 255]),
712
}
813

9-
pub struct BooleanRleRunIter<'a> {
10-
stream: &'a [u8],
14+
pub struct BooleanRleRunIter<'a, R: Read> {
15+
reader: &'a mut R,
1116
}
1217

13-
impl<'a> BooleanRleRunIter<'a> {
14-
pub fn new(stream: &'a [u8]) -> Self {
15-
Self { stream }
18+
impl<'a, R: Read> BooleanRleRunIter<'a, R> {
19+
pub fn new(reader: &'a mut R) -> Self {
20+
Self { reader }
1621
}
1722
}
1823

19-
impl<'a> Iterator for BooleanRleRunIter<'a> {
20-
type Item = Result<BooleanRun<'a>, Error>;
24+
fn read_literals<R: Read>(reader: &mut R, header: i8) -> Result<[u8; 255], Error> {
25+
let length = (-header) as usize;
26+
27+
let mut literals = [0u8; 255];
28+
29+
reader
30+
.take(length as u64)
31+
.read_exact(&mut literals[..length])?;
32+
33+
Ok(literals)
34+
}
35+
36+
impl<'a, R: Read> Iterator for BooleanRleRunIter<'a, R> {
37+
type Item = Result<BooleanRun, Error>;
2138

2239
#[inline]
2340
fn next(&mut self) -> Option<Self::Item> {
24-
let header = *self.stream.first()?;
25-
self.stream = &self.stream[1..];
26-
let header = i8::from_le_bytes([header]);
41+
let header = read_u8(self.reader);
42+
let header = match header {
43+
Ok(header) => header as i8,
44+
Err(e) => return Some(Err(e.into())),
45+
};
2746
if header < 0 {
28-
let length = (-header) as usize;
29-
if length > self.stream.len() {
30-
return Some(Err(Error::RleLiteralTooLarge));
31-
}
32-
let (literals, remaining) = self.stream.split_at(length);
33-
self.stream = remaining;
34-
Some(Ok(BooleanRun::Literals(literals)))
47+
Some(read_literals(self.reader, header).map(BooleanRun::Literals))
3548
} else {
3649
let length = header as u16 + 3;
37-
let value = self.stream[0];
38-
self.stream = &self.stream[1..];
50+
// this is not ok - it may require more than one byte
51+
let value = read_u8(self.reader);
52+
let value = match value {
53+
Ok(value) => value,
54+
Err(e) => return Some(Err(e.into())),
55+
};
3956
Some(Ok(BooleanRun::Run(value, length)))
4057
}
4158
}
4259
}
4360

44-
pub struct BooleanIter<'a> {
45-
iter: BooleanRleRunIter<'a>,
46-
current: Option<BooleanRun<'a>>,
61+
pub struct BooleanIter<'a, R: Read> {
62+
iter: BooleanRleRunIter<'a, R>,
63+
current: Option<BooleanRun>,
4764
position: u8,
65+
byte_position: usize,
4866
remaining: usize,
4967
}
5068

51-
impl<'a> BooleanIter<'a> {
52-
pub fn new(stream: &'a [u8], length: usize) -> Self {
69+
impl<'a, R: Read> BooleanIter<'a, R> {
70+
pub fn new(reader: &'a mut R, length: usize) -> Self {
5371
Self {
54-
iter: BooleanRleRunIter::new(stream),
72+
iter: BooleanRleRunIter::new(reader),
5573
current: None,
5674
position: 0,
75+
byte_position: 0,
5776
remaining: length,
5877
}
5978
}
6079
}
6180

62-
impl<'a> Iterator for BooleanIter<'a> {
81+
impl<'a, R: Read> Iterator for BooleanIter<'a, R> {
6382
type Item = Result<bool, Error>;
6483

6584
#[inline]
@@ -89,7 +108,7 @@ impl<'a> Iterator for BooleanIter<'a> {
89108
}
90109
BooleanRun::Literals(bytes) => {
91110
let mask = 128u8 >> self.position;
92-
let result = bytes[0] & mask == mask;
111+
let result = bytes[self.byte_position] & mask == mask;
93112
self.position += 1;
94113
if self.remaining == 0 {
95114
self.current = None;
@@ -100,8 +119,9 @@ impl<'a> Iterator for BooleanIter<'a> {
100119
if self.position == 8 {
101120
if bytes.len() == 1 {
102121
self.current = None;
122+
self.byte_position = 0;
103123
} else {
104-
self.current = Some(BooleanRun::Literals(&bytes[1..]));
124+
self.byte_position += 1;
105125
}
106126
self.position = 0;
107127
}
@@ -128,3 +148,54 @@ impl<'a> Iterator for BooleanIter<'a> {
128148
(self.remaining, Some(self.remaining))
129149
}
130150
}
151+
152+
#[cfg(test)]
153+
mod test {
154+
use super::*;
155+
156+
#[test]
157+
fn basic() {
158+
let data = [0x61u8, 0x00];
159+
160+
let data = &mut data.as_ref();
161+
162+
let iter = BooleanIter::new(data, 100)
163+
.collect::<Result<Vec<_>, Error>>()
164+
.unwrap();
165+
assert_eq!(iter, vec![false; 100])
166+
}
167+
168+
#[test]
169+
fn literals() {
170+
let data = [0xfeu8, 0b01000100, 0b01000101];
171+
172+
let data = &mut data.as_ref();
173+
174+
let iter = BooleanIter::new(data, 16)
175+
.collect::<Result<Vec<_>, Error>>()
176+
.unwrap();
177+
assert_eq!(
178+
iter,
179+
vec![
180+
false, true, false, false, false, true, false, false, // 0b01000100
181+
false, true, false, false, false, true, false, true, // 0b01000101
182+
]
183+
)
184+
}
185+
186+
#[test]
187+
fn another() {
188+
// "For example, the byte sequence [0xff, 0x80] would be one true followed by seven false values."
189+
let data = [0xff, 0x80];
190+
191+
let data = &mut data.as_ref();
192+
193+
let iter = BooleanIter::new(data, 8)
194+
.collect::<Result<Vec<_>, Error>>()
195+
.unwrap();
196+
assert_eq!(
197+
iter,
198+
vec![true, false, false, false, false, false, false, false,]
199+
)
200+
}
201+
}

src/read/decode/float.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::error::Error;
2+
3+
/// Sealed trait to generically represent f32 and f64.
4+
pub trait Float: Default + Copy + private::Sealed {
5+
type Bytes: AsRef<[u8]> + AsMut<[u8]> + Default;
6+
fn from_le_bytes(bytes: Self::Bytes) -> Self;
7+
}
8+
9+
mod private {
10+
pub trait Sealed {} // Users in other crates cannot name this trait.
11+
impl Sealed for f32 {}
12+
impl Sealed for f64 {}
13+
}
14+
15+
impl Float for f32 {
16+
type Bytes = [u8; 4];
17+
18+
#[inline]
19+
fn from_le_bytes(bytes: Self::Bytes) -> Self {
20+
Self::from_le_bytes(bytes)
21+
}
22+
}
23+
24+
impl Float for f64 {
25+
type Bytes = [u8; 8];
26+
27+
#[inline]
28+
fn from_le_bytes(bytes: Self::Bytes) -> Self {
29+
Self::from_le_bytes(bytes)
30+
}
31+
}
32+
33+
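/// An iterator that reads up to `length` little-endian floats of type `T` from `reader`, yielding `Error::DecodeFloat` if a value cannot be read in full.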
34+
pub struct FloatIter<'a, T: Float, R: std::io::Read> {
35+
reader: &'a mut R,
36+
remaining: usize,
37+
phantom: std::marker::PhantomData<T>,
38+
}
39+
40+
impl<'a, T: Float, R: std::io::Read> FloatIter<'a, T, R> {
41+
#[inline]
42+
pub fn new(reader: &'a mut R, length: usize) -> Self {
43+
Self {
44+
reader,
45+
remaining: length,
46+
phantom: Default::default(),
47+
}
48+
}
49+
50+
#[inline]
51+
pub fn len(&self) -> usize {
52+
self.remaining
53+
}
54+
55+
#[must_use]
56+
pub fn is_empty(&self) -> bool {
57+
self.len() == 0
58+
}
59+
}
60+
61+
impl<'a, T: Float, R: std::io::Read> Iterator for FloatIter<'a, T, R> {
62+
type Item = Result<T, Error>;
63+
64+
#[inline]
65+
fn next(&mut self) -> Option<Self::Item> {
66+
if self.remaining == 0 {
67+
return None;
68+
}
69+
let mut chunk: T::Bytes = Default::default();
70+
let error = self.reader.read_exact(chunk.as_mut());
71+
if error.is_err() {
72+
return Some(Err(Error::DecodeFloat));
73+
};
74+
self.remaining -= 1;
75+
Some(Ok(T::from_le_bytes(chunk)))
76+
}
77+
78+
#[inline]
79+
fn size_hint(&self) -> (usize, Option<usize>) {
80+
let remaining = self.len();
81+
(remaining, Some(remaining))
82+
}
83+
}

src/read/decode/mod.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
mod boolean_rle;
2+
mod float;
23
mod rle_v2;
4+
mod variable_length;
35

46
pub use boolean_rle::{BooleanIter, BooleanRleRunIter, BooleanRun};
7+
pub use float::FloatIter;
58
pub use rle_v2::IteratorEnum;
69
pub use rle_v2::{SignedRleV2Iter, SignedRleV2Run, UnsignedRleV2Iter, UnsignedRleV2Run};
10+
pub use variable_length::Values;
711

8-
pub fn deserialize_f32(stream: &[u8]) -> impl Iterator<Item = f32> + '_ {
9-
stream
10-
.chunks_exact(4)
11-
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
12-
}
13-
14-
pub fn deserialize_f64(stream: &[u8]) -> impl Iterator<Item = f64> + '_ {
15-
stream
16-
.chunks_exact(8)
17-
.map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
12+
#[inline]
13+
fn read_u8<R: std::io::Read>(reader: &mut R) -> Result<u8, std::io::Error> {
14+
let mut buf = [0; 1];
15+
reader.read_exact(&mut buf)?;
16+
Ok(buf[0])
1817
}

0 commit comments

Comments
 (0)