11use super :: * ;
22use crate :: test_util:: * ;
3+ use :: http:: StatusCode ;
34use linkerd_app_core:: {
45 errors, exp_backoff:: ExponentialBackoff , svc:: NewService , svc:: ServiceExt , trace, Error ,
56} ;
@@ -12,6 +13,9 @@ use tracing::Instrument;
1213const AUTHORITY : & str = "logical.test.svc.cluster.local" ;
1314const PORT : u16 = 666 ;
1415
16+ type Request = http:: Request < http:: BoxBody > ;
17+ type Response = http:: Response < http:: BoxBody > ;
18+
1519#[ tokio:: test( flavor = "current_thread" ) ]
1620async fn routes ( ) {
1721 let _trace = trace:: test:: trace_init ( ) ;
@@ -47,7 +51,7 @@ async fn routes() {
4751
4852 handle. allow ( 1 ) ;
4953 let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
50- serve_req ( & mut handle, http :: Response :: builder ( ) . status ( 200 ) ) . await ;
54+ serve_req ( & mut handle, mk_rsp ( StatusCode :: OK , "good" ) ) . await ;
5155 assert_eq ! (
5256 rsp. await . expect( "request must succeed" ) . status( ) ,
5357 http:: StatusCode :: OK
@@ -108,25 +112,19 @@ async fn consecutive_failures_accrue() {
108112
109113 handle. allow ( 1 ) ;
110114 let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
111- serve_req ( & mut handle, http:: Response :: builder ( ) . status ( 200 ) ) . await ;
112- assert_eq ! (
113- rsp. await . expect( "request must succeed" ) . status( ) ,
114- http:: StatusCode :: OK
115- ) ;
115+ serve_req ( & mut handle, mk_rsp ( StatusCode :: OK , "good" ) ) . await ;
116+ assert_rsp ( rsp, StatusCode :: OK , "good" ) . await ;
116117
117118 // fail 3 requests so that we hit the consecutive failures accrual limit
118119 for _ in 0 ..3 {
119120 handle. allow ( 1 ) ;
120121 let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
121122 serve_req (
122123 & mut handle,
123- http :: Response :: builder ( ) . status ( http :: StatusCode :: INTERNAL_SERVER_ERROR ) ,
124+ mk_rsp ( StatusCode :: INTERNAL_SERVER_ERROR , "bad" ) ,
124125 )
125126 . await ;
126- assert_eq ! (
127- rsp. await . expect( "request should succeed with 500" ) . status( ) ,
128- http:: StatusCode :: INTERNAL_SERVER_ERROR
129- ) ;
127+ assert_rsp ( rsp, StatusCode :: INTERNAL_SERVER_ERROR , "bad" ) . await ;
130128 }
131129
132130 // Ensure that the error is because of the breaker, and not because the
@@ -158,13 +156,10 @@ async fn consecutive_failures_accrue() {
158156 let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
159157 serve_req (
160158 & mut handle,
161- http :: Response :: builder ( ) . status ( http :: StatusCode :: INTERNAL_SERVER_ERROR ) ,
159+ mk_rsp ( StatusCode :: INTERNAL_SERVER_ERROR , "bad" ) ,
162160 )
163161 . await ;
164- assert_eq ! (
165- rsp. await . expect( "request should succeed with 500" ) . status( ) ,
166- http:: StatusCode :: INTERNAL_SERVER_ERROR
167- ) ;
162+ assert_rsp ( rsp, StatusCode :: INTERNAL_SERVER_ERROR , "bad" ) . await ;
168163
169164 // We are now in failfast.
170165 let error = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) )
@@ -182,28 +177,111 @@ async fn consecutive_failures_accrue() {
182177
183178 // The probe request succeeds
184179 let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
185- serve_req (
186- & mut handle,
187- http:: Response :: builder ( ) . status ( http:: StatusCode :: OK ) ,
188- )
189- . await ;
190- assert_eq ! (
191- rsp. await . expect( "request should succeed" ) . status( ) ,
192- http:: StatusCode :: OK
193- ) ;
180+ serve_req ( & mut handle, mk_rsp ( StatusCode :: OK , "good" ) ) . await ;
181+ assert_rsp ( rsp, StatusCode :: OK , "good" ) . await ;
194182
195183 // The gate is now open again
196184 handle. allow ( 1 ) ;
197185 let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
198- serve_req (
199- & mut handle,
200- http:: Response :: builder ( ) . status ( http:: StatusCode :: OK ) ,
186+ serve_req ( & mut handle, mk_rsp ( StatusCode :: OK , "good" ) ) . await ;
187+ assert_rsp ( rsp, StatusCode :: OK , "good" ) . await ;
188+ }
189+
190+ #[ tokio:: test( flavor = "current_thread" , start_paused = true ) ]
191+ async fn balancer_doesnt_select_tripped_breakers ( ) {
192+ let _trace = trace:: test:: with_default_filter ( format ! (
193+ "{},linkerd_app_outbound=trace,linkerd_stack=trace,linkerd2_proxy_http_balance=trace" ,
194+ trace:: test:: DEFAULT_LOG
195+ ) ) ;
196+
197+ let addr1 = SocketAddr :: new ( [ 192 , 0 , 2 , 41 ] . into ( ) , PORT ) ;
198+ let addr2 = SocketAddr :: new ( [ 192 , 0 , 2 , 42 ] . into ( ) , PORT ) ;
199+ let dest: NameAddr = format ! ( "{AUTHORITY}:{PORT}" )
200+ . parse :: < NameAddr > ( )
201+ . expect ( "dest addr is valid" ) ;
202+ let ( svc1, mut handle1) = tower_test:: mock:: pair ( ) ;
203+ let ( svc2, mut handle2) = tower_test:: mock:: pair ( ) ;
204+ let connect = HttpConnect :: default ( )
205+ . service ( addr1, svc1)
206+ . service ( addr2, svc2) ;
207+ let resolve = support:: resolver ( ) ;
208+ let mut dest_tx = resolve. endpoint_tx ( dest. clone ( ) ) ;
209+ dest_tx
210+ . add ( [ ( addr1, Default :: default ( ) ) , ( addr2, Default :: default ( ) ) ] )
211+ . unwrap ( ) ;
212+ let ( rt, _shutdown) = runtime ( ) ;
213+ let cfg = default_config ( ) ;
214+ let stack = Outbound :: new ( cfg. clone ( ) , rt)
215+ . with_stack ( connect)
216+ . push_http_cached ( resolve)
217+ . into_inner ( ) ;
218+
219+ let backend = default_backend ( & dest) ;
220+ // Ensure that the probe delay is longer than the failfast timeout, so that
221+ // the service is only probed after it has entered failfast when the gate
222+ // shuts.
223+ let min_backoff = cfg. http_request_queue . failfast_timeout + Duration :: from_secs ( 1 ) ;
224+ let backoff = ExponentialBackoff :: try_new (
225+ min_backoff,
226+ min_backoff * 6 ,
227+ // no jitter --- ensure the test is deterministic
228+ 0.0 ,
201229 )
202- . await ;
203- assert_eq ! (
204- rsp. await . expect( "request should succeed" ) . status( ) ,
205- http:: StatusCode :: OK
206- ) ;
230+ . unwrap ( ) ;
231+ let ( _route_tx, routes) =
232+ watch:: channel ( Routes :: Policy ( policy:: Params :: Http ( policy:: HttpParams {
233+ addr : dest. into ( ) ,
234+ backends : Arc :: new ( [ backend. clone ( ) ] ) ,
235+ routes : Arc :: new ( [ default_route ( backend) ] ) ,
236+ failure_accrual : client_policy:: FailureAccrual :: ConsecutiveFailures {
237+ max_failures : 3 ,
238+ backoff,
239+ } ,
240+ } ) ) ) ;
241+ let target = Target {
242+ num : 1 ,
243+ version : http:: Version :: H2 ,
244+ routes,
245+ } ;
246+ let svc = stack. new_service ( target) ;
247+
248+ // fail 3 requests so that we hit the consecutive failures accrual limit
249+ let mut failed = 0 ;
250+ while failed < 3 {
251+ handle1. allow ( 1 ) ;
252+ handle2. allow ( 1 ) ;
253+ tracing:: info!( failed) ;
254+ let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
255+ let ( expected_status, expected_body) = tokio:: select! {
256+ _ = serve_req( & mut handle1, mk_rsp( StatusCode :: OK , "endpoint 1" ) ) => {
257+ tracing:: info!( "Balancer selected good endpoint" ) ;
258+ ( StatusCode :: OK , "endpoint 1" )
259+ }
260+ _ = serve_req( & mut handle2, mk_rsp( StatusCode :: INTERNAL_SERVER_ERROR , "endpoint 2" ) ) => {
261+ tracing:: info!( "Balancer selected bad endpoint" ) ;
262+ failed += 1 ;
263+ ( StatusCode :: INTERNAL_SERVER_ERROR , "endpoint 2" )
264+ }
265+ } ;
266+ assert_rsp ( rsp, expected_status, expected_body) . await ;
267+ tokio:: task:: yield_now ( ) . await ;
268+ }
269+
270+ handle1. allow ( 1 ) ;
271+ handle2. allow ( 1 ) ;
272+ let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
273+ // The load balancer will select endpoint 1, because endpoint 2 isn't ready.
274+ serve_req ( & mut handle1, mk_rsp ( StatusCode :: OK , "endpoint 1" ) ) . await ;
275+ assert_rsp ( rsp, StatusCode :: OK , "endpoint 1" ) . await ;
276+
277+ // The load balancer should continue selecting the non-failing endpoint.
278+ for _ in 0 ..8 {
279+ handle1. allow ( 1 ) ;
280+ handle2. allow ( 1 ) ;
281+ let rsp = send_req ( svc. clone ( ) , http:: Request :: get ( "/" ) ) ;
282+ serve_req ( & mut handle1, mk_rsp ( StatusCode :: OK , "endpoint 1" ) ) . await ;
283+ assert_rsp ( rsp, StatusCode :: OK , "endpoint 1" ) . await ;
284+ }
207285}
208286
209287#[ derive( Clone , Debug ) ]
@@ -213,7 +291,7 @@ struct Target {
213291 routes : watch:: Receiver < Routes > ,
214292}
215293
216- type MockSvc = tower_test:: mock:: Mock < http :: Request < http :: BoxBody > , http :: Response < http :: BoxBody > > ;
294+ type MockSvc = tower_test:: mock:: Mock < Request , Response > ;
217295
218296#[ derive( Clone , Default ) ]
219297struct HttpConnect {
@@ -271,15 +349,11 @@ impl<T: svc::Param<Remote<ServerAddr>>> svc::NewService<T> for HttpConnect {
271349
272350#[ track_caller]
273351fn send_req (
274- svc : impl svc:: Service <
275- http:: Request < http:: BoxBody > ,
276- Response = http:: Response < http:: BoxBody > ,
277- Error = Error ,
278- Future = impl Send + ' static ,
279- > + Send
352+ svc : impl svc:: Service < Request , Response = Response , Error = Error , Future = impl Send + ' static >
353+ + Send
280354 + ' static ,
281355 builder : :: http:: request:: Builder ,
282- ) -> impl Future < Output = Result < http :: Response < http :: BoxBody > , Error > > + Send + ' static {
356+ ) -> impl Future < Output = Result < Response , Error > > + Send + ' static {
283357 let mut req = builder. body ( http:: BoxBody :: default ( ) ) . unwrap ( ) ;
284358 let span = tracing:: info_span!(
285359 "send_req" ,
@@ -304,19 +378,36 @@ fn send_req(
304378 async move { rsp. await . expect ( "request task must not panic" ) }
305379}
306380
307- async fn serve_req (
308- handle : & mut tower_test:: mock:: Handle <
309- http:: Request < http:: BoxBody > ,
310- http:: Response < http:: BoxBody > ,
311- > ,
312- rsp : :: http:: response:: Builder ,
313- ) {
381+ fn mk_rsp ( status : StatusCode , body : impl ToString ) -> Response {
382+ http:: Response :: builder ( )
383+ . status ( status)
384+ . body ( http:: BoxBody :: new ( body. to_string ( ) ) )
385+ . unwrap ( )
386+ }
387+
388+ #[ track_caller]
389+ async fn assert_rsp < T : std:: fmt:: Debug > (
390+ rsp : impl Future < Output = Result < Response , Error > > ,
391+ status : StatusCode ,
392+ expected_body : T ,
393+ ) where
394+ bytes:: Bytes : PartialEq < T > ,
395+ {
396+ let rsp = rsp. await . expect ( "response must not fail" ) ;
397+ assert_eq ! ( rsp. status( ) , status, "expected status code to be {status}" ) ;
398+ let body = hyper:: body:: to_bytes ( rsp. into_body ( ) )
399+ . await
400+ . expect ( "body must not fail" ) ;
401+ assert_eq ! ( body, expected_body, "expected body to be {expected_body:?}" ) ;
402+ }
403+
404+ async fn serve_req ( handle : & mut tower_test:: mock:: Handle < Request , Response > , rsp : Response ) {
314405 let ( req, send_rsp) = handle
315406 . next_request ( )
316407 . await
317408 . expect ( "service must receive request" ) ;
318409 tracing:: info!( ?req, "received request" ) ;
319- send_rsp. send_response ( rsp. body ( http :: BoxBody :: default ( ) ) . unwrap ( ) ) ;
410+ send_rsp. send_response ( rsp) ;
320411 tracing:: info!( ?req, "response sent" ) ;
321412}
322413
0 commit comments