@@ -339,6 +339,31 @@ impl HostIO {
339339 }
340340}
341341
342+ /// A block must be at least [HASH_LEN] bytes long.
343+ pub struct Block ( [ u8 ] ) ;
344+
345+ impl Block {
346+ pub fn as_raw_slice ( & self ) -> & [ u8 ] {
347+ let block = self as * const Block as * const [ u8 ] ;
348+ unsafe { & * block }
349+ }
350+
351+ pub fn from_raw_slice ( block : & [ u8 ] ) -> & Self {
352+ let block2 = block as * const [ u8 ] as * const Block ;
353+ unsafe { & * block2 }
354+ }
355+
356+ /// Panics if block is illegally short
357+ pub fn next_block ( & self ) -> SHA256Sum {
358+ self . as_raw_slice ( ) [ ..HASH_LEN ] . try_into ( ) . unwrap ( )
359+ }
360+
361+ /// Panics if block is illegally short
362+ pub fn data ( & self ) -> & [ u8 ] {
363+ & self . as_raw_slice ( ) [ HASH_LEN ..]
364+ }
365+ }
366+
342367/// Concrete implementation of the interface defined by
343368/// [ledger_parser_combinators::async_parser::Readable] for the block protocol.
344369#[ derive( Clone ) ]
@@ -348,28 +373,54 @@ pub struct ByteStream {
348373 current_offset : usize ,
349374}
350375
376+ impl ByteStream {
377+ /// Get the current block.
378+ fn get_current_block < ' a > ( & ' a mut self ) -> impl ' a + Future < Output = Ref < ' static , Block > > {
379+ async move {
380+ if self . current_chunk == [ 0 ; 32 ] {
381+ let _: ( ) = reject ( ) . await ;
382+ }
383+ let chunk_res = self . host_io . get_chunk ( self . current_chunk ) . await ;
384+ match chunk_res {
385+ Ok ( a) => Ref :: map ( a, Block :: from_raw_slice) ,
386+ Err ( _) => reject ( ) . await ,
387+ }
388+ //return Ref::map(chunk, |r| &r[self.current_offset + HASH_LEN..]);
389+ }
390+ }
391+
392+ /// Get the rest of the current block that we have not already processed.
393+ ///
394+ /// [block] must be the current block.
395+ fn slice_from_block < ' a , ' b > ( & ' a mut self , block : & ' b Block ) -> & ' b [ u8 ] {
396+ & block. data ( ) [ self . current_offset ..]
397+ }
398+
399+ /// Consume [consume] bytes from the current block.
400+ ///
401+ /// [block] must be the current block. [consume] must be less than or equal
402+ /// to `self.slice_from_block(block).len()`.
403+ fn consume ( & mut self , block : & Block , consume : usize ) {
404+ self . current_offset += consume;
405+ debug_assert ! ( self . current_offset <= block. data( ) . len( ) ) ;
406+ if self . current_offset == block. data ( ) . len ( ) {
407+ self . current_chunk = block. next_block ( ) ;
408+ self . current_offset = 0 ;
409+ }
410+ }
411+ }
412+
351413impl Readable for ByteStream {
352414 type OutFut < ' a , const N : usize > = impl ' a + core:: future:: Future < Output = [ u8 ; N ] > ;
353415 fn read < ' a : ' b , ' b , const N : usize > ( & ' a mut self ) -> Self :: OutFut < ' b , N > {
354416 async move {
355417 let mut buffer = ArrayVec :: < u8 , N > :: new ( ) ;
356418 while !buffer. is_full ( ) {
357- if self . current_chunk == [ 0 ; 32 ] {
358- let _: ( ) = reject ( ) . await ;
359- }
360- let chunk_res = self . host_io . get_chunk ( self . current_chunk ) . await ;
361- let chunk = match chunk_res {
362- Ok ( a) => a,
363- Err ( _) => reject ( ) . await ,
364- } ;
365- let avail = & chunk[ self . current_offset + HASH_LEN ..] ;
419+ let block = self . get_current_block ( ) . await ;
420+ let avail = self . slice_from_block ( & block) ;
366421 let consuming = core:: cmp:: min ( avail. len ( ) , buffer. remaining_capacity ( ) ) ;
367422 buffer. try_extend_from_slice ( & avail[ 0 ..consuming] ) . ok ( ) ;
368- self . current_offset += consuming;
369- if self . current_offset + HASH_LEN == chunk. len ( ) {
370- self . current_chunk = chunk[ 0 ..HASH_LEN ] . try_into ( ) . unwrap ( ) ;
371- self . current_offset = 0 ;
372- }
423+ self . consume ( & * block, consuming) ;
373424 }
374425 buffer. into_inner ( ) . unwrap ( )
375426 }
0 commit comments