Skip to content

Commit 72cfada

Browse files
authored
Merge pull request #27 from Foundation-Devices/dechunker-improvements
lookup dechunker
2 parents deb338e + 8834d03 commit 72cfada

File tree

2 files changed

+66
-64
lines changed

2 files changed

+66
-64
lines changed

btp/src/dechunk.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{Header, CHUNK_DATA_SIZE, HEADER_SIZE};
22
use std::io::{self, Write};
33

4-
#[derive(Debug, PartialEq, thiserror::Error)]
4+
#[derive(Debug, thiserror::Error)]
55
pub enum DecodeError {
66
#[error("chunk too small, expected at least {}", HEADER_SIZE)]
77
HeaderTooSmall,
@@ -46,8 +46,8 @@ impl Chunk {
4646
&self.chunk[..self.header.data_len as usize]
4747
}
4848

49-
/// Parses raw bytes into a chunk
50-
pub fn parse(data: &[u8]) -> Result<Self, DecodeError> {
49+
/// decodes raw bytes into a chunk
50+
pub fn decode(data: &[u8]) -> Result<Self, DecodeError> {
5151
let (header_data, chunk_data) = data
5252
.split_at_checked(HEADER_SIZE)
5353
.ok_or(DecodeError::HeaderTooSmall)?;
@@ -135,8 +135,8 @@ impl Dechunker {
135135
.unwrap_or(0.0)
136136
}
137137

138-
/// Inserts a parsed chunk. Use this for multiple concurrent messages.
139-
/// First parse with [`Chunk::parse()`], lookup decoder by message ID, then insert.
138+
/// Inserts a chunk. Use this for multiple concurrent messages.
139+
/// First decode with [`Chunk::decode()`], lookup decoder by message ID, then insert.
140140
pub fn insert_chunk(&mut self, chunk: Chunk) -> Result<(), MessageIdError> {
141141
let header = &chunk.header;
142142

@@ -173,10 +173,10 @@ impl Dechunker {
173173
Ok(())
174174
}
175175

176-
/// Parses and inserts raw chunk data. Use this for single message at a time.
176+
/// Decodes and inserts raw chunk data. Use this for single message at a time.
177177
/// For multiple concurrent messages, use [`Chunk::parse()`] then [`Dechunker::insert_chunk()`].
178178
pub fn receive(&mut self, data: &[u8]) -> Result<(), ReceiveError> {
179-
let chunk_with_header = Chunk::parse(data)?;
179+
let chunk_with_header = Chunk::decode(data)?;
180180
self.insert_chunk(chunk_with_header)?;
181181
Ok(())
182182
}
@@ -209,17 +209,17 @@ impl Dechunker {
209209
}
210210
}
211211

212+
pub struct MasterDechunker<const N: usize> {
213+
dechunkers: [Option<DechunkerSlot>; N],
214+
counter: u64,
215+
}
216+
212217
#[derive(Debug)]
213218
struct DechunkerSlot {
214219
dechunker: Dechunker,
215220
last_used: u64,
216221
}
217222

218-
pub struct MasterDechunker<const N: usize = 10> {
219-
dechunkers: [Option<DechunkerSlot>; N],
220-
counter: u64,
221-
}
222-
223223
impl<const N: usize> Default for MasterDechunker<N> {
224224
fn default() -> Self {
225225
Self {
@@ -253,16 +253,10 @@ impl<const N: usize> MasterDechunker<N> {
253253
if let Some(empty_slot) = self.dechunkers.iter_mut().find(|slot| slot.is_none()) {
254254
empty_slot
255255
} else {
256-
let lru_index = self
257-
.dechunkers
258-
.iter()
259-
.enumerate()
260-
.filter_map(|(i, slot)| slot.as_ref().map(|s| (i, s.last_used)))
261-
.min_by_key(|(_, last_used)| *last_used)
262-
.map(|(i, _)| i)
263-
.expect("should find slot");
264-
265-
&mut self.dechunkers[lru_index]
256+
self.dechunkers
257+
.iter_mut()
258+
.min_by_key(|d| d.as_ref().map(|d| d.last_used))
259+
.expect("not empty")
266260
};
267261

268262
let mut decoder = Dechunker::new();
@@ -279,6 +273,14 @@ impl<const N: usize> MasterDechunker<N> {
279273
None
280274
}
281275
}
276+
277+
pub fn get_dechunker(&self, msg_id: u16) -> Option<&Dechunker> {
278+
self.dechunkers
279+
.iter()
280+
.filter_map(|d| d.as_ref())
281+
.find(|d| d.dechunker.info.map(|m| m.message_id) == Some(msg_id))
282+
.map(|d| &d.dechunker)
283+
}
282284
}
283285

284286
#[derive(Debug, thiserror::Error)]

btp/src/tests.rs

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -239,26 +239,26 @@ fn reverse_order_decoding() {
239239
}
240240

241241
#[test]
242-
fn chunk_parse_and_insert() {
242+
fn chunk_decode_and_insert() {
243243
use crate::Chunk;
244244

245-
let data = b"Test data for parse and push";
245+
let data = b"Test data for decode and push";
246246
let chunks: Vec<_> = chunk(data).collect();
247247

248248
let mut dechunker = Dechunker::new();
249249

250250
for raw_chunk in &chunks {
251-
let parsed = Chunk::parse(raw_chunk).unwrap();
252-
dechunker.insert_chunk(parsed).unwrap();
251+
let decoded = Chunk::decode(raw_chunk).unwrap();
252+
dechunker.insert_chunk(decoded).unwrap();
253253
}
254254

255255
assert_eq!(dechunker.data(), Some(data.to_vec()));
256256
}
257257

258258
#[test]
259-
fn chunk_parse_errors() {
259+
fn chunk_decode_errors() {
260260
let small_data = vec![0u8; HEADER_SIZE - 1];
261-
let result = Chunk::parse(&small_data);
261+
let result = Chunk::decode(&small_data);
262262
assert!(matches!(result, Err(DecodeError::HeaderTooSmall)));
263263
}
264264

@@ -268,7 +268,7 @@ fn chunk_too_small() {
268268
let mut raw_chunk = vec![0u8; HEADER_SIZE + 5];
269269
raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes());
270270

271-
let result = Chunk::parse(&raw_chunk);
271+
let result = Chunk::decode(&raw_chunk);
272272
assert!(
273273
matches!(
274274
result,
@@ -287,22 +287,22 @@ fn chunk_too_large() {
287287
let mut raw_chunk = vec![0u8; HEADER_SIZE + 255];
288288
raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes());
289289

290-
let result = Chunk::parse(&raw_chunk);
290+
let result = Chunk::decode(&raw_chunk);
291291
assert!(
292292
matches!(result, Err(DecodeError::ChunkTooLarge)),
293293
"should fail when data_len exceeds maximum chunk size"
294294
);
295295
}
296296

297297
#[test]
298-
fn chunk_parse_invalid_index() {
298+
fn chunk_decode_invalid_index() {
299299
use crate::{Chunk, DecodeError, HEADER_SIZE};
300300

301301
let header = crate::Header::new(1234, 10, 1, 5);
302302
let mut raw_chunk = vec![0u8; HEADER_SIZE + 5];
303303
raw_chunk[..HEADER_SIZE].copy_from_slice(header.as_bytes());
304304

305-
let result = Chunk::parse(&raw_chunk);
305+
let result = Chunk::decode(&raw_chunk);
306306
assert!(
307307
matches!(
308308
result,
@@ -325,10 +325,10 @@ fn insert_chunk_wrong_message_id() {
325325

326326
let mut dechunker = Dechunker::new();
327327

328-
let chunk1 = Chunk::parse(&chunks1[0]).unwrap();
328+
let chunk1 = Chunk::decode(&chunks1[0]).unwrap();
329329
dechunker.insert_chunk(chunk1).unwrap();
330330

331-
let chunk2 = Chunk::parse(&chunks2[0]).unwrap();
331+
let chunk2 = Chunk::decode(&chunks2[0]).unwrap();
332332
let result = dechunker.insert_chunk(chunk2);
333333

334334
assert!(
@@ -352,8 +352,8 @@ fn insert_chunk_out_of_order() {
352352
let mut dechunker = Dechunker::new();
353353

354354
for (i, chunk) in chunks.iter().enumerate() {
355-
let parsed = Chunk::parse(chunk).unwrap();
356-
dechunker.insert_chunk(parsed).unwrap();
355+
let decoded = Chunk::decode(chunk).unwrap();
356+
dechunker.insert_chunk(decoded).unwrap();
357357

358358
let expected_progress = (i + 1) as f32 / original_count as f32;
359359
assert!(
@@ -376,7 +376,7 @@ fn insert_duplicate_chunks() {
376376

377377
let mut dechunker = Dechunker::new();
378378

379-
let chunk0 = Chunk::parse(&chunks[0]).unwrap();
379+
let chunk0 = Chunk::decode(&chunks[0]).unwrap();
380380

381381
dechunker.insert_chunk(chunk0).unwrap();
382382
dechunker.insert_chunk(chunk0).unwrap();
@@ -388,8 +388,8 @@ fn insert_duplicate_chunks() {
388388
);
389389

390390
for chunk in &chunks[1..] {
391-
let parsed = Chunk::parse(chunk).unwrap();
392-
dechunker.insert_chunk(parsed).unwrap();
391+
let decoded = Chunk::decode(chunk).unwrap();
392+
dechunker.insert_chunk(decoded).unwrap();
393393
}
394394

395395
assert_eq!(dechunker.data(), Some(data.to_vec()));
@@ -403,8 +403,8 @@ fn master_dechunker_basic() {
403403
let mut master = MasterDechunker::<10>::default();
404404

405405
for (i, chunk) in chunks.iter().enumerate() {
406-
let parsed = Chunk::parse(chunk).unwrap();
407-
let result = master.insert_chunk(parsed);
406+
let decoded = Chunk::decode(chunk).unwrap();
407+
let result = master.insert_chunk(decoded);
408408

409409
if i == chunks.len() - 1 {
410410
assert_eq!(
@@ -446,8 +446,8 @@ fn master_dechunker_multiple_messages() {
446446
let mut completed = Vec::new();
447447

448448
for (msg_id, chunk) in all_chunks {
449-
let parsed = Chunk::parse(chunk).unwrap();
450-
if let Some(data) = master.insert_chunk(parsed) {
449+
let decoded = Chunk::decode(chunk).unwrap();
450+
if let Some(data) = master.insert_chunk(decoded) {
451451
completed.push((msg_id, data));
452452
}
453453
}
@@ -489,24 +489,24 @@ fn master_dechunker_lru_eviction() {
489489
"Message 3 should require multiple chunks"
490490
);
491491

492-
let parsed1_0 = Chunk::parse(&chunks1[0]).unwrap();
493-
let parsed2_0 = Chunk::parse(&chunks2[0]).unwrap();
494-
let parsed2_1 = Chunk::parse(&chunks2[1]).unwrap();
495-
let parsed3_0 = Chunk::parse(&chunks3[0]).unwrap();
492+
let decoded1_0 = Chunk::decode(&chunks1[0]).unwrap();
493+
let decoded2_0 = Chunk::decode(&chunks2[0]).unwrap();
494+
let decoded2_1 = Chunk::decode(&chunks2[1]).unwrap();
495+
let decoded3_0 = Chunk::decode(&chunks3[0]).unwrap();
496496

497-
master.insert_chunk(parsed1_0);
498-
master.insert_chunk(parsed2_0);
497+
master.insert_chunk(decoded1_0);
498+
master.insert_chunk(decoded2_0);
499499

500-
master.insert_chunk(parsed2_1);
500+
master.insert_chunk(decoded2_1);
501501

502-
let result = master.insert_chunk(parsed3_0);
502+
let result = master.insert_chunk(decoded3_0);
503503
assert_eq!(
504504
result, None,
505505
"Third message should succeed by evicting LRU slot"
506506
);
507507

508-
let parsed1_0_again = Chunk::parse(&chunks1[0]).unwrap();
509-
let result = master.insert_chunk(parsed1_0_again);
508+
let decoded1_0_again = Chunk::decode(&chunks1[0]).unwrap();
509+
let result = master.insert_chunk(decoded1_0_again);
510510
assert_eq!(
511511
result, None,
512512
"Message 1 should start fresh after being evicted"
@@ -520,8 +520,8 @@ fn master_dechunker_single_chunk_message() {
520520
assert_eq!(chunks.len(), 1, "Small message should be single chunk");
521521

522522
let mut master = MasterDechunker::<10>::default();
523-
let parsed = Chunk::parse(&chunks[0]).unwrap();
524-
let result = master.insert_chunk(parsed);
523+
let decoded = Chunk::decode(&chunks[0]).unwrap();
524+
let result = master.insert_chunk(decoded);
525525

526526
assert_eq!(
527527
result,
@@ -539,9 +539,9 @@ fn streaming_dechunker_memory_usage() {
539539
let mut output = Vec::new();
540540
let mut streaming = StreamDechunker::new(&mut output);
541541

542-
let chunk0 = Chunk::parse(&chunks[0]).unwrap();
543-
let chunk1 = Chunk::parse(&chunks[1]).unwrap();
544-
let chunk2 = Chunk::parse(&chunks[2]).unwrap();
542+
let chunk0 = Chunk::decode(&chunks[0]).unwrap();
543+
let chunk1 = Chunk::decode(&chunks[1]).unwrap();
544+
let chunk2 = Chunk::decode(&chunks[2]).unwrap();
545545

546546
let complete = streaming.insert_chunk(chunk0).unwrap();
547547
assert!(!complete, "Should not be complete after first chunk");
@@ -591,8 +591,8 @@ fn streaming_dechunker_memory_usage() {
591591
);
592592

593593
for chunk in &chunks[3..] {
594-
let parsed = Chunk::parse(chunk).unwrap();
595-
streaming.insert_chunk(parsed).unwrap();
594+
let decoded = Chunk::decode(chunk).unwrap();
595+
streaming.insert_chunk(decoded).unwrap();
596596
}
597597

598598
assert!(streaming.is_complete(), "Message should be complete");
@@ -621,8 +621,8 @@ fn streaming_dechunker_reverse_order() {
621621
let mut streaming = StreamDechunker::new(&mut output);
622622

623623
for chunk in chunks.iter().rev() {
624-
let parsed = Chunk::parse(chunk).unwrap();
625-
streaming.insert_chunk(parsed).unwrap();
624+
let decoded = Chunk::decode(chunk).unwrap();
625+
streaming.insert_chunk(decoded).unwrap();
626626
}
627627

628628
assert_eq!(
@@ -647,8 +647,8 @@ fn streaming_dechunker_memory_efficiency() {
647647
let mut streaming = StreamDechunker::new(&mut output);
648648

649649
for (i, chunk) in chunks.iter().enumerate() {
650-
let parsed = Chunk::parse(chunk).unwrap();
651-
streaming.insert_chunk(parsed).unwrap();
650+
let decoded = Chunk::decode(chunk).unwrap();
651+
streaming.insert_chunk(decoded).unwrap();
652652

653653
let chunks_in_memory = streaming.chunks.iter().filter(|c| c.is_some()).count();
654654
assert_eq!(

0 commit comments

Comments
 (0)