@@ -19,11 +19,13 @@ package apimachinery
19
19
import (
20
20
"context"
21
21
"fmt"
22
+ "net/http"
22
23
"sort"
23
24
"time"
24
25
25
26
"github.com/google/go-cmp/cmp"
26
27
"github.com/onsi/ginkgo/v2"
28
+ "github.com/onsi/gomega"
27
29
28
30
v1 "k8s.io/api/core/v1"
29
31
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -32,7 +34,9 @@ import (
32
34
"k8s.io/apimachinery/pkg/watch"
33
35
utilfeature "k8s.io/apiserver/pkg/util/feature"
34
36
clientfeatures "k8s.io/client-go/features"
37
+ "k8s.io/client-go/kubernetes"
35
38
"k8s.io/client-go/tools/cache"
39
+ "k8s.io/client-go/util/consistencydetector"
36
40
"k8s.io/component-base/featuregate"
37
41
featuregatetesting "k8s.io/component-base/featuregate/testing"
38
42
"k8s.io/kubernetes/test/e2e/feature"
@@ -41,7 +45,7 @@ import (
41
45
42
46
var _ = SIGDescribe ("API Streaming (aka. WatchList)" , framework .WithSerial (), feature .WatchList , func () {
43
47
f := framework .NewDefaultFramework ("watchlist" )
44
- ginkgo .It ("should be requested when WatchListClient is enabled" , func (ctx context.Context ) {
48
+ ginkgo .It ("should be requested by informers when WatchListClient is enabled" , func (ctx context.Context ) {
45
49
featuregatetesting .SetFeatureGateDuringTest (ginkgo .GinkgoTB (), utilfeature .DefaultFeatureGate , featuregate .Feature (clientfeatures .WatchListClient ), true )
46
50
stopCh := make (chan struct {})
47
51
defer close (stopCh )
@@ -85,8 +89,56 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
85
89
framework .ExpectNoError (err )
86
90
verifyStore (ctx , f , secretInformer .GetStore ())
87
91
})
92
+ ginkgo .It ("should be requested by client-go's List method when WatchListClient is enabled" , func (ctx context.Context ) {
93
+ featuregatetesting .SetFeatureGateDuringTest (ginkgo .GinkgoTB (), utilfeature .DefaultFeatureGate , featuregate .Feature (clientfeatures .WatchListClient ), true )
94
+
95
+ ginkgo .By (fmt .Sprintf ("Adding 5 secrets to %s namespace" , f .Namespace .Name ))
96
+ var expectedSecrets []v1.Secret
97
+ for i := 1 ; i <= 5 ; i ++ {
98
+ secret , err := f .ClientSet .CoreV1 ().Secrets (f .Namespace .Name ).Create (ctx , newSecret (fmt .Sprintf ("secret-%d" , i )), metav1.CreateOptions {})
99
+ framework .ExpectNoError (err )
100
+ expectedSecrets = append (expectedSecrets , * secret )
101
+ }
102
+
103
+ var actualRequestsMadeByKubeClient []string
104
+ clientConfig := f .ClientConfig ()
105
+ clientConfig .Wrap (func (rt http.RoundTripper ) http.RoundTripper {
106
+ return roundTripFunc (func (req * http.Request ) (* http.Response , error ) {
107
+ actualRequestsMadeByKubeClient = append (actualRequestsMadeByKubeClient , req .URL .RawQuery )
108
+ return rt .RoundTrip (req )
109
+ })
110
+ })
111
+ wrappedKubeClient , err := kubernetes .NewForConfig (clientConfig )
112
+ framework .ExpectNoError (err )
113
+
114
+ ginkgo .By ("Streaming secrets from the server" )
115
+ secretList , err := wrappedKubeClient .CoreV1 ().Secrets (f .Namespace .Name ).List (ctx , metav1.ListOptions {})
116
+ framework .ExpectNoError (err )
117
+
118
+ ginkgo .By ("Verifying if the secret list was properly streamed" )
119
+ streamedSecrets := secretList .Items
120
+ sort .Sort (byName (expectedSecrets ))
121
+ gomega .Expect (cmp .Equal (expectedSecrets , streamedSecrets )).To (gomega .BeTrueBecause ("data received via watchlist must match the added data" ))
122
+
123
+ ginkgo .By ("Verifying if expected requests were sent to the server" )
124
+ expectedRequestMadeByKubeClient := []string {
125
+ // corresponds to a streaming request made by the kube client to stream the secrets
126
+ "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true" ,
127
+ }
128
+ if consistencydetector .IsDataConsistencyDetectionForWatchListEnabled () {
129
+ // corresponds to a standard list request made by the consistency detector build in into the kube client
130
+ expectedRequestMadeByKubeClient = append (expectedRequestMadeByKubeClient , fmt .Sprintf ("resourceVersion=%s&resourceVersionMatch=Exact" , secretList .ResourceVersion ))
131
+ }
132
+ gomega .Expect (actualRequestsMadeByKubeClient ).To (gomega .Equal (expectedRequestMadeByKubeClient ))
133
+ })
88
134
})
89
135
136
+ type roundTripFunc func (req * http.Request ) (* http.Response , error )
137
+
138
+ func (f roundTripFunc ) RoundTrip (req * http.Request ) (* http.Response , error ) {
139
+ return f (req )
140
+ }
141
+
90
142
func verifyStore (ctx context.Context , f * framework.Framework , store cache.Store ) {
91
143
ginkgo .By (fmt .Sprintf ("Listing secrets directly from the server from %s namespace" , f .Namespace .Name ))
92
144
expectedSecretsList , err := f .ClientSet .CoreV1 ().Secrets (f .Namespace .Name ).List (ctx , metav1.ListOptions {})
0 commit comments