diff --git a/btp/src/dechunk.rs b/btp/src/dechunk.rs index 6f0d0d0..d64d03f 100644 --- a/btp/src/dechunk.rs +++ b/btp/src/dechunk.rs @@ -1,7 +1,7 @@ use crate::{Header, CHUNK_DATA_SIZE, HEADER_SIZE}; use std::io::{self, Write}; -#[derive(Debug, PartialEq, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum DecodeError { #[error("chunk too small, expected at least {}", HEADER_SIZE)] HeaderTooSmall, @@ -46,8 +46,8 @@ impl Chunk { &self.chunk[..self.header.data_len as usize] } - /// Parses raw bytes into a chunk - pub fn parse(data: &[u8]) -> Result { + /// decodes raw bytes into a chunk + pub fn decode(data: &[u8]) -> Result { let (header_data, chunk_data) = data .split_at_checked(HEADER_SIZE) .ok_or(DecodeError::HeaderTooSmall)?; @@ -135,8 +135,8 @@ impl Dechunker { .unwrap_or(0.0) } - /// Inserts a parsed chunk. Use this for multiple concurrent messages. - /// First parse with [`Chunk::parse()`], lookup decoder by message ID, then insert. + /// Inserts a chunk. Use this for multiple concurrent messages. + /// First decode with [`Chunk::decode()`], lookup decoder by message ID, then insert. pub fn insert_chunk(&mut self, chunk: Chunk) -> Result<(), MessageIdError> { let header = &chunk.header; @@ -173,10 +173,10 @@ impl Dechunker { Ok(()) } - /// Parses and inserts raw chunk data. Use this for single message at a time. + /// Decodes and inserts raw chunk data. Use this for single message at a time. /// For multiple concurrent messages, use [`Chunk::parse()`] then [`Dechunker::insert_chunk()`]. pub fn receive(&mut self, data: &[u8]) -> Result<(), ReceiveError> { - let chunk_with_header = Chunk::parse(data)?; + let chunk_with_header = Chunk::decode(data)?; self.insert_chunk(chunk_with_header)?; Ok(()) } @@ -209,17 +209,17 @@ impl Dechunker { } } +pub struct MasterDechunker { + dechunkers: [Option; N], + counter: u64, +} + #[derive(Debug)] struct DechunkerSlot { dechunker: Dechunker, last_used: u64, } -pub struct MasterDechunker { - dechunkers: [Option; N], - counter: u64, -} - impl Default for MasterDechunker { fn default() -> Self { Self { @@ -253,16 +253,10 @@ impl MasterDechunker { if let Some(empty_slot) = self.dechunkers.iter_mut().find(|slot| slot.is_none()) { empty_slot } else { - let lru_index = self - .dechunkers - .iter() - .enumerate() - .filter_map(|(i, slot)| slot.as_ref().map(|s| (i, s.last_used))) - .min_by_key(|(_, last_used)| *last_used) - .map(|(i, _)| i) - .expect("should find slot"); - - &mut self.dechunkers[lru_index] + self.dechunkers + .iter_mut() + .min_by_key(|d| d.as_ref().map(|d| d.last_used)) + .expect("not empty") }; let mut decoder = Dechunker::new(); @@ -279,6 +273,14 @@ impl MasterDechunker { None } } + + pub fn get_dechunker(&self, msg_id: u16) -> Option<&Dechunker> { + self.dechunkers + .iter() + .filter_map(|d| d.as_ref()) + .find(|d| d.dechunker.info.map(|m| m.message_id) == Some(msg_id)) + .map(|d| &d.dechunker) + } } #[derive(Debug, thiserror::Error)] diff --git a/btp/src/tests.rs b/btp/src/tests.rs index b8ae1f9..ae4cff7 100644 --- a/btp/src/tests.rs +++ b/btp/src/tests.rs @@ -239,26 +239,26 @@ fn reverse_order_decoding() { } #[test] -fn chunk_parse_and_insert() { +fn chunk_decode_and_insert() { use crate::Chunk; - let data = b"Test data for parse and push"; + let data = b"Test data for decode and push"; let chunks: Vec<_> = chunk(data).collect(); let mut dechunker = Dechunker::new(); for raw_chunk in &chunks { - let parsed = Chunk::parse(raw_chunk).unwrap(); - dechunker.insert_chunk(parsed).unwrap(); + let decoded = Chunk::decode(raw_chunk).unwrap(); + dechunker.insert_chunk(decoded).unwrap(); } assert_eq!(dechunker.data(), Some(data.to_vec())); } #[test] -fn chunk_parse_errors() { +fn chunk_decode_errors() { let small_data = vec![0u8; HEADER_SIZE - 1]; - let result = Chunk::parse(&small_data); + let result = Chunk::decode(&small_data); assert!(matches!(result, Err(DecodeError::HeaderTooSmall))); } @@ -268,7 +268,7 @@ fn chunk_too_small() { let mut raw_chunk = vec![0u8; HEADER_SIZE + 5]; raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes()); - let result = Chunk::parse(&raw_chunk); + let result = Chunk::decode(&raw_chunk); assert!( matches!( result, @@ -287,7 +287,7 @@ fn chunk_too_large() { let mut raw_chunk = vec![0u8; HEADER_SIZE + 255]; raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes()); - let result = Chunk::parse(&raw_chunk); + let result = Chunk::decode(&raw_chunk); assert!( matches!(result, Err(DecodeError::ChunkTooLarge)), "should fail when data_len exceeds maximum chunk size" @@ -295,14 +295,14 @@ fn chunk_too_large() { } #[test] -fn chunk_parse_invalid_index() { +fn chunk_decode_invalid_index() { use crate::{Chunk, DecodeError, HEADER_SIZE}; let header = crate::Header::new(1234, 10, 1, 5); let mut raw_chunk = vec![0u8; HEADER_SIZE + 5]; raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes()); - let result = Chunk::parse(&raw_chunk); + let result = Chunk::decode(&raw_chunk); assert!( matches!( result, @@ -325,10 +325,10 @@ fn insert_chunk_wrong_message_id() { let mut dechunker = Dechunker::new(); - let chunk1 = Chunk::parse(&chunks1[0]).unwrap(); + let chunk1 = Chunk::decode(&chunks1[0]).unwrap(); dechunker.insert_chunk(chunk1).unwrap(); - let chunk2 = Chunk::parse(&chunks2[0]).unwrap(); + let chunk2 = Chunk::decode(&chunks2[0]).unwrap(); let result = dechunker.insert_chunk(chunk2); assert!( @@ -352,8 +352,8 @@ fn insert_chunk_out_of_order() { let mut dechunker = Dechunker::new(); for (i, chunk) in chunks.iter().enumerate() { - let parsed = Chunk::parse(chunk).unwrap(); - dechunker.insert_chunk(parsed).unwrap(); + let decoded = Chunk::decode(chunk).unwrap(); + dechunker.insert_chunk(decoded).unwrap(); let expected_progress = (i + 1) as f32 / original_count as f32; assert!( @@ -376,7 +376,7 @@ fn insert_duplicate_chunks() { let mut dechunker = Dechunker::new(); - let chunk0 = Chunk::parse(&chunks[0]).unwrap(); + let chunk0 = Chunk::decode(&chunks[0]).unwrap(); dechunker.insert_chunk(chunk0).unwrap(); dechunker.insert_chunk(chunk0).unwrap(); @@ -388,8 +388,8 @@ fn insert_duplicate_chunks() { ); for chunk in &chunks[1..] { - let parsed = Chunk::parse(chunk).unwrap(); - dechunker.insert_chunk(parsed).unwrap(); + let decoded = Chunk::decode(chunk).unwrap(); + dechunker.insert_chunk(decoded).unwrap(); } assert_eq!(dechunker.data(), Some(data.to_vec())); @@ -403,8 +403,8 @@ fn master_dechunker_basic() { let mut master = MasterDechunker::<10>::default(); for (i, chunk) in chunks.iter().enumerate() { - let parsed = Chunk::parse(chunk).unwrap(); - let result = master.insert_chunk(parsed); + let decoded = Chunk::decode(chunk).unwrap(); + let result = master.insert_chunk(decoded); if i == chunks.len() - 1 { assert_eq!( @@ -446,8 +446,8 @@ fn master_dechunker_multiple_messages() { let mut completed = Vec::new(); for (msg_id, chunk) in all_chunks { - let parsed = Chunk::parse(chunk).unwrap(); - if let Some(data) = master.insert_chunk(parsed) { + let decoded = Chunk::decode(chunk).unwrap(); + if let Some(data) = master.insert_chunk(decoded) { completed.push((msg_id, data)); } } @@ -489,24 +489,24 @@ fn master_dechunker_lru_eviction() { "Message 3 should require multiple chunks" ); - let parsed1_0 = Chunk::parse(&chunks1[0]).unwrap(); - let parsed2_0 = Chunk::parse(&chunks2[0]).unwrap(); - let parsed2_1 = Chunk::parse(&chunks2[1]).unwrap(); - let parsed3_0 = Chunk::parse(&chunks3[0]).unwrap(); + let decoded1_0 = Chunk::decode(&chunks1[0]).unwrap(); + let decoded2_0 = Chunk::decode(&chunks2[0]).unwrap(); + let decoded2_1 = Chunk::decode(&chunks2[1]).unwrap(); + let decoded3_0 = Chunk::decode(&chunks3[0]).unwrap(); - master.insert_chunk(parsed1_0); - master.insert_chunk(parsed2_0); + master.insert_chunk(decoded1_0); + master.insert_chunk(decoded2_0); - master.insert_chunk(parsed2_1); + master.insert_chunk(decoded2_1); - let result = master.insert_chunk(parsed3_0); + let result = master.insert_chunk(decoded3_0); assert_eq!( result, None, "Third message should succeed by evicting LRU slot" ); - let parsed1_0_again = Chunk::parse(&chunks1[0]).unwrap(); - let result = master.insert_chunk(parsed1_0_again); + let decoded1_0_again = Chunk::decode(&chunks1[0]).unwrap(); + let result = master.insert_chunk(decoded1_0_again); assert_eq!( result, None, "Message 1 should start fresh after being evicted" @@ -520,8 +520,8 @@ fn master_dechunker_single_chunk_message() { assert_eq!(chunks.len(), 1, "Small message should be single chunk"); let mut master = MasterDechunker::<10>::default(); - let parsed = Chunk::parse(&chunks[0]).unwrap(); - let result = master.insert_chunk(parsed); + let decoded = Chunk::decode(&chunks[0]).unwrap(); + let result = master.insert_chunk(decoded); assert_eq!( result, @@ -539,9 +539,9 @@ fn streaming_dechunker_memory_usage() { let mut output = Vec::new(); let mut streaming = StreamDechunker::new(&mut output); - let chunk0 = Chunk::parse(&chunks[0]).unwrap(); - let chunk1 = Chunk::parse(&chunks[1]).unwrap(); - let chunk2 = Chunk::parse(&chunks[2]).unwrap(); + let chunk0 = Chunk::decode(&chunks[0]).unwrap(); + let chunk1 = Chunk::decode(&chunks[1]).unwrap(); + let chunk2 = Chunk::decode(&chunks[2]).unwrap(); let complete = streaming.insert_chunk(chunk0).unwrap(); assert!(!complete, "Should not be complete after first chunk"); @@ -591,8 +591,8 @@ fn streaming_dechunker_memory_usage() { ); for chunk in &chunks[3..] { - let parsed = Chunk::parse(chunk).unwrap(); - streaming.insert_chunk(parsed).unwrap(); + let decoded = Chunk::decode(chunk).unwrap(); + streaming.insert_chunk(decoded).unwrap(); } assert!(streaming.is_complete(), "Message should be complete"); @@ -621,8 +621,8 @@ fn streaming_dechunker_reverse_order() { let mut streaming = StreamDechunker::new(&mut output); for chunk in chunks.iter().rev() { - let parsed = Chunk::parse(chunk).unwrap(); - streaming.insert_chunk(parsed).unwrap(); + let decoded = Chunk::decode(chunk).unwrap(); + streaming.insert_chunk(decoded).unwrap(); } assert_eq!( @@ -647,8 +647,8 @@ fn streaming_dechunker_memory_efficiency() { let mut streaming = StreamDechunker::new(&mut output); for (i, chunk) in chunks.iter().enumerate() { - let parsed = Chunk::parse(chunk).unwrap(); - streaming.insert_chunk(parsed).unwrap(); + let decoded = Chunk::decode(chunk).unwrap(); + streaming.insert_chunk(decoded).unwrap(); let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count(); assert_eq!(