@@ -25,12 +25,22 @@ pub struct MessageIdError {
2525 actual : u16 ,
2626}
2727
28+ #[ derive( Debug , thiserror:: Error ) ]
29+ #[ error( "message length: expected {expected}, actual {actual}" ) ]
30+ pub struct LengthMismatchError {
31+ message_id : u16 ,
32+ expected : u16 ,
33+ actual : u16 ,
34+ }
35+
2836#[ derive( Debug , thiserror:: Error ) ]
2937pub enum ReceiveError {
3038 #[ error( transparent) ]
3139 Decode ( #[ from] DecodeError ) ,
3240 #[ error( transparent) ]
3341 MessageId ( #[ from] MessageIdError ) ,
42+ #[ error( transparent) ]
43+ LengthMismatch ( #[ from] LengthMismatchError ) ,
3444}
3545
3646#[ derive( Clone , Copy ) ]
@@ -85,8 +95,8 @@ impl Chunk {
8595
8696#[ derive( Default ) ]
8797pub struct Dechunker {
88- chunks : Vec < Option < RawChunk > > ,
89- info : Option < MessageInfo > ,
98+ pub chunks : Vec < Option < RawChunk > > ,
99+ pub info : Option < MessageInfo > ,
90100}
91101
92102impl std:: fmt:: Debug for Dechunker {
@@ -102,16 +112,16 @@ impl std::fmt::Debug for Dechunker {
102112}
103113
104114#[ derive( Debug , Clone , Copy ) ]
105- struct MessageInfo {
106- message_id : u16 ,
107- total_chunks : u16 ,
108- chunks_received : u16 ,
115+ pub struct MessageInfo {
116+ pub message_id : u16 ,
117+ pub total_chunks : u16 ,
118+ pub chunks_received : u16 ,
109119}
110120
111121#[ derive( Debug , Clone ) ]
112- pub ( crate ) struct RawChunk {
113- data : [ u8 ; CHUNK_DATA_SIZE ] ,
114- len : u8 ,
122+ pub struct RawChunk {
123+ pub data : [ u8 ; CHUNK_DATA_SIZE ] ,
124+ pub len : u8 ,
115125}
116126
117127impl RawChunk {
@@ -149,7 +159,7 @@ impl Dechunker {
149159 /// Inserts a chunk. Use this for multiple concurrent messages.
150160 /// First decode with [`Chunk::decode()`], lookup decoder by message ID,
151161 /// then insert.
152- pub fn insert_chunk ( & mut self , chunk : Chunk ) -> Result < ( ) , MessageIdError > {
162+ pub fn insert_chunk ( & mut self , chunk : Chunk ) -> Result < ( ) , ReceiveError > {
153163 let header = & chunk. header ;
154164
155165 match self . info {
@@ -165,7 +175,16 @@ impl Dechunker {
165175 return Err ( MessageIdError {
166176 expected : info. message_id ,
167177 actual : header. message_id ,
168- } ) ;
178+ }
179+ . into ( ) ) ;
180+ }
181+ Some ( info) if info. total_chunks != header. total_chunks => {
182+ return Err ( LengthMismatchError {
183+ message_id : header. message_id ,
184+ expected : info. message_id ,
185+ actual : header. message_id ,
186+ }
187+ . into ( ) ) ;
169188 }
170189 _ => { }
171190 }
@@ -207,16 +226,12 @@ impl Dechunker {
207226
208227 // unwraps are now ok
209228
210- let mut result = Vec :: with_capacity (
211- self . chunks
212- . iter ( )
213- . map ( |chunk| chunk. as_ref ( ) . unwrap ( ) . len as usize )
214- . sum ( ) ,
215- ) ;
216-
217- for chunk in & self . chunks {
218- result. extend_from_slice ( chunk. as_ref ( ) . unwrap ( ) . as_slice ( ) ) ;
219- }
229+ let result = self
230+ . chunks
231+ . iter ( )
232+ . flat_map ( |chunk| chunk. as_ref ( ) . unwrap ( ) . as_slice ( ) )
233+ . copied ( )
234+ . collect ( ) ;
220235
221236 Some ( result)
222237 }
@@ -230,8 +245,8 @@ pub struct MasterDechunker<const N: usize> {
230245
231246#[ derive( Debug ) ]
232247pub struct DechunkerSlot {
233- dechunker : Dechunker ,
234- last_used : u64 ,
248+ pub dechunker : Dechunker ,
249+ pub last_used : u64 ,
235250}
236251
237252impl < const N : usize > Default for MasterDechunker < N > {
@@ -249,6 +264,11 @@ impl<const N: usize> MasterDechunker<N> {
249264 }
250265
251266 pub fn insert_chunk ( & mut self , chunk : Chunk ) -> Option < Vec < u8 > > {
267+ let ( completed, _evicted) = self . insert_chunk_raw ( chunk) ;
268+ completed
269+ }
270+
271+ pub fn insert_chunk_raw ( & mut self , chunk : Chunk ) -> ( Option < Vec < u8 > > , Option < DechunkerSlot > ) {
252272 let message_id = chunk. header . message_id ;
253273
254274 for decoder_slot in & mut self . dechunkers {
@@ -259,36 +279,41 @@ impl<const N: usize> MasterDechunker<N> {
259279 slot. dechunker . insert_chunk ( chunk) . unwrap ( ) ;
260280
261281 return if slot. dechunker . is_complete ( ) {
262- decoder_slot. take ( ) . unwrap ( ) . dechunker . data ( )
282+ let completed = decoder_slot. take ( ) . unwrap ( ) . dechunker . data ( ) ;
283+ ( completed, None )
263284 } else {
264- None
285+ ( None , None )
265286 } ;
266287 }
267288 }
268289 }
269290
270- let target_slot =
291+ let ( target_slot, evicted ) =
271292 if let Some ( empty_slot) = self . dechunkers . iter_mut ( ) . find ( |slot| slot. is_none ( ) ) {
272- empty_slot
293+ ( empty_slot, None )
273294 } else {
274- self . dechunkers
295+ let slot = self
296+ . dechunkers
275297 . iter_mut ( )
276298 . min_by_key ( |d| d. as_ref ( ) . map ( |d| d. last_used ) )
277- . expect ( "not empty" )
299+ . expect ( "not empty" ) ;
300+
301+ let evicted = slot. take ( ) ;
302+ ( slot, evicted)
278303 } ;
279304
280305 let mut decoder = Dechunker :: new ( ) ;
281306 decoder. insert_chunk ( chunk) . unwrap ( ) ;
282307
283308 if decoder. is_complete ( ) {
284- decoder. data ( )
309+ ( decoder. data ( ) , evicted )
285310 } else {
286311 self . counter += 1 ;
287312 * target_slot = Some ( DechunkerSlot {
288313 dechunker : decoder,
289314 last_used : self . counter ,
290315 } ) ;
291- None
316+ ( None , evicted )
292317 }
293318 }
294319
0 commit comments