@@ -393,17 +393,14 @@ impl<M: RemoteMessage> NetTx<M> {
393
393
}
394
394
395
395
fn requeue_unacked ( & mut self , unacked : MessageDeque < M > ) {
396
- match ( unacked. back ( ) , self . deque . front ( ) ) {
397
- ( Some ( last) , Some ( first) ) => {
398
- assert ! (
399
- last. seq < first. seq,
400
- "{}: seq should be in ascending order, but got {} vs {}" ,
401
- self . log_id,
402
- last. seq,
403
- first. seq,
404
- ) ;
405
- }
406
- _ => ( ) ,
396
+ if let ( Some ( last) , Some ( first) ) = ( unacked. back ( ) , self . deque . front ( ) ) {
397
+ assert ! (
398
+ last. seq < first. seq,
399
+ "{}: seq should be in ascending order, but got {} vs {}" ,
400
+ self . log_id,
401
+ last. seq,
402
+ first. seq,
403
+ ) ;
407
404
}
408
405
409
406
let mut outbox = unacked;
@@ -537,7 +534,7 @@ impl<M: RemoteMessage> NetTx<M> {
537
534
Some ( msg) => {
538
535
RealClock
539
536
. sleep_until (
540
- msg. received_at . clone ( )
537
+ msg. received_at
541
538
+ config:: global:: get ( config:: MESSAGE_DELIVERY_TIMEOUT ) ,
542
539
)
543
540
. await
@@ -964,10 +961,10 @@ impl<M: RemoteMessage> NetTx<M> {
964
961
}
965
962
} ;
966
963
967
- if !matches ! ( state, State :: Closing { .. } ) {
968
- if let Conn :: Disconnected ( ref mut backoff) = conn {
969
- RealClock . sleep ( backoff . next_backoff ( ) . unwrap ( ) ) . await ;
970
- }
964
+ if !matches ! ( state, State :: Closing { .. } )
965
+ && let Conn :: Disconnected ( ref mut backoff) = conn
966
+ {
967
+ RealClock . sleep ( backoff . next_backoff ( ) . unwrap ( ) ) . await ;
971
968
}
972
969
} ; // loop
973
970
tracing:: debug!( "{log_id}: NetTx exited its loop with state: {state}" ) ;
@@ -1435,13 +1432,10 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
1435
1432
let mut final_ack = final_next. ack ;
1436
1433
// Flush any ongoing write.
1437
1434
if self . write_state . is_writing ( ) {
1438
- match self . write_state . send ( ) . await {
1439
- Ok ( acked_seq) => {
1440
- if acked_seq > final_ack {
1441
- final_ack = acked_seq;
1442
- }
1435
+ if let Ok ( acked_seq) = self . write_state . send ( ) . await {
1436
+ if acked_seq > final_ack {
1437
+ final_ack = acked_seq;
1443
1438
}
1444
- Err ( _) => ( ) ,
1445
1439
} ;
1446
1440
}
1447
1441
// best effort: "flush" any remaining ack before closing this session
@@ -1480,26 +1474,23 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
1480
1474
let Ok ( writer) = replace ( & mut self . write_state , WriteState :: Broken ) . into_idle ( ) else {
1481
1475
panic ! ( "illegal state" ) ;
1482
1476
} ;
1483
- match serialize_response ( NetRxResponse :: Reject ) {
1484
- Ok ( data) => {
1485
- match FrameWrite :: new (
1486
- writer,
1487
- data,
1488
- config:: global:: get ( config:: CODEC_MAX_FRAME_LENGTH ) ,
1489
- ) {
1490
- Ok ( fw) => {
1491
- self . write_state = WriteState :: Writing ( fw, 0 ) ;
1492
- let _ = self . write_state . send ( ) . await ;
1493
- }
1494
- Err ( ( w, e) ) => {
1495
- debug_assert_eq ! ( e. kind( ) , io:: ErrorKind :: InvalidData ) ;
1496
- tracing:: debug!( "failed to create reject frame (should be tiny): {e}" ) ;
1497
- self . write_state = WriteState :: Idle ( w) ;
1498
- // drop the reject; we're closing anyway
1499
- }
1477
+ if let Ok ( data) = serialize_response ( NetRxResponse :: Reject ) {
1478
+ match FrameWrite :: new (
1479
+ writer,
1480
+ data,
1481
+ config:: global:: get ( config:: CODEC_MAX_FRAME_LENGTH ) ,
1482
+ ) {
1483
+ Ok ( fw) => {
1484
+ self . write_state = WriteState :: Writing ( fw, 0 ) ;
1485
+ let _ = self . write_state . send ( ) . await ;
1486
+ }
1487
+ Err ( ( w, e) ) => {
1488
+ debug_assert_eq ! ( e. kind( ) , io:: ErrorKind :: InvalidData ) ;
1489
+ tracing:: debug!( "failed to create reject frame (should be tiny): {e}" ) ;
1490
+ self . write_state = WriteState :: Idle ( w) ;
1491
+ // drop the reject; we're closing anyway
1500
1492
}
1501
1493
}
1502
- Err ( _) => ( ) ,
1503
1494
} ;
1504
1495
}
1505
1496
@@ -2104,7 +2095,7 @@ pub(crate) mod tcp {
2104
2095
type Stream = TcpStream ;
2105
2096
2106
2097
fn dest ( & self ) -> ChannelAddr {
2107
- ChannelAddr :: Tcp ( self . 0 . clone ( ) )
2098
+ ChannelAddr :: Tcp ( self . 0 )
2108
2099
}
2109
2100
2110
2101
async fn connect ( & self ) -> Result < Self :: Stream , ClientError > {
0 commit comments