@@ -103,12 +103,12 @@ struct ProtocolStatus {
103
103
enum InboundSubstreamState {
104
104
/// Waiting for a request from the remote.
105
105
WaitingMessage {
106
- /// Whether it is the first message to be awaited on this stream.
107
- first : bool ,
106
+ /// How long before we give up on waiting for the first request on this stream.
107
+ /// `None` if there has already been a request on this stream, or it has timed out and can
108
+ /// be re-used.
109
+ first_request_timeout : Option < Delay > ,
108
110
connection_id : UniqueConnecId ,
109
111
substream : KadInStreamSink < Stream > ,
110
- /// How long before we give up on waiting for a request.
111
- request_timeout : Delay ,
112
112
} ,
113
113
/// Waiting for the behaviour to send a [`HandlerIn`] event containing the response.
114
114
WaitingBehaviour (
@@ -191,10 +191,9 @@ impl InboundSubstreamState {
191
191
} ,
192
192
) {
193
193
InboundSubstreamState :: WaitingMessage {
194
+ first_request_timeout : _,
194
195
substream,
195
- first : _,
196
196
connection_id : _,
197
- request_timeout : _,
198
197
}
199
198
| InboundSubstreamState :: WaitingBehaviour ( _, substream, _, _)
200
199
| InboundSubstreamState :: PendingSend ( _, substream, _, _)
@@ -559,7 +558,10 @@ impl Handler {
559
558
matches ! (
560
559
s,
561
560
// An inbound substream waiting to be reused.
562
- InboundSubstreamState :: WaitingMessage { first: false , .. }
561
+ InboundSubstreamState :: WaitingMessage {
562
+ first_request_timeout: None ,
563
+ ..
564
+ }
563
565
)
564
566
} ) {
565
567
* s = InboundSubstreamState :: Cancelled ;
@@ -582,10 +584,9 @@ impl Handler {
582
584
self . next_connec_unique_id . 0 += 1 ;
583
585
self . inbound_substreams
584
586
. push ( InboundSubstreamState :: WaitingMessage {
585
- first : true ,
587
+ first_request_timeout : Some ( Delay :: new ( SUBSTREAM_TIMEOUT ) ) ,
586
588
connection_id : connec_unique_id,
587
589
substream : protocol,
588
- request_timeout : Delay :: new ( SUBSTREAM_TIMEOUT ) ,
589
590
} ) ;
590
591
}
591
592
@@ -921,13 +922,12 @@ impl futures::Stream for InboundSubstreamState {
921
922
} ,
922
923
) {
923
924
InboundSubstreamState :: WaitingMessage {
924
- first ,
925
+ mut first_request_timeout ,
925
926
connection_id,
926
927
mut substream,
927
- mut request_timeout,
928
928
} => match (
929
929
substream. poll_next_unpin ( cx) ,
930
- request_timeout . poll_unpin ( cx) ,
930
+ first_request_timeout . as_mut ( ) . map ( |t| t . poll_unpin ( cx) ) ,
931
931
) {
932
932
// Prefer ready requests over ready timeouts
933
933
( Poll :: Ready ( Some ( Ok ( KadRequestMsg :: Ping ) ) ) , _) => {
@@ -971,13 +971,11 @@ impl futures::Stream for InboundSubstreamState {
971
971
) ) ) ;
972
972
}
973
973
( Poll :: Ready ( Some ( Ok ( KadRequestMsg :: AddProvider { key, provider } ) ) ) , _) => {
974
- // The request has finished, so renew the request timeout
975
- let request_timeout = Delay :: new ( SUBSTREAM_TIMEOUT ) ;
974
+ // This request type requires no response
976
975
* this = InboundSubstreamState :: WaitingMessage {
977
- first : false ,
976
+ first_request_timeout : None ,
978
977
connection_id,
979
978
substream,
980
- request_timeout,
981
979
} ;
982
980
return Poll :: Ready ( Some ( ConnectionHandlerEvent :: NotifyBehaviour (
983
981
HandlerEvent :: AddProvider { key, provider } ,
@@ -1017,30 +1015,28 @@ impl futures::Stream for InboundSubstreamState {
1017
1015
} ,
1018
1016
) ) ) ;
1019
1017
}
1020
- ( Poll :: Pending , Poll :: Pending ) => {
1018
+ ( Poll :: Pending , Some ( Poll :: Pending ) ) | ( Poll :: Pending , None ) => {
1021
1019
// Keep the original request timeout
1022
1020
* this = InboundSubstreamState :: WaitingMessage {
1023
- first ,
1021
+ first_request_timeout ,
1024
1022
connection_id,
1025
1023
substream,
1026
- request_timeout,
1027
1024
} ;
1028
1025
return Poll :: Pending ;
1029
1026
}
1030
- ( Poll :: Pending , Poll :: Ready ( ( ) ) ) => {
1027
+ ( Poll :: Pending , Some ( Poll :: Ready ( ( ) ) ) ) => {
1031
1028
tracing:: debug!(
1032
- ? first,
1029
+ first = ?first_request_timeout . is_some ( ) ,
1033
1030
?connection_id,
1034
1031
"Inbound substream timed out waiting for request" ,
1035
1032
) ;
1036
1033
1037
- // Renew the request timeout, but mark this substream as available for re-use
1038
- let request_timeout = Delay :: new ( SUBSTREAM_TIMEOUT ) ;
1034
+ // Drop the first request timeout, and mark this substream as available for
1035
+ // re-use
1039
1036
* this = InboundSubstreamState :: WaitingMessage {
1040
- first : false ,
1037
+ first_request_timeout : None ,
1041
1038
connection_id,
1042
1039
substream,
1043
- request_timeout,
1044
1040
} ;
1045
1041
return Poll :: Pending ;
1046
1042
}
@@ -1086,7 +1082,12 @@ impl futures::Stream for InboundSubstreamState {
1086
1082
return Poll :: Pending ;
1087
1083
}
1088
1084
} ,
1089
- InboundSubstreamState :: PendingSend ( id, mut substream, msg, mut response_timeout) => {
1085
+ InboundSubstreamState :: PendingSend (
1086
+ id,
1087
+ mut substream,
1088
+ msg,
1089
+ mut response_timeout,
1090
+ ) => {
1090
1091
match (
1091
1092
substream. poll_ready_unpin ( cx) ,
1092
1093
response_timeout. poll_unpin ( cx) ,
@@ -1130,10 +1131,9 @@ impl futures::Stream for InboundSubstreamState {
1130
1131
) {
1131
1132
( Poll :: Ready ( Ok ( ( ) ) ) , _) => {
1132
1133
* this = InboundSubstreamState :: WaitingMessage {
1133
- first : false ,
1134
+ first_request_timeout : None ,
1134
1135
connection_id : id,
1135
1136
substream,
1136
- request_timeout : Delay :: new ( SUBSTREAM_TIMEOUT ) ,
1137
1137
} ;
1138
1138
}
1139
1139
( Poll :: Pending , Poll :: Pending ) => {
0 commit comments