Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions btp/src/dechunk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{Header, CHUNK_DATA_SIZE, HEADER_SIZE};
use std::io::{self, Write};

#[derive(Debug, PartialEq, thiserror::Error)]
#[derive(Debug, thiserror::Error)]
pub enum DecodeError {
#[error("chunk too small, expected at least {}", HEADER_SIZE)]
HeaderTooSmall,
Expand Down Expand Up @@ -46,8 +46,8 @@ impl Chunk {
&self.chunk[..self.header.data_len as usize]
}

/// Parses raw bytes into a chunk
pub fn parse(data: &[u8]) -> Result<Self, DecodeError> {
/// decodes raw bytes into a chunk
pub fn decode(data: &[u8]) -> Result<Self, DecodeError> {
let (header_data, chunk_data) = data
.split_at_checked(HEADER_SIZE)
.ok_or(DecodeError::HeaderTooSmall)?;
Expand Down Expand Up @@ -135,8 +135,8 @@ impl Dechunker {
.unwrap_or(0.0)
}

/// Inserts a parsed chunk. Use this for multiple concurrent messages.
/// First parse with [`Chunk::parse()`], lookup decoder by message ID, then insert.
/// Inserts a chunk. Use this for multiple concurrent messages.
/// First decode with [`Chunk::decode()`], lookup decoder by message ID, then insert.
pub fn insert_chunk(&mut self, chunk: Chunk) -> Result<(), MessageIdError> {
let header = &chunk.header;

Expand Down Expand Up @@ -173,10 +173,10 @@ impl Dechunker {
Ok(())
}

/// Parses and inserts raw chunk data. Use this for single message at a time.
/// Decodes and inserts raw chunk data. Use this for single message at a time.
/// For multiple concurrent messages, use [`Chunk::parse()`] then [`Dechunker::insert_chunk()`].
pub fn receive(&mut self, data: &[u8]) -> Result<(), ReceiveError> {
let chunk_with_header = Chunk::parse(data)?;
let chunk_with_header = Chunk::decode(data)?;
self.insert_chunk(chunk_with_header)?;
Ok(())
}
Expand Down Expand Up @@ -209,17 +209,17 @@ impl Dechunker {
}
}

pub struct MasterDechunker<const N: usize> {
dechunkers: [Option<DechunkerSlot>; N],
counter: u64,
}

#[derive(Debug)]
struct DechunkerSlot {
dechunker: Dechunker,
last_used: u64,
}

pub struct MasterDechunker<const N: usize = 10> {
dechunkers: [Option<DechunkerSlot>; N],
counter: u64,
}

impl<const N: usize> Default for MasterDechunker<N> {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -253,16 +253,10 @@ impl<const N: usize> MasterDechunker<N> {
if let Some(empty_slot) = self.dechunkers.iter_mut().find(|slot| slot.is_none()) {
empty_slot
} else {
let lru_index = self
.dechunkers
.iter()
.enumerate()
.filter_map(|(i, slot)| slot.as_ref().map(|s| (i, s.last_used)))
.min_by_key(|(_, last_used)| *last_used)
.map(|(i, _)| i)
.expect("should find slot");

&mut self.dechunkers[lru_index]
self.dechunkers
.iter_mut()
.min_by_key(|d| d.as_ref().map(|d| d.last_used))
.expect("not empty")
};

let mut decoder = Dechunker::new();
Expand All @@ -279,6 +273,14 @@ impl<const N: usize> MasterDechunker<N> {
None
}
}

pub fn get_dechunker(&self, msg_id: u16) -> Option<&Dechunker> {
self.dechunkers
.iter()
.filter_map(|d| d.as_ref())
.find(|d| d.dechunker.info.map(|m| m.message_id) == Some(msg_id))
.map(|d| &d.dechunker)
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
84 changes: 42 additions & 42 deletions btp/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,26 +239,26 @@ fn reverse_order_decoding() {
}

#[test]
fn chunk_parse_and_insert() {
fn chunk_decode_and_insert() {
use crate::Chunk;

let data = b"Test data for parse and push";
let data = b"Test data for decode and push";
let chunks: Vec<_> = chunk(data).collect();

let mut dechunker = Dechunker::new();

for raw_chunk in &chunks {
let parsed = Chunk::parse(raw_chunk).unwrap();
dechunker.insert_chunk(parsed).unwrap();
let decoded = Chunk::decode(raw_chunk).unwrap();
dechunker.insert_chunk(decoded).unwrap();
}

assert_eq!(dechunker.data(), Some(data.to_vec()));
}

#[test]
fn chunk_parse_errors() {
fn chunk_decode_errors() {
let small_data = vec![0u8; HEADER_SIZE - 1];
let result = Chunk::parse(&small_data);
let result = Chunk::decode(&small_data);
assert!(matches!(result, Err(DecodeError::HeaderTooSmall)));
}

Expand All @@ -268,7 +268,7 @@ fn chunk_too_small() {
let mut raw_chunk = vec![0u8; HEADER_SIZE + 5];
raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes());

let result = Chunk::parse(&raw_chunk);
let result = Chunk::decode(&raw_chunk);
assert!(
matches!(
result,
Expand All @@ -287,22 +287,22 @@ fn chunk_too_large() {
let mut raw_chunk = vec![0u8; HEADER_SIZE + 255];
raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes());

let result = Chunk::parse(&raw_chunk);
let result = Chunk::decode(&raw_chunk);
assert!(
matches!(result, Err(DecodeError::ChunkTooLarge)),
"should fail when data_len exceeds maximum chunk size"
);
}

#[test]
fn chunk_parse_invalid_index() {
fn chunk_decode_invalid_index() {
use crate::{Chunk, DecodeError, HEADER_SIZE};

let header = crate::Header::new(1234, 10, 1, 5);
let mut raw_chunk = vec![0u8; HEADER_SIZE + 5];
raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes());

let result = Chunk::parse(&raw_chunk);
let result = Chunk::decode(&raw_chunk);
assert!(
matches!(
result,
Expand All @@ -325,10 +325,10 @@ fn insert_chunk_wrong_message_id() {

let mut dechunker = Dechunker::new();

let chunk1 = Chunk::parse(&chunks1[0]).unwrap();
let chunk1 = Chunk::decode(&chunks1[0]).unwrap();
dechunker.insert_chunk(chunk1).unwrap();

let chunk2 = Chunk::parse(&chunks2[0]).unwrap();
let chunk2 = Chunk::decode(&chunks2[0]).unwrap();
let result = dechunker.insert_chunk(chunk2);

assert!(
Expand All @@ -352,8 +352,8 @@ fn insert_chunk_out_of_order() {
let mut dechunker = Dechunker::new();

for (i, chunk) in chunks.iter().enumerate() {
let parsed = Chunk::parse(chunk).unwrap();
dechunker.insert_chunk(parsed).unwrap();
let decoded = Chunk::decode(chunk).unwrap();
dechunker.insert_chunk(decoded).unwrap();

let expected_progress = (i + 1) as f32 / original_count as f32;
assert!(
Expand All @@ -376,7 +376,7 @@ fn insert_duplicate_chunks() {

let mut dechunker = Dechunker::new();

let chunk0 = Chunk::parse(&chunks[0]).unwrap();
let chunk0 = Chunk::decode(&chunks[0]).unwrap();

dechunker.insert_chunk(chunk0).unwrap();
dechunker.insert_chunk(chunk0).unwrap();
Expand All @@ -388,8 +388,8 @@ fn insert_duplicate_chunks() {
);

for chunk in &chunks[1..] {
let parsed = Chunk::parse(chunk).unwrap();
dechunker.insert_chunk(parsed).unwrap();
let decoded = Chunk::decode(chunk).unwrap();
dechunker.insert_chunk(decoded).unwrap();
}

assert_eq!(dechunker.data(), Some(data.to_vec()));
Expand All @@ -403,8 +403,8 @@ fn master_dechunker_basic() {
let mut master = MasterDechunker::<10>::default();

for (i, chunk) in chunks.iter().enumerate() {
let parsed = Chunk::parse(chunk).unwrap();
let result = master.insert_chunk(parsed);
let decoded = Chunk::decode(chunk).unwrap();
let result = master.insert_chunk(decoded);

if i == chunks.len() - 1 {
assert_eq!(
Expand Down Expand Up @@ -446,8 +446,8 @@ fn master_dechunker_multiple_messages() {
let mut completed = Vec::new();

for (msg_id, chunk) in all_chunks {
let parsed = Chunk::parse(chunk).unwrap();
if let Some(data) = master.insert_chunk(parsed) {
let decoded = Chunk::decode(chunk).unwrap();
if let Some(data) = master.insert_chunk(decoded) {
completed.push((msg_id, data));
}
}
Expand Down Expand Up @@ -489,24 +489,24 @@ fn master_dechunker_lru_eviction() {
"Message 3 should require multiple chunks"
);

let parsed1_0 = Chunk::parse(&chunks1[0]).unwrap();
let parsed2_0 = Chunk::parse(&chunks2[0]).unwrap();
let parsed2_1 = Chunk::parse(&chunks2[1]).unwrap();
let parsed3_0 = Chunk::parse(&chunks3[0]).unwrap();
let decoded1_0 = Chunk::decode(&chunks1[0]).unwrap();
let decoded2_0 = Chunk::decode(&chunks2[0]).unwrap();
let decoded2_1 = Chunk::decode(&chunks2[1]).unwrap();
let decoded3_0 = Chunk::decode(&chunks3[0]).unwrap();

master.insert_chunk(parsed1_0);
master.insert_chunk(parsed2_0);
master.insert_chunk(decoded1_0);
master.insert_chunk(decoded2_0);

master.insert_chunk(parsed2_1);
master.insert_chunk(decoded2_1);

let result = master.insert_chunk(parsed3_0);
let result = master.insert_chunk(decoded3_0);
assert_eq!(
result, None,
"Third message should succeed by evicting LRU slot"
);

let parsed1_0_again = Chunk::parse(&chunks1[0]).unwrap();
let result = master.insert_chunk(parsed1_0_again);
let decoded1_0_again = Chunk::decode(&chunks1[0]).unwrap();
let result = master.insert_chunk(decoded1_0_again);
assert_eq!(
result, None,
"Message 1 should start fresh after being evicted"
Expand All @@ -520,8 +520,8 @@ fn master_dechunker_single_chunk_message() {
assert_eq!(chunks.len(), 1, "Small message should be single chunk");

let mut master = MasterDechunker::<10>::default();
let parsed = Chunk::parse(&chunks[0]).unwrap();
let result = master.insert_chunk(parsed);
let decoded = Chunk::decode(&chunks[0]).unwrap();
let result = master.insert_chunk(decoded);

assert_eq!(
result,
Expand All @@ -539,9 +539,9 @@ fn streaming_dechunker_memory_usage() {
let mut output = Vec::new();
let mut streaming = StreamDechunker::new(&mut output);

let chunk0 = Chunk::parse(&chunks[0]).unwrap();
let chunk1 = Chunk::parse(&chunks[1]).unwrap();
let chunk2 = Chunk::parse(&chunks[2]).unwrap();
let chunk0 = Chunk::decode(&chunks[0]).unwrap();
let chunk1 = Chunk::decode(&chunks[1]).unwrap();
let chunk2 = Chunk::decode(&chunks[2]).unwrap();

let complete = streaming.insert_chunk(chunk0).unwrap();
assert!(!complete, "Should not be complete after first chunk");
Expand Down Expand Up @@ -591,8 +591,8 @@ fn streaming_dechunker_memory_usage() {
);

for chunk in &chunks[3..] {
let parsed = Chunk::parse(chunk).unwrap();
streaming.insert_chunk(parsed).unwrap();
let decoded = Chunk::decode(chunk).unwrap();
streaming.insert_chunk(decoded).unwrap();
}

assert!(streaming.is_complete(), "Message should be complete");
Expand Down Expand Up @@ -621,8 +621,8 @@ fn streaming_dechunker_reverse_order() {
let mut streaming = StreamDechunker::new(&mut output);

for chunk in chunks.iter().rev() {
let parsed = Chunk::parse(chunk).unwrap();
streaming.insert_chunk(parsed).unwrap();
let decoded = Chunk::decode(chunk).unwrap();
streaming.insert_chunk(decoded).unwrap();
}

assert_eq!(
Expand All @@ -647,8 +647,8 @@ fn streaming_dechunker_memory_efficiency() {
let mut streaming = StreamDechunker::new(&mut output);

for (i, chunk) in chunks.iter().enumerate() {
let parsed = Chunk::parse(chunk).unwrap();
streaming.insert_chunk(parsed).unwrap();
let decoded = Chunk::decode(chunk).unwrap();
streaming.insert_chunk(decoded).unwrap();

let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count();
assert_eq!(
Expand Down