11use std:: {
22 borrow:: Borrow ,
3+ error:: Error ,
34 net:: { IpAddr , SocketAddr } ,
45 sync:: {
56 atomic:: { AtomicUsize , Ordering } ,
@@ -20,14 +21,15 @@ use ic_utils::{
2021 call:: { AsyncCall , SyncCall } ,
2122 interfaces:: http_request:: HttpRequestCanister ,
2223} ;
23- use tracing:: { enabled, info, instrument, trace, Level } ;
24+ use tracing:: { enabled, error , info, instrument, trace, Level } ;
2425
2526use crate :: error:: ErrorFactory ;
27+ use crate :: http;
2628use crate :: http:: request:: HttpRequest ;
2729use crate :: http:: response:: { AgentResponseAny , HttpResponse } ;
2830use crate :: {
2931 canister_id,
30- proxy:: { AppState , HandleError , HyperService } ,
32+ proxy:: { AppState , HandleError , HyperService , REQUEST_BODY_SIZE_LIMIT } ,
3133 validate:: Validate ,
3234} ;
3335
@@ -125,9 +127,24 @@ fn remove_hop_headers(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue
125127 . collect ( )
126128}
127129
130+ // Dive into the error chain and figure out if the underlying error was caused by an HTTP2 GOAWAY frame
131+ fn is_h2_goaway ( e : & anyhow:: Error ) -> bool {
132+ if let Some ( AgentError :: TransportError ( e) ) = e. downcast_ref :: < AgentError > ( ) {
133+ if let Some ( e) = e. downcast_ref :: < hyper:: Error > ( ) {
134+ let def_err = h2:: Error :: from ( h2:: Reason :: INTERNAL_ERROR ) ;
135+
136+ if let Some ( e) = e. source ( ) . unwrap_or ( & def_err) . downcast_ref :: < h2:: Error > ( ) {
137+ return e. is_go_away ( ) ;
138+ }
139+ }
140+ }
141+
142+ false
143+ }
144+
128145#[ instrument( level = "info" , skip_all, fields( addr = display( addr) , replica = display( & * args. replica_uri) ) ) ]
129146pub async fn handler < V : Validate , C : HyperService < Body > > (
130- State ( args) : State < Args < V , C > > ,
147+ State ( mut args) : State < Args < V , C > > ,
131148 ConnectInfo ( addr) : ConnectInfo < SocketAddr > ,
132149 uri_canister_id : Option < canister_id:: UriHost > ,
133150 host_canister_id : Option < canister_id:: HostHeader > ,
@@ -137,28 +154,65 @@ pub async fn handler<V: Validate, C: HyperService<Body>>(
137154 let uri_canister_id = uri_canister_id. map ( |v| v. 0 ) ;
138155 let host_canister_id = host_canister_id. map ( |v| v. 0 ) ;
139156 let query_param_canister_id = query_param_canister_id. map ( |v| v. 0 ) ;
140- process_request_inner (
141- request,
142- addr,
143- args. agent ,
144- & args. replica_uri ,
145- args. validator ,
146- args. client ,
147- uri_canister_id
148- . or ( host_canister_id)
149- . or ( query_param_canister_id) ,
150- )
151- . await
152- . handle_error ( args. debug )
157+
158+ // Read the request body into a Vec
159+ let ( parts, body) = request. into_parts ( ) ;
160+ let body = match http:: body:: read_streaming_body ( body, REQUEST_BODY_SIZE_LIMIT ) . await {
161+ Err ( e) => {
162+ error ! ( "Unable to read body: {}" , e) ;
163+ return Response :: builder ( )
164+ . status ( 500 )
165+ . body ( "Error reading body" . into ( ) )
166+ . unwrap ( ) ;
167+ }
168+ Ok ( b) => b,
169+ } ;
170+
171+ let mut retries = 3 ;
172+ loop {
173+ // Create a new request based on the incoming one
174+ let mut request_new = Request :: new ( Body :: from ( body. clone ( ) ) ) ;
175+ * request_new. headers_mut ( ) = parts. headers . clone ( ) ;
176+ * request_new. method_mut ( ) = parts. method . clone ( ) ;
177+ * request_new. uri_mut ( ) = parts. uri . clone ( ) ;
178+
179+ let res = process_request_inner (
180+ request_new,
181+ addr,
182+ & args. agent ,
183+ & args. replica_uri ,
184+ & args. validator ,
185+ & mut args. client ,
186+ uri_canister_id
187+ . or ( host_canister_id)
188+ . or ( query_param_canister_id) ,
189+ )
190+ . await ;
191+
192+ // If we have retries left - check if the underlying reason is a GOAWAY and retry if that's the case.
193+ // GOAWAY is issued when the server is gracefully shutting down and it will not execute the request.
194+ // So we can safely retry the request even if it's not idempotent since it was never worked on in case of GOAWAY.
195+ if retries > 0 {
196+ if let Err ( e) = & res {
197+ if is_h2_goaway ( e) {
198+ retries -= 1 ;
199+ info ! ( "HTTP GOAWAY received, retrying request" ) ;
200+ continue ;
201+ }
202+ }
203+ }
204+
205+ return res. handle_error ( args. debug ) ;
206+ }
153207}
154208
155209async fn process_request_inner (
156210 request : Request < Body > ,
157211 addr : SocketAddr ,
158- agent : Agent ,
212+ agent : & Agent ,
159213 replica_uri : & Uri ,
160- validator : impl Validate ,
161- mut client : impl HyperService < Body > ,
214+ validator : & impl Validate ,
215+ client : & mut impl HyperService < Body > ,
162216 canister_id : Option < Principal > ,
163217) -> Result < Response < Body > , anyhow:: Error > {
164218 let canister_id = match canister_id {
@@ -217,11 +271,12 @@ async fn process_request_inner(
217271 ) ;
218272 }
219273
220- let canister = HttpRequestCanister :: create ( & agent, canister_id) ;
274+ let canister = HttpRequestCanister :: create ( agent, canister_id) ;
221275 let header_fields = http_request
222276 . headers
223277 . iter ( )
224278 . map ( |( name, value) | HeaderField ( name. into ( ) , value. into ( ) ) ) ;
279+
225280 let query_result = canister
226281 . http_request_custom (
227282 & http_request. method ,
@@ -258,7 +313,7 @@ async fn process_request_inner(
258313 agent_response
259314 } ;
260315
261- let http_response = HttpResponse :: from ( ( & agent, agent_response) ) ;
316+ let http_response = HttpResponse :: from ( ( agent, agent_response) ) ;
262317 let mut response_builder =
263318 Response :: builder ( ) . status ( StatusCode :: from_u16 ( http_response. status_code ) ?) ;
264319 for ( name, value) in & http_response. headers {
@@ -270,7 +325,7 @@ async fn process_request_inner(
270325 // and this could cause memory issues and possibly create DOS attack vectors.
271326 let should_validate = !http_response. has_streaming_body && !is_update_call;
272327 if should_validate {
273- let validation = validator. validate ( & agent, & canister_id, & http_request, & http_response) ;
328+ let validation = validator. validate ( agent, & canister_id, & http_request, & http_response) ;
274329
275330 if validation. is_err ( ) {
276331 return Ok ( Response :: builder ( )
0 commit comments