@@ -19,6 +19,8 @@ package apimachinery
19
19
import (
20
20
"encoding/json"
21
21
"fmt"
22
+ "io/ioutil"
23
+ "net/http"
22
24
"regexp"
23
25
"strings"
24
26
"time"
@@ -35,6 +37,7 @@ import (
35
37
"k8s.io/apimachinery/pkg/util/wait"
36
38
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
37
39
k8sclientset "k8s.io/client-go/kubernetes"
40
+ "k8s.io/client-go/rest"
38
41
openapiutil "k8s.io/kube-openapi/pkg/util"
39
42
"k8s.io/kubernetes/test/e2e/framework"
40
43
"k8s.io/kubernetes/test/utils/crd"
@@ -389,61 +392,102 @@ func patchSchema(schema []byte, crd *crd.TestCrd) error {
389
392
return err
390
393
}
391
394
395
+ const waitSuccessThreshold = 10
396
+
397
+ // mustSucceedMultipleTimes calls f multiple times on success and only returns true if all calls are successful.
398
+ // This is necessary to avoid flaking tests where one call might hit a good apiserver while in HA other apiservers
399
+ // might be lagging behind. Calling f multiple times reduces the chance exponentially.
400
+ func mustSucceedMultipleTimes (n int , f func () (bool , error )) func () (bool , error ) {
401
+ return func () (bool , error ) {
402
+ for i := 0 ; i < n ; i ++ {
403
+ ok , err := f ()
404
+ if err != nil || ! ok {
405
+ return ok , err
406
+ }
407
+ }
408
+ return true , nil
409
+ }
410
+ }
411
+
392
412
// waitForDefinition waits for given definition showing up in swagger with given schema
393
413
func waitForDefinition (c k8sclientset.Interface , name string , schema []byte ) error {
394
414
expect := spec.Schema {}
395
415
if err := convertJSONSchemaProps (schema , & expect ); err != nil {
396
416
return err
397
417
}
398
418
399
- lastMsg := ""
400
- if err := wait .Poll (500 * time .Millisecond , 10 * time .Second , func () (bool , error ) {
401
- bs , err := c .CoreV1 ().RESTClient ().Get ().AbsPath ("openapi" , "v2" ).DoRaw ()
402
- if err != nil {
403
- return false , err
404
- }
405
- spec := spec.Swagger {}
406
- if err := json .Unmarshal (bs , & spec ); err != nil {
407
- return false , err
408
- }
419
+ err := waitForOpenAPISchema (c , func (spec * spec.Swagger ) (bool , string ) {
409
420
d , ok := spec .SwaggerProps .Definitions [name ]
410
421
if ! ok {
411
- lastMsg = fmt .Sprintf ("spec.SwaggerProps.Definitions[\" %s\" ] not found" , name )
412
- return false , nil
422
+ return false , fmt .Sprintf ("spec.SwaggerProps.Definitions[\" %s\" ] not found" , name )
413
423
}
414
424
// drop properties and extension that we added
415
425
dropDefaults (& d )
416
426
if ! apiequality .Semantic .DeepEqual (expect , d ) {
417
- lastMsg = fmt .Sprintf ("spec.SwaggerProps.Definitions[\" %s\" ] not match; expect: %v, actual: %v" , name , expect , d )
418
- return false , nil
427
+ return false , fmt .Sprintf ("spec.SwaggerProps.Definitions[\" %s\" ] not match; expect: %v, actual: %v" , name , expect , d )
419
428
}
420
- return true , nil
421
- }); err != nil {
422
- return fmt .Errorf ("failed to wait for definition %s to be served: %v; lastMsg: %s" , name , err , lastMsg )
429
+ return true , ""
430
+ })
431
+ if err != nil {
432
+ return fmt .Errorf ("failed to wait for definition %q to be served with the right OpenAPI schema: %v" , name , err )
423
433
}
424
434
return nil
425
435
}
426
436
427
437
// waitForDefinitionCleanup waits for given definition to be removed from swagger
428
438
func waitForDefinitionCleanup (c k8sclientset.Interface , name string ) error {
439
+ err := waitForOpenAPISchema (c , func (spec * spec.Swagger ) (bool , string ) {
440
+ if _ , ok := spec .SwaggerProps .Definitions [name ]; ok {
441
+ return false , fmt .Sprintf ("spec.SwaggerProps.Definitions[\" %s\" ] still exists" , name )
442
+ }
443
+ return true , ""
444
+ })
445
+ if err != nil {
446
+ return fmt .Errorf ("failed to wait for definition %q not to be served anymore: %v" , name , err )
447
+ }
448
+ return nil
449
+ }
450
+
451
+ func waitForOpenAPISchema (c k8sclientset.Interface , pred func (* spec.Swagger ) (bool , string )) error {
452
+ client := c .CoreV1 ().RESTClient ().(* rest.RESTClient ).Client
453
+ url := c .CoreV1 ().RESTClient ().Get ().AbsPath ("openapi" , "v2" ).URL ()
429
454
lastMsg := ""
430
- if err := wait .Poll (500 * time .Millisecond , 10 * time .Second , func () (bool , error ) {
431
- bs , err := c .CoreV1 ().RESTClient ().Get ().AbsPath ("openapi" , "v2" ).DoRaw ()
455
+ etag := ""
456
+ var etagSpec * spec.Swagger
457
+ if err := wait .Poll (500 * time .Millisecond , wait .ForeverTestTimeout , mustSucceedMultipleTimes (waitSuccessThreshold , func () (bool , error ) {
458
+ // download spec with etag support
459
+ spec := & spec.Swagger {}
460
+ req , err := http .NewRequest ("GET" , url .String (), nil )
432
461
if err != nil {
433
462
return false , err
434
463
}
435
- spec := spec.Swagger {}
436
- if err := json .Unmarshal (bs , & spec ); err != nil {
464
+ req .Close = true // enforce a new connection to hit different HA API servers
465
+ if len (etag ) > 0 {
466
+ req .Header .Set ("If-None-Match" , fmt .Sprintf (`"%s"` , etag ))
467
+ }
468
+ resp , err := client .Do (req )
469
+ if err != nil {
437
470
return false , err
438
471
}
439
- _ , ok := spec .SwaggerProps .Definitions [name ]
440
- if ok {
441
- lastMsg = fmt .Sprintf ("spec.SwaggerProps.Definitions[\" %s\" ] still exists" , name )
442
- return false , nil
472
+ defer resp .Body .Close ()
473
+ if resp .StatusCode == http .StatusNotModified {
474
+ spec = etagSpec
475
+ } else if resp .StatusCode != http .StatusOK {
476
+ return false , fmt .Errorf ("unexpected response: %d" , resp .StatusCode )
477
+ } else if bs , err := ioutil .ReadAll (resp .Body ); err != nil {
478
+ return false , err
479
+ } else if err := json .Unmarshal (bs , spec ); err != nil {
480
+ return false , err
481
+ } else {
482
+ etag = strings .Trim (resp .Header .Get ("ETag" ), `"` )
483
+ etagSpec = spec
443
484
}
444
- return true , nil
445
- }); err != nil {
446
- return fmt .Errorf ("failed to wait for definition %s to be removed: %v; lastMsg: %s" , name , err , lastMsg )
485
+
486
+ var ok bool
487
+ ok , lastMsg = pred (spec )
488
+ return ok , nil
489
+ })); err != nil {
490
+ return fmt .Errorf ("failed to wait for OpenAPI spec validating condition: %v; lastMsg: %s" , err , lastMsg )
447
491
}
448
492
return nil
449
493
}
0 commit comments