@@ -109,12 +109,12 @@ struct ProtocolStatus {
109
109
enum InboundSubstreamState {
110
110
/// Waiting for a request from the remote.
111
111
WaitingMessage {
112
- /// Whether it is the first message to be awaited on this stream.
113
- first : bool ,
112
+ /// How long before we give up on waiting for the first request on this stream.
113
+ /// `None` if there has already been a request on this stream, or it has timed out and can
114
+ /// be re-used.
115
+ first_request_timeout : Option < Delay > ,
114
116
connection_id : UniqueConnecId ,
115
117
substream : KadInStreamSink < Stream > ,
116
- /// How long before we give up on waiting for a request.
117
- request_timeout : Delay ,
118
118
} ,
119
119
/// Waiting for the behaviour to send a [`HandlerIn`] event containing the response.
120
120
WaitingBehaviour (
@@ -198,10 +198,9 @@ impl InboundSubstreamState {
198
198
} ,
199
199
) {
200
200
InboundSubstreamState :: WaitingMessage {
201
+ first_request_timeout : _,
201
202
substream,
202
- first : _,
203
203
connection_id : _,
204
- request_timeout : _,
205
204
}
206
205
| InboundSubstreamState :: WaitingBehaviour ( _, substream, _, _)
207
206
| InboundSubstreamState :: PendingSend ( _, substream, _, _)
@@ -560,7 +559,10 @@ impl Handler {
560
559
matches ! (
561
560
s,
562
561
// An inbound substream waiting to be reused.
563
- InboundSubstreamState :: WaitingMessage { first: false , .. }
562
+ InboundSubstreamState :: WaitingMessage {
563
+ first_request_timeout: None ,
564
+ ..
565
+ }
564
566
)
565
567
} ) {
566
568
* s = InboundSubstreamState :: Cancelled ;
@@ -583,10 +585,9 @@ impl Handler {
583
585
self . next_connec_unique_id . 0 += 1 ;
584
586
self . inbound_substreams
585
587
. push ( InboundSubstreamState :: WaitingMessage {
586
- first : true ,
588
+ first_request_timeout : Some ( Delay :: new ( SUBSTREAM_TIMEOUT ) ) ,
587
589
connection_id : connec_unique_id,
588
590
substream : protocol,
589
- request_timeout : Delay :: new ( SUBSTREAM_TIMEOUT ) ,
590
591
} ) ;
591
592
}
592
593
@@ -916,13 +917,12 @@ impl futures::Stream for InboundSubstreamState {
916
917
} ,
917
918
) {
918
919
InboundSubstreamState :: WaitingMessage {
919
- first ,
920
+ mut first_request_timeout ,
920
921
connection_id,
921
922
mut substream,
922
- mut request_timeout,
923
923
} => match (
924
924
substream. poll_next_unpin ( cx) ,
925
- request_timeout . poll_unpin ( cx) ,
925
+ first_request_timeout . as_mut ( ) . map ( |t| t . poll_unpin ( cx) ) ,
926
926
) {
927
927
// Prefer ready requests over ready timeouts
928
928
( Poll :: Ready ( Some ( Ok ( KadRequestMsg :: Ping ) ) ) , _) => {
@@ -966,13 +966,11 @@ impl futures::Stream for InboundSubstreamState {
966
966
) ) ) ;
967
967
}
968
968
( Poll :: Ready ( Some ( Ok ( KadRequestMsg :: AddProvider { key, provider } ) ) ) , _) => {
969
- // The request has finished, so renew the request timeout
970
- let request_timeout = Delay :: new ( SUBSTREAM_TIMEOUT ) ;
969
+ // This request type requires no response
971
970
* this = InboundSubstreamState :: WaitingMessage {
972
- first : false ,
971
+ first_request_timeout : None ,
973
972
connection_id,
974
973
substream,
975
- request_timeout,
976
974
} ;
977
975
return Poll :: Ready ( Some ( ConnectionHandlerEvent :: NotifyBehaviour (
978
976
HandlerEvent :: AddProvider { key, provider } ,
@@ -1012,30 +1010,28 @@ impl futures::Stream for InboundSubstreamState {
1012
1010
} ,
1013
1011
) ) ) ;
1014
1012
}
1015
- ( Poll :: Pending , Poll :: Pending ) => {
1013
+ ( Poll :: Pending , Some ( Poll :: Pending ) ) | ( Poll :: Pending , None ) => {
1016
1014
// Keep the original request timeout
1017
1015
* this = InboundSubstreamState :: WaitingMessage {
1018
- first ,
1016
+ first_request_timeout ,
1019
1017
connection_id,
1020
1018
substream,
1021
- request_timeout,
1022
1019
} ;
1023
1020
return Poll :: Pending ;
1024
1021
}
1025
- ( Poll :: Pending , Poll :: Ready ( ( ) ) ) => {
1022
+ ( Poll :: Pending , Some ( Poll :: Ready ( ( ) ) ) ) => {
1026
1023
tracing:: debug!(
1027
- ? first,
1024
+ first = ?first_request_timeout . is_some ( ) ,
1028
1025
?connection_id,
1029
1026
"Inbound substream timed out waiting for request" ,
1030
1027
) ;
1031
1028
1032
- // Renew the request timeout, but mark this substream as available for re-use
1033
- let request_timeout = Delay :: new ( SUBSTREAM_TIMEOUT ) ;
1029
+ // Drop the first request timeout, and mark this substream as available for
1030
+ // re-use
1034
1031
* this = InboundSubstreamState :: WaitingMessage {
1035
- first : false ,
1032
+ first_request_timeout : None ,
1036
1033
connection_id,
1037
1034
substream,
1038
- request_timeout,
1039
1035
} ;
1040
1036
return Poll :: Pending ;
1041
1037
}
@@ -1130,10 +1126,9 @@ impl futures::Stream for InboundSubstreamState {
1130
1126
) {
1131
1127
( Poll :: Ready ( Ok ( ( ) ) ) , _) => {
1132
1128
* this = InboundSubstreamState :: WaitingMessage {
1133
- first : false ,
1129
+ first_request_timeout : None ,
1134
1130
connection_id : id,
1135
1131
substream,
1136
- request_timeout : Delay :: new ( SUBSTREAM_TIMEOUT ) ,
1137
1132
} ;
1138
1133
}
1139
1134
( Poll :: Pending , Poll :: Pending ) => {
0 commit comments