@@ -38,8 +38,8 @@ pub struct Client {
3838}
3939
4040enum Decoder {
41- Metadata ( AsyncMetadataDecoder < BufReader < ReadHalf < TcpStream > > > ) ,
42- Record ( AsyncRecordDecoder < BufReader < ReadHalf < TcpStream > > > ) ,
41+ Metadata ( AsyncMetadataDecoder < ReadHalf < TcpStream > > ) ,
42+ Record ( AsyncRecordDecoder < ReadHalf < TcpStream > > ) ,
4343 Empty ,
4444}
4545
@@ -107,7 +107,7 @@ impl Client {
107107 heartbeat_interval,
108108 protocol,
109109 peer_addr,
110- decoder : Decoder :: Metadata ( AsyncMetadataDecoder :: new ( recver) ) ,
110+ decoder : Decoder :: Metadata ( AsyncMetadataDecoder :: new ( recver. into_inner ( ) ) ) ,
111111 session_id,
112112 span,
113113 sub_counter : 0 ,
@@ -229,12 +229,7 @@ impl Client {
229229 info ! ( "Starting session" ) ;
230230 self . protocol . start_session ( ) . await ?;
231231 let mut metadata = decoder. decode ( ) . await ?;
232- self . decoder = Decoder :: Record ( AsyncRecordDecoder :: with_version (
233- decoder. into_inner ( ) ,
234- metadata. version ,
235- self . upgrade_policy ,
236- metadata. ts_out ,
237- ) ?) ;
232+ self . decoder = Decoder :: Record ( AsyncRecordDecoder :: from ( decoder) ) ;
238233 // Should match `send_ts_out` but set again here for safety
239234 metadata. upgrade ( self . upgrade_policy ) ;
240235 Ok ( metadata)
@@ -299,7 +294,7 @@ impl Client {
299294 self . heartbeat_interval . map ( |i| i. whole_seconds ( ) ) ,
300295 )
301296 . await ?;
302- self . decoder = Decoder :: Metadata ( AsyncMetadataDecoder :: new ( recver) ) ;
297+ self . decoder = Decoder :: Metadata ( AsyncMetadataDecoder :: new ( recver. into_inner ( ) ) ) ;
303298 self . span = info_span ! ( "LiveClient" , dataset = %self . dataset, session_id = self . session_id) ;
304299 Ok ( ( ) )
305300 }
@@ -413,7 +408,7 @@ mod tests {
413408 self . send ( "success=1|session_id=5\n " ) . await ;
414409 }
415410
416- async fn subscribe ( & mut self , subscription : Subscription ) {
411+ async fn subscribe ( & mut self , subscription : Subscription , is_last : bool ) {
417412 let sub_line = self . read_line ( ) . await ;
418413 assert ! ( sub_line. contains( & format!( "symbols={}" , subscription. symbols. to_api_string( ) ) ) ) ;
419414 assert ! ( sub_line. contains( & format!( "schema={}" , subscription. schema) ) ) ;
@@ -423,6 +418,7 @@ mod tests {
423418 assert ! ( sub_line. contains( & format!( "start={}" , start. unix_timestamp_nanos( ) ) ) )
424419 }
425420 assert ! ( sub_line. contains( & format!( "snapshot={}" , subscription. use_snapshot as u8 ) ) ) ;
421+ assert ! ( sub_line. contains( & format!( "is_last={}" , is_last as u8 ) ) ) ;
426422 }
427423
428424 async fn start ( & mut self ) {
@@ -489,7 +485,7 @@ mod tests {
489485 Accept ,
490486 Authenticate ( Option < Duration > ) ,
491487 Send ( String ) ,
492- Subscribe ( Subscription ) ,
488+ Subscribe ( Subscription , bool ) ,
493489 Start ,
494490 SendRecord ( Box < dyn AsRef < [ u8 ] > + Send > ) ,
495491 Disconnect ,
@@ -502,7 +498,7 @@ mod tests {
502498 Event :: Accept => write ! ( f, "Accept" ) ,
503499 Event :: Authenticate ( hb_int) => write ! ( f, "Authenticate({hb_int:?})" ) ,
504500 Event :: Send ( msg) => write ! ( f, "Send({msg:?})" ) ,
505- Event :: Subscribe ( sub) => write ! ( f, "Subscribe({sub:?})" ) ,
501+ Event :: Subscribe ( sub, is_last ) => write ! ( f, "Subscribe({sub:?}, {is_last :?})" ) ,
506502 Event :: Start => write ! ( f, "Start" ) ,
507503 Event :: SendRecord ( _) => write ! ( f, "SendRecord" ) ,
508504 Event :: Disconnect => write ! ( f, "Disconnect" ) ,
@@ -521,7 +517,7 @@ mod tests {
521517 Some ( Event :: Authenticate ( hb_int) ) => mock. authenticate ( hb_int) . await ,
522518 Some ( Event :: Accept ) => mock. accept ( ) . await ,
523519 Some ( Event :: Send ( msg) ) => mock. send ( & msg) . await ,
524- Some ( Event :: Subscribe ( sub) ) => mock. subscribe ( sub) . await ,
520+ Some ( Event :: Subscribe ( sub, is_last ) ) => mock. subscribe ( sub, is_last ) . await ,
525521 Some ( Event :: Start ) => mock. start ( ) . await ,
526522 Some ( Event :: SendRecord ( rec) ) => mock. send_record ( rec) . await ,
527523 Some ( Event :: Disconnect ) => mock. close ( ) . await ,
@@ -544,8 +540,10 @@ mod tests {
544540 . unwrap ( ) ;
545541 }
546542
547- pub fn expect_subscribe ( & mut self , subscription : Subscription ) {
548- self . send . send ( Event :: Subscribe ( subscription) ) . unwrap ( ) ;
543+ pub fn expect_subscribe ( & mut self , subscription : Subscription , is_last : bool ) {
544+ self . send
545+ . send ( Event :: Subscribe ( subscription, is_last) )
546+ . unwrap ( ) ;
549547 }
550548
551549 pub fn start ( & mut self ) {
@@ -613,7 +611,7 @@ mod tests {
613611 . schema ( Schema :: Ohlcv1M )
614612 . stype_in ( SType :: RawSymbol )
615613 . build ( ) ;
616- fixture. expect_subscribe ( subscription. clone ( ) ) ;
614+ fixture. expect_subscribe ( subscription. clone ( ) , true ) ;
617615 client. subscribe ( subscription) . await . unwrap ( ) ;
618616 fixture. stop ( ) . await ;
619617 }
@@ -628,7 +626,7 @@ mod tests {
628626 . stype_in ( SType :: RawSymbol )
629627 . use_snapshot ( )
630628 . build ( ) ;
631- fixture. expect_subscribe ( subscription. clone ( ) ) ;
629+ fixture. expect_subscribe ( subscription. clone ( ) , true ) ;
632630 client. subscribe ( subscription) . await . unwrap ( ) ;
633631 fixture. stop ( ) . await ;
634632 }
@@ -670,7 +668,10 @@ mod tests {
670668 let mut i = 0 ;
671669 while i < SYMBOL_COUNT {
672670 let chunk_size = 500 . min ( SYMBOL_COUNT - i) ;
673- fixture. expect_subscribe ( sub_base. clone ( ) . symbols ( vec ! [ SYMBOL ; chunk_size] ) . build ( ) ) ;
671+ fixture. expect_subscribe (
672+ sub_base. clone ( ) . symbols ( vec ! [ SYMBOL ; chunk_size] ) . build ( ) ,
673+ i + chunk_size == SYMBOL_COUNT ,
674+ ) ;
674675 i += chunk_size;
675676 }
676677 fixture. stop ( ) . await ;
@@ -823,7 +824,7 @@ mod tests {
823824 . schema ( Schema :: Trades )
824825 . start ( OffsetDateTime :: UNIX_EPOCH )
825826 . build ( ) ;
826- fixture. expect_subscribe ( sub. clone ( ) ) ;
827+ fixture. expect_subscribe ( sub. clone ( ) , true ) ;
827828 client. subscribe ( sub. clone ( ) ) . await . unwrap ( ) ;
828829 fixture. start ( ) ;
829830 let metadata = client. start ( ) . await . unwrap ( ) ;
@@ -863,7 +864,7 @@ mod tests {
863864
864865 let mut resub = sub. clone ( ) ;
865866 resub. start = None ;
866- fixture. expect_subscribe ( resub) ;
867+ fixture. expect_subscribe ( resub, true ) ;
867868 client. resubscribe ( ) . await . unwrap ( ) ;
868869 fixture. start ( ) ;
869870 client. start ( ) . await . unwrap ( ) ;
0 commit comments