From 5e9526e641e0e8df886ebb5ad7674749cc63910d Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 08:31:29 -0400 Subject: [PATCH 01/10] master dechunker --- btp/src/dechunk.rs | 61 ++++++++++++++++++++- btp/src/tests.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 191 insertions(+), 2 deletions(-) diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index 0af963d..0275a6d 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -1,6 +1,6 @@ use crate::{Header, CHUNK_DATA_SIZE, HEADER_SIZE}; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Copy, Clone, thiserror::Error)] pub enum DecodeError { #[error("chunk too small, expected at least {}", HEADER_SIZE)] HeaderTooSmall, @@ -207,3 +207,62 @@ impl Dechunker { Some(result) } } + +#[derive(Debug, Copy, Clone, thiserror::Error)] +pub enum InsertBytesError { + #[error(transparent)] + Parse(#[from] DecodeError), + #[error("no slots found")] + NoSlots, +} + +pub struct MasterDechunker { + pub dechunkers: [Option; N], +} + +impl Default for MasterDechunker { + fn default() -> Self { + Self { + dechunkers: std::array::from_fn(|_| None), + } + } +} + +impl MasterDechunker { + pub fn insert_bytes(&mut self, data: &[u8]) -> Result>, InsertBytesError> { + let chunk = Chunk::parse(data)?; + let message_id = chunk.header.message_id; + + for decoder_slot in &mut self.dechunkers { + if let Some(decoder) = decoder_slot { + if decoder.message_id() == Some(message_id) { + // Safe: message IDs match + decoder.insert_chunk(chunk).unwrap(); + + return if decoder.is_complete() { + Ok(decoder_slot.take().unwrap().data()) + } else { + Ok(None) + }; + } + } + } + + let empty_slot = self + .dechunkers + .iter_mut() + .find(|slot| slot.is_none()) + .ok_or(InsertBytesError::NoSlots)?; + + let mut decoder = Dechunker::new(); + // Safe: new decoder, no ID conflict + decoder.insert_chunk(chunk).unwrap(); + + if decoder.is_complete() { + Ok(decoder.data()) + } else { + *empty_slot = Some(decoder); + Ok(None) + } + } +} diff --git a/btp/src/tests.rs b/btp/src/tests.rs index 7b4f141..87834b5 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -1,5 +1,5 @@ use crate::{ - chunk, Chunk, Dechunker, DecodeError, MessageIdError, APP_MTU, CHUNK_DATA_SIZE, HEADER_SIZE, + chunk, Chunk, Dechunker, DecodeError, InsertBytesError, MasterDechunker, MessageIdError, APP_MTU, CHUNK_DATA_SIZE, HEADER_SIZE, }; use rand::{seq::SliceRandom, Rng, RngCore}; @@ -393,3 +393,133 @@ fn insert_duplicate_chunks() { assert_eq!(dechunker.data(), Some(data.to_vec())); } + +#[test] +fn master_dechunker_basic() { + let data = b"Test data for master dechunker"; + let chunks: Vec<_> = chunk(data).collect(); + + let mut master = MasterDechunker::<10>::default(); + + for (i, chunk) in chunks.iter().enumerate() { + let result = master.insert_bytes(chunk).unwrap(); + + if i == chunks.len() - 1 { + assert_eq!(result, Some(data.to_vec()), "Last chunk should return completed data"); + } else { + assert_eq!(result, None, "Intermediate chunks should return None"); + } + } +} + +#[test] +fn master_dechunker_multiple_messages() { + let data1 = b"First message data"; + let data2 = b"Second message data"; + let data3 = b"Third message data"; + + let chunks1: Vec<_> = chunk(data1).collect(); + let chunks2: Vec<_> = chunk(data2).collect(); + let chunks3: Vec<_> = chunk(data3).collect(); + + let mut master = MasterDechunker::<3>::default(); + + let mut all_chunks = Vec::new(); + for i in 0..chunks1.len().max(chunks2.len()).max(chunks3.len()) { + if i < chunks1.len() { + 
all_chunks.push((1, &chunks1[i])); + } + if i < chunks2.len() { + all_chunks.push((2, &chunks2[i])); + } + if i < chunks3.len() { + all_chunks.push((3, &chunks3[i])); + } + } + + let mut completed = Vec::new(); + + for (msg_id, chunk) in all_chunks { + if let Some(data) = master.insert_bytes(chunk).unwrap() { + completed.push((msg_id, data)); + } + } + + assert_eq!(completed.len(), 3, "All three messages should complete"); + + for (msg_id, data) in completed { + match msg_id { + 1 => assert_eq!(data, data1.to_vec(), "Message 1 data should match"), + 2 => assert_eq!(data, data2.to_vec(), "Message 2 data should match"), + 3 => assert_eq!(data, data3.to_vec(), "Message 3 data should match"), + _ => panic!("Unexpected message ID: {}", msg_id), + } + } +} + +#[test] +fn master_dechunker_slot_exhaustion() { + let mut master = MasterDechunker::<2>::default(); + + let data1 = vec![1u8; 1000]; + let data2 = vec![2u8; 1000]; + let data3 = vec![3u8; 1000]; + + let chunks1: Vec<_> = chunk(&data1).collect(); + let chunks2: Vec<_> = chunk(&data2).collect(); + let chunks3: Vec<_> = chunk(&data3).collect(); + + assert!(chunks1.len() > 1, "Message 1 should require multiple chunks"); + assert!(chunks2.len() > 1, "Message 2 should require multiple chunks"); + assert!(chunks3.len() > 1, "Message 3 should require multiple chunks"); + + master.insert_bytes(&chunks1[0]).unwrap(); + master.insert_bytes(&chunks2[0]).unwrap(); + + let result = master.insert_bytes(&chunks3[0]); + assert!(matches!(result, Err(InsertBytesError::NoSlots)), "Third message should fail with NoSlots error"); +} +#[test] +fn master_dechunker_custom_size() { + let mut master = MasterDechunker::<5>::default(); + + let messages: Vec> = (0..5) + .map(|i| vec![i as u8; 1000]) + .collect(); + + let all_chunks: Vec> = messages + .iter() + .map(|data| chunk(data).collect()) + .collect(); + + for chunks in &all_chunks { + assert!(chunks.len() > 1, "Each message should require multiple chunks"); + } + + for chunks in &all_chunks { + master.insert_bytes(&chunks[0]).unwrap(); + } + + let mut completed = 0; + for chunks in &all_chunks { + for chunk in &chunks[1..] 
{ + if let Some(_) = master.insert_bytes(chunk).unwrap() { + completed += 1; + } + } + } + + assert_eq!(completed, 5, "All 5 messages should complete successfully"); +} + +#[test] +fn master_dechunker_single_chunk_message() { + let data = b"Small"; + let chunks: Vec<_> = chunk(data).collect(); + assert_eq!(chunks.len(), 1, "Small message should be single chunk"); + + let mut master = MasterDechunker::<10>::default(); + let result = master.insert_bytes(&chunks[0]).unwrap(); + + assert_eq!(result, Some(data.to_vec()), "Single chunk message should complete immediately"); +} \ No newline at end of file From 9b17cb401794ec354aedbbdcceadf0c975b8906d Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 08:47:08 -0400 Subject: [PATCH 02/10] remove default const generic --- btp/src/dechunk.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index 0275a6d..d63f755 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -216,7 +216,7 @@ pub enum InsertBytesError { NoSlots, } -pub struct MasterDechunker { +pub struct MasterDechunker { pub dechunkers: [Option; N], } From ba5736bec56eefc9f7a41b51c582c5c2792f8525 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 08:47:25 -0400 Subject: [PATCH 03/10] fmt --- btp/src/tests.rs | 77 ++++++++++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/btp/src/tests.rs b/btp/src/tests.rs index 87834b5..a49305b 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -1,5 +1,6 @@ use crate::{ - chunk, Chunk, Dechunker, DecodeError, InsertBytesError, MasterDechunker, MessageIdError, APP_MTU, CHUNK_DATA_SIZE, HEADER_SIZE, + chunk, Chunk, Dechunker, DecodeError, InsertBytesError, MasterDechunker, MessageIdError, + APP_MTU, CHUNK_DATA_SIZE, HEADER_SIZE, }; use rand::{seq::SliceRandom, Rng, RngCore}; @@ -405,7 +406,11 @@ fn master_dechunker_basic() { let result = master.insert_bytes(chunk).unwrap(); if i == chunks.len() - 1 { - assert_eq!(result, Some(data.to_vec()), "Last chunk should return completed data"); + assert_eq!( + result, + Some(data.to_vec()), + "Last chunk should return completed data" + ); } else { assert_eq!(result, None, "Intermediate chunks should return None"); } @@ -460,46 +465,56 @@ fn master_dechunker_multiple_messages() { #[test] fn master_dechunker_slot_exhaustion() { let mut master = MasterDechunker::<2>::default(); - + let data1 = vec![1u8; 1000]; let data2 = vec![2u8; 1000]; let data3 = vec![3u8; 1000]; - + let chunks1: Vec<_> = chunk(&data1).collect(); let chunks2: Vec<_> = chunk(&data2).collect(); let chunks3: Vec<_> = chunk(&data3).collect(); - - assert!(chunks1.len() > 1, "Message 1 should require multiple chunks"); - assert!(chunks2.len() > 1, "Message 2 should require multiple chunks"); - assert!(chunks3.len() > 1, "Message 3 should require multiple chunks"); - + + assert!( + chunks1.len() > 1, + "Message 1 should require multiple chunks" + ); + assert!( + chunks2.len() > 1, + "Message 2 should require multiple chunks" + ); + assert!( + chunks3.len() > 1, + "Message 3 should require multiple chunks" + ); + master.insert_bytes(&chunks1[0]).unwrap(); master.insert_bytes(&chunks2[0]).unwrap(); - + let result = master.insert_bytes(&chunks3[0]); - assert!(matches!(result, Err(InsertBytesError::NoSlots)), "Third message should fail with NoSlots error"); + assert!( + matches!(result, Err(InsertBytesError::NoSlots)), + "Third message should fail with NoSlots error" + ); } #[test] fn 
master_dechunker_custom_size() { let mut master = MasterDechunker::<5>::default(); - - let messages: Vec> = (0..5) - .map(|i| vec![i as u8; 1000]) - .collect(); - - let all_chunks: Vec> = messages - .iter() - .map(|data| chunk(data).collect()) - .collect(); - + + let messages: Vec> = (0..5).map(|i| vec![i as u8; 1000]).collect(); + + let all_chunks: Vec> = messages.iter().map(|data| chunk(data).collect()).collect(); + for chunks in &all_chunks { - assert!(chunks.len() > 1, "Each message should require multiple chunks"); + assert!( + chunks.len() > 1, + "Each message should require multiple chunks" + ); } - + for chunks in &all_chunks { master.insert_bytes(&chunks[0]).unwrap(); } - + let mut completed = 0; for chunks in &all_chunks { for chunk in &chunks[1..] { @@ -508,7 +523,7 @@ fn master_dechunker_custom_size() { } } } - + assert_eq!(completed, 5, "All 5 messages should complete successfully"); } @@ -517,9 +532,13 @@ fn master_dechunker_single_chunk_message() { let data = b"Small"; let chunks: Vec<_> = chunk(data).collect(); assert_eq!(chunks.len(), 1, "Small message should be single chunk"); - + let mut master = MasterDechunker::<10>::default(); let result = master.insert_bytes(&chunks[0]).unwrap(); - - assert_eq!(result, Some(data.to_vec()), "Single chunk message should complete immediately"); -} \ No newline at end of file + + assert_eq!( + result, + Some(data.to_vec()), + "Single chunk message should complete immediately" + ); +} From 316012db437cacb10f4b31ffcde8c7af70421023 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 08:58:46 -0400 Subject: [PATCH 04/10] lru eviction --- btp/src/dechunk.rs | 62 ++++++++++++++++++++++++++++------------------ btp/src/tests.rs | 22 ++++++++++------ 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index d63f755..01bc40b 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -1,6 +1,6 @@ use crate::{Header, CHUNK_DATA_SIZE, HEADER_SIZE}; -#[derive(Debug, Copy, Clone, thiserror::Error)] +#[derive(Debug, PartialEq, thiserror::Error)] pub enum DecodeError { #[error("chunk too small, expected at least {}", HEADER_SIZE)] HeaderTooSmall, @@ -208,39 +208,40 @@ impl Dechunker { } } -#[derive(Debug, Copy, Clone, thiserror::Error)] -pub enum InsertBytesError { - #[error(transparent)] - Parse(#[from] DecodeError), - #[error("no slots found")] - NoSlots, +#[derive(Debug)] +struct DechunkerSlot { + dechunker: Dechunker, + last_used: u64, } -pub struct MasterDechunker { - pub dechunkers: [Option; N], +pub struct MasterDechunker { + dechunkers: [Option; N], + counter: u64, } impl Default for MasterDechunker { fn default() -> Self { Self { dechunkers: std::array::from_fn(|_| None), + counter: 0, } } } impl MasterDechunker { - pub fn insert_bytes(&mut self, data: &[u8]) -> Result>, InsertBytesError> { + pub fn insert_bytes(&mut self, data: &[u8]) -> Result>, DecodeError> { let chunk = Chunk::parse(data)?; let message_id = chunk.header.message_id; for decoder_slot in &mut self.dechunkers { - if let Some(decoder) = decoder_slot { - if decoder.message_id() == Some(message_id) { - // Safe: message IDs match - decoder.insert_chunk(chunk).unwrap(); - - return if decoder.is_complete() { - Ok(decoder_slot.take().unwrap().data()) + if let Some(ref mut slot) = decoder_slot { + if slot.dechunker.message_id() == Some(message_id) { + self.counter += 1; + slot.last_used = self.counter; + slot.dechunker.insert_chunk(chunk).unwrap(); + + return if slot.dechunker.is_complete() { + 
Ok(decoder_slot.take().unwrap().dechunker.data()) } else { Ok(None) }; @@ -248,20 +249,33 @@ impl MasterDechunker { } } - let empty_slot = self - .dechunkers - .iter_mut() - .find(|slot| slot.is_none()) - .ok_or(InsertBytesError::NoSlots)?; + let target_slot = + if let Some(empty_slot) = self.dechunkers.iter_mut().find(|slot| slot.is_none()) { + empty_slot + } else { + let lru_index = self + .dechunkers + .iter() + .enumerate() + .filter_map(|(i, slot)| slot.as_ref().map(|s| (i, s.last_used))) + .min_by_key(|(_, last_used)| *last_used) + .map(|(i, _)| i) + .expect("should find slot"); + + &mut self.dechunkers[lru_index] + }; let mut decoder = Dechunker::new(); - // Safe: new decoder, no ID conflict decoder.insert_chunk(chunk).unwrap(); if decoder.is_complete() { Ok(decoder.data()) } else { - *empty_slot = Some(decoder); + self.counter += 1; + *target_slot = Some(DechunkerSlot { + dechunker: decoder, + last_used: self.counter, + }); Ok(None) } } diff --git a/btp/src/tests.rs b/btp/src/tests.rs index a49305b..7b277ed 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -1,6 +1,6 @@ use crate::{ - chunk, Chunk, Dechunker, DecodeError, InsertBytesError, MasterDechunker, MessageIdError, - APP_MTU, CHUNK_DATA_SIZE, HEADER_SIZE, + chunk, Chunk, Dechunker, DecodeError, MasterDechunker, MessageIdError, APP_MTU, + CHUNK_DATA_SIZE, HEADER_SIZE, }; use rand::{seq::SliceRandom, Rng, RngCore}; @@ -463,7 +463,7 @@ fn master_dechunker_multiple_messages() { } #[test] -fn master_dechunker_slot_exhaustion() { +fn master_dechunker_lru_eviction() { let mut master = MasterDechunker::<2>::default(); let data1 = vec![1u8; 1000]; @@ -490,10 +490,18 @@ fn master_dechunker_slot_exhaustion() { master.insert_bytes(&chunks1[0]).unwrap(); master.insert_bytes(&chunks2[0]).unwrap(); - let result = master.insert_bytes(&chunks3[0]); - assert!( - matches!(result, Err(InsertBytesError::NoSlots)), - "Third message should fail with NoSlots error" + master.insert_bytes(&chunks2[1]).unwrap(); + + let result = master.insert_bytes(&chunks3[0]).unwrap(); + assert_eq!( + result, None, + "Third message should succeed by evicting LRU slot" + ); + + let result = master.insert_bytes(&chunks1[0]).unwrap(); + assert_eq!( + result, None, + "Message 1 should start fresh after being evicted" ); } #[test] From e9fd6e2767bcf403bf64bb5943d9395934ead20b Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 09:01:36 -0400 Subject: [PATCH 05/10] clippy fixes --- btp/src/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/btp/src/tests.rs b/btp/src/tests.rs index 7b277ed..916b971 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -457,7 +457,7 @@ fn master_dechunker_multiple_messages() { 1 => assert_eq!(data, data1.to_vec(), "Message 1 data should match"), 2 => assert_eq!(data, data2.to_vec(), "Message 2 data should match"), 3 => assert_eq!(data, data3.to_vec(), "Message 3 data should match"), - _ => panic!("Unexpected message ID: {}", msg_id), + _ => panic!("Unexpected message ID: {msg_id}"), } } } @@ -526,7 +526,7 @@ fn master_dechunker_custom_size() { let mut completed = 0; for chunks in &all_chunks { for chunk in &chunks[1..] 
{ - if let Some(_) = master.insert_bytes(chunk).unwrap() { + if master.insert_bytes(chunk).unwrap().is_some() { completed += 1; } } From 63239efd5acbd47a31b5a10fb5b7e42bcb6c904e Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 09:26:25 -0400 Subject: [PATCH 06/10] streaming dechunker --- btp/src/dechunk.rs | 100 +++++++++++++++++++++++++++++++++- btp/src/tests.rs | 133 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 1 deletion(-) diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index 01bc40b..a961237 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -1,4 +1,5 @@ use crate::{Header, CHUNK_DATA_SIZE, HEADER_SIZE}; +use std::io::{self, Write}; #[derive(Debug, PartialEq, thiserror::Error)] pub enum DecodeError { @@ -97,7 +98,7 @@ struct MessageInfo { } #[derive(Debug, Clone)] -struct RawChunk { +pub(crate) struct RawChunk { data: [u8; CHUNK_DATA_SIZE], len: u8, } @@ -280,3 +281,100 @@ impl MasterDechunker { } } } + +#[derive(Debug, thiserror::Error)] +pub enum StreamingError { + #[error(transparent)] + Io(#[from] io::Error), + #[error(transparent)] + MessageId(#[from] MessageIdError), +} + +pub struct StreamingDechunker { + writer: W, + pub(crate) chunks: Vec>, + info: Option, + next_chunk_to_write: u16, + bytes_written: u64, +} + +impl StreamingDechunker { + pub fn new(writer: W) -> Self { + Self { + writer, + chunks: Vec::new(), + info: None, + next_chunk_to_write: 0, + bytes_written: 0, + } + } + + pub fn insert_chunk(&mut self, chunk: Chunk) -> Result { + let header = &chunk.header; + + match self.info { + None => { + self.info = Some(MessageInfo { + message_id: header.message_id, + total_chunks: header.total_chunks, + chunks_received: 0, + }); + self.chunks.resize(header.total_chunks as usize, None); + } + Some(info) if info.message_id != header.message_id => { + return Err(StreamingError::MessageId(MessageIdError { + expected: info.message_id, + actual: header.message_id, + })); + } + _ => {} + } + + if self.chunks[header.index as usize].is_none() { + self.chunks[header.index as usize] = Some(RawChunk { + len: header.data_len, + data: chunk.chunk, + }); + + if let Some(ref mut info) = self.info { + info.chunks_received += 1; + } + } + + while (self.next_chunk_to_write as usize) < self.chunks.len() { + if let Some(chunk) = self.chunks[self.next_chunk_to_write as usize].take() { + self.writer.write_all(chunk.as_slice())?; + self.next_chunk_to_write += 1; + self.bytes_written += chunk.len as u64; + } else { + break; + } + } + + Ok(self.is_complete()) + } + + pub fn is_complete(&self) -> bool { + self.info + .map(|info| info.chunks_received == info.total_chunks) + .unwrap_or(false) + } + + pub fn message_id(&self) -> Option { + self.info.map(|info| info.message_id) + } + + pub fn bytes_written(&self) -> u64 { + self.bytes_written + } + + pub fn progress(&self) -> f32 { + self.info + .map(|info| info.chunks_received as f32 / info.total_chunks as f32) + .unwrap_or(0.0) + } + + pub fn into_writer(self) -> W { + self.writer + } +} diff --git a/btp/src/tests.rs b/btp/src/tests.rs index 916b971..b596d46 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -550,3 +550,136 @@ fn master_dechunker_single_chunk_message() { "Single chunk message should complete immediately" ); } +#[test] +fn streaming_dechunker_memory_usage() { + let data = vec![42u8; 5000]; + let chunks: Vec<_> = chunk(&data).collect(); + + assert!(chunks.len() >= 3, "Need at least 3 chunks for this test"); + + let mut output = Vec::new(); + let mut streaming = 
crate::dechunk::StreamingDechunker::new(&mut output); + + let chunk0 = Chunk::parse(&chunks[0]).unwrap(); + let chunk1 = Chunk::parse(&chunks[1]).unwrap(); + let chunk2 = Chunk::parse(&chunks[2]).unwrap(); + + let complete = streaming.insert_chunk(chunk0).unwrap(); + assert!(!complete, "Should not be complete after first chunk"); + assert_eq!( + streaming.bytes_written(), + chunk0.header.data_len as u64, + "Chunk 0 should be written immediately" + ); + + let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); + assert_eq!( + chunks_in_memory, 0, + "Chunk 0 should be freed from memory after writing" + ); + + let bytes_before = streaming.bytes_written(); + let complete = streaming.insert_chunk(chunk2).unwrap(); + assert!(!complete, "Should not be complete"); + assert_eq!( + streaming.bytes_written(), + bytes_before, + "Chunk 2 should not be written yet (out of order)" + ); + + let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); + assert_eq!(chunks_in_memory, 1, "Chunk 2 should be buffered in memory"); + assert!( + streaming.chunks[2].is_some(), + "Chunk 2 should be at index 2" + ); + + let bytes_before = streaming.bytes_written(); + let _complete = streaming.insert_chunk(chunk1).unwrap(); + + let expected_bytes = + bytes_before + chunk1.header.data_len as u64 + chunk2.header.data_len as u64; + assert_eq!( + streaming.bytes_written(), + expected_bytes, + "Both chunk 1 and 2 should be written" + ); + + let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); + assert_eq!( + chunks_in_memory, 0, + "All written chunks should be freed from memory" + ); + + for chunk in &chunks[3..] { + let parsed = Chunk::parse(chunk).unwrap(); + streaming.insert_chunk(parsed).unwrap(); + } + + assert!(streaming.is_complete(), "Message should be complete"); + assert_eq!( + streaming.bytes_written(), + data.len() as u64, + "All bytes should be written" + ); + + let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); + assert_eq!( + chunks_in_memory, 0, + "All chunks should be freed from memory when complete" + ); + + let output = streaming.into_writer(); + assert_eq!(*output, data, "Output should match original data"); +} + +#[test] +fn streaming_dechunker_reverse_order() { + let data = vec![1u8; 2000]; + let chunks: Vec<_> = chunk(&data).collect(); + + let mut output = Vec::new(); + let mut streaming = crate::dechunk::StreamingDechunker::new(&mut output); + + for chunk in chunks.iter().rev() { + let parsed = Chunk::parse(chunk).unwrap(); + streaming.insert_chunk(parsed).unwrap(); + } + + assert_eq!( + streaming.bytes_written(), + data.len() as u64, + "All data should be written after last chunk" + ); + + let output = streaming.into_writer(); + assert_eq!( + *output, data, + "Output should match despite reverse insertion order" + ); +} + +#[test] +fn streaming_dechunker_memory_efficiency() { + let data = vec![7u8; 10000]; + let chunks: Vec<_> = chunk(&data).collect(); + + let mut output = Vec::new(); + let mut streaming = crate::dechunk::StreamingDechunker::new(&mut output); + + for (i, chunk) in chunks.iter().enumerate() { + let parsed = Chunk::parse(chunk).unwrap(); + streaming.insert_chunk(parsed).unwrap(); + + let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); + assert_eq!( + chunks_in_memory, 0, + "No chunks should be buffered when inserting in order (iteration {})", + i + ); + } + + assert!(streaming.is_complete(), "Should be complete"); + let output = streaming.into_writer(); + 
assert_eq!(*output, data, "Output should match original data"); +} From a3d2b680b354f461ac0cbd9bb4b0daeb783f82cf Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 09:28:19 -0400 Subject: [PATCH 07/10] streaming -> Stream --- btp/src/dechunk.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index a961237..75818ca 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -283,14 +283,14 @@ impl MasterDechunker { } #[derive(Debug, thiserror::Error)] -pub enum StreamingError { +pub enum StreamError { #[error(transparent)] Io(#[from] io::Error), #[error(transparent)] MessageId(#[from] MessageIdError), } -pub struct StreamingDechunker { +pub struct StreamDechunker { writer: W, pub(crate) chunks: Vec>, info: Option, @@ -298,7 +298,7 @@ pub struct StreamingDechunker { bytes_written: u64, } -impl StreamingDechunker { +impl StreamDechunker { pub fn new(writer: W) -> Self { Self { writer, @@ -309,7 +309,7 @@ impl StreamingDechunker { } } - pub fn insert_chunk(&mut self, chunk: Chunk) -> Result { + pub fn insert_chunk(&mut self, chunk: Chunk) -> Result { let header = &chunk.header; match self.info { @@ -322,7 +322,7 @@ impl StreamingDechunker { self.chunks.resize(header.total_chunks as usize, None); } Some(info) if info.message_id != header.message_id => { - return Err(StreamingError::MessageId(MessageIdError { + return Err(StreamError::MessageId(MessageIdError { expected: info.message_id, actual: header.message_id, })); From 19fefc44c23cd0a43319ebf64f236f833089bf45 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 09:37:47 -0400 Subject: [PATCH 08/10] refine tests --- btp/src/tests.rs | 40 +++++----------------------------------- 1 file changed, 5 insertions(+), 35 deletions(-) diff --git a/btp/src/tests.rs b/btp/src/tests.rs index b596d46..2cf33ad 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -1,6 +1,6 @@ use crate::{ - chunk, Chunk, Dechunker, DecodeError, MasterDechunker, MessageIdError, APP_MTU, - CHUNK_DATA_SIZE, HEADER_SIZE, + chunk, Chunk, Dechunker, DecodeError, MasterDechunker, MessageIdError, StreamDechunker, + APP_MTU, CHUNK_DATA_SIZE, HEADER_SIZE, }; use rand::{seq::SliceRandom, Rng, RngCore}; @@ -504,36 +504,6 @@ fn master_dechunker_lru_eviction() { "Message 1 should start fresh after being evicted" ); } -#[test] -fn master_dechunker_custom_size() { - let mut master = MasterDechunker::<5>::default(); - - let messages: Vec> = (0..5).map(|i| vec![i as u8; 1000]).collect(); - - let all_chunks: Vec> = messages.iter().map(|data| chunk(data).collect()).collect(); - - for chunks in &all_chunks { - assert!( - chunks.len() > 1, - "Each message should require multiple chunks" - ); - } - - for chunks in &all_chunks { - master.insert_bytes(&chunks[0]).unwrap(); - } - - let mut completed = 0; - for chunks in &all_chunks { - for chunk in &chunks[1..] 
{ - if master.insert_bytes(chunk).unwrap().is_some() { - completed += 1; - } - } - } - - assert_eq!(completed, 5, "All 5 messages should complete successfully"); -} #[test] fn master_dechunker_single_chunk_message() { @@ -558,7 +528,7 @@ fn streaming_dechunker_memory_usage() { assert!(chunks.len() >= 3, "Need at least 3 chunks for this test"); let mut output = Vec::new(); - let mut streaming = crate::dechunk::StreamingDechunker::new(&mut output); + let mut streaming = StreamDechunker::new(&mut output); let chunk0 = Chunk::parse(&chunks[0]).unwrap(); let chunk1 = Chunk::parse(&chunks[1]).unwrap(); @@ -639,7 +609,7 @@ fn streaming_dechunker_reverse_order() { let chunks: Vec<_> = chunk(&data).collect(); let mut output = Vec::new(); - let mut streaming = crate::dechunk::StreamingDechunker::new(&mut output); + let mut streaming = StreamDechunker::new(&mut output); for chunk in chunks.iter().rev() { let parsed = Chunk::parse(chunk).unwrap(); @@ -665,7 +635,7 @@ fn streaming_dechunker_memory_efficiency() { let chunks: Vec<_> = chunk(&data).collect(); let mut output = Vec::new(); - let mut streaming = crate::dechunk::StreamingDechunker::new(&mut output); + let mut streaming = StreamDechunker::new(&mut output); for (i, chunk) in chunks.iter().enumerate() { let parsed = Chunk::parse(chunk).unwrap(); From d49cb1394b2f5f651870634a54a5af764a0d9922 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 09:45:09 -0400 Subject: [PATCH 09/10] insert_chunk --- btp/src/dechunk.rs | 11 +++++------ btp/src/tests.rs | 25 +++++++++++++++++-------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index 75818ca..6f0d0d0 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -230,8 +230,7 @@ impl Default for MasterDechunker { } impl MasterDechunker { - pub fn insert_bytes(&mut self, data: &[u8]) -> Result>, DecodeError> { - let chunk = Chunk::parse(data)?; + pub fn insert_chunk(&mut self, chunk: Chunk) -> Option> { let message_id = chunk.header.message_id; for decoder_slot in &mut self.dechunkers { @@ -242,9 +241,9 @@ impl MasterDechunker { slot.dechunker.insert_chunk(chunk).unwrap(); return if slot.dechunker.is_complete() { - Ok(decoder_slot.take().unwrap().dechunker.data()) + decoder_slot.take().unwrap().dechunker.data() } else { - Ok(None) + None }; } } @@ -270,14 +269,14 @@ impl MasterDechunker { decoder.insert_chunk(chunk).unwrap(); if decoder.is_complete() { - Ok(decoder.data()) + decoder.data() } else { self.counter += 1; *target_slot = Some(DechunkerSlot { dechunker: decoder, last_used: self.counter, }); - Ok(None) + None } } } diff --git a/btp/src/tests.rs b/btp/src/tests.rs index 2cf33ad..7fb5c7b 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -403,7 +403,8 @@ fn master_dechunker_basic() { let mut master = MasterDechunker::<10>::default(); for (i, chunk) in chunks.iter().enumerate() { - let result = master.insert_bytes(chunk).unwrap(); + let parsed = Chunk::parse(chunk).unwrap(); + let result = master.insert_chunk(parsed); if i == chunks.len() - 1 { assert_eq!( @@ -445,7 +446,8 @@ fn master_dechunker_multiple_messages() { let mut completed = Vec::new(); for (msg_id, chunk) in all_chunks { - if let Some(data) = master.insert_bytes(chunk).unwrap() { + let parsed = Chunk::parse(chunk).unwrap(); + if let Some(data) = master.insert_chunk(parsed) { completed.push((msg_id, data)); } } @@ -487,18 +489,24 @@ fn master_dechunker_lru_eviction() { "Message 3 should require multiple chunks" ); - master.insert_bytes(&chunks1[0]).unwrap(); 
- master.insert_bytes(&chunks2[0]).unwrap(); + let parsed1_0 = Chunk::parse(&chunks1[0]).unwrap(); + let parsed2_0 = Chunk::parse(&chunks2[0]).unwrap(); + let parsed2_1 = Chunk::parse(&chunks2[1]).unwrap(); + let parsed3_0 = Chunk::parse(&chunks3[0]).unwrap(); + + master.insert_chunk(parsed1_0); + master.insert_chunk(parsed2_0); - master.insert_bytes(&chunks2[1]).unwrap(); + master.insert_chunk(parsed2_1); - let result = master.insert_bytes(&chunks3[0]).unwrap(); + let result = master.insert_chunk(parsed3_0); assert_eq!( result, None, "Third message should succeed by evicting LRU slot" ); - let result = master.insert_bytes(&chunks1[0]).unwrap(); + let parsed1_0_again = Chunk::parse(&chunks1[0]).unwrap(); + let result = master.insert_chunk(parsed1_0_again); assert_eq!( result, None, "Message 1 should start fresh after being evicted" @@ -512,7 +520,8 @@ fn master_dechunker_single_chunk_message() { assert_eq!(chunks.len(), 1, "Small message should be single chunk"); let mut master = MasterDechunker::<10>::default(); - let result = master.insert_bytes(&chunks[0]).unwrap(); + let parsed = Chunk::parse(&chunks[0]).unwrap(); + let result = master.insert_chunk(parsed); assert_eq!( result, From 3098acd9b6b46d3eb7ebd90d7921895bad9429a0 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Thu, 24 Jul 2025 09:45:22 -0400 Subject: [PATCH 10/10] fix clippy --- btp/src/tests.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/btp/src/tests.rs b/btp/src/tests.rs index 7fb5c7b..b8ae1f9 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -653,8 +653,7 @@ fn streaming_dechunker_memory_efficiency() { let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); assert_eq!( chunks_in_memory, 0, - "No chunks should be buffered when inserting in order (iteration {})", - i + "No chunks should be buffered when inserting in order (iteration {i})" ); }
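
For anyone reading the series end to end, here is a minimal usage sketch (not part of any patch) that exercises the two APIs the series introduces, mirroring the new tests: MasterDechunker::insert_chunk for slot-based reassembly with LRU eviction (patches 01-04, 09), and StreamDechunker for write-through streaming into any io::Write sink (patches 06-07). The btp::{...} import path and the demo wrapper are assumptions for illustration; the types, methods, and return values are taken from the diffs above.

use btp::{chunk, Chunk, MasterDechunker, StreamDechunker};

fn demo(message: &[u8]) {
    let packets: Vec<_> = chunk(message).collect();

    // Slot-based reassembly: up to 4 concurrent messages; the least-recently-used
    // slot is evicted when a new message id arrives and no slot is free.
    let mut master = MasterDechunker::<4>::default();
    let mut reassembled = None;
    for packet in &packets {
        let parsed = Chunk::parse(packet).expect("well-formed chunk");
        // insert_chunk returns Some(data) once the message is complete.
        if let Some(done) = master.insert_chunk(parsed) {
            reassembled = Some(done);
        }
    }
    assert_eq!(reassembled.as_deref(), Some(message));

    // Write-through streaming: in-order chunks go straight to the writer;
    // only out-of-order chunks are buffered until the gap is filled.
    let mut out = Vec::new();
    let mut streaming = StreamDechunker::new(&mut out);
    for packet in &packets {
        let parsed = Chunk::parse(packet).expect("well-formed chunk");
        streaming.insert_chunk(parsed).expect("single message id");
    }
    assert!(streaming.is_complete());
    let written = streaming.into_writer();
    assert_eq!(written.as_slice(), message);
}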