Skip to content

Commit ae02d1c

Browse files
authored
Merge pull request kubernetes#120971 from p0lyn0mial/upstream-watchlist-client-go-fallback-integration
integration/apimachinery: add TestReflectorWatchListFallback integration test
2 parents fe84992 + 9b32a47 commit ae02d1c

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apimachinery
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sort"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/require"
27+
28+
"github.com/google/go-cmp/cmp"
29+
v1 "k8s.io/api/core/v1"
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/util/wait"
33+
"k8s.io/apimachinery/pkg/watch"
34+
"k8s.io/client-go/kubernetes"
35+
"k8s.io/client-go/tools/cache"
36+
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
37+
"k8s.io/kubernetes/test/integration/framework"
38+
)
39+
40+
func TestReflectorWatchListFallback(t *testing.T) {
41+
ctx := context.TODO()
42+
43+
t.Log("Starting etcd that will be used by two different instances of kube-apiserver")
44+
etcdURL, etcdTearDownFn, err := framework.RunCustomEtcd("etcd_watchlist", []string{"--experimental-watch-progress-notify-interval", "1s"}, nil)
45+
require.NoError(t, err)
46+
defer etcdTearDownFn()
47+
etcdOptions := framework.DefaultEtcdOptions()
48+
etcdOptions.StorageConfig.Transport.ServerList = []string{etcdURL}
49+
50+
t.Log("Starting the first server with the WatchList feature enabled")
51+
server1 := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--feature-gates=WatchList=true"}, &etcdOptions.StorageConfig)
52+
defer server1.TearDownFn()
53+
clientSet, err := kubernetes.NewForConfig(server1.ClientConfig)
54+
require.NoError(t, err)
55+
56+
ns := framework.CreateNamespaceOrDie(clientSet, "reflector-fallback-watchlist", t)
57+
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
58+
59+
t.Logf("Adding 5 secrets to %s namespace", ns.Name)
60+
for i := 1; i <= 5; i++ {
61+
_, err := clientSet.CoreV1().Secrets(ns.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
62+
require.NoError(t, err)
63+
}
64+
65+
t.Log("Creating a secret reflector that will use the WatchList feature to synchronise the store")
66+
store := &wrappedStore{Store: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)}
67+
lw := &wrappedListWatch{&cache.ListWatch{}}
68+
lw.SetClient(ctx, clientSet, ns)
69+
target := cache.NewReflector(lw, &v1.Secret{}, store, time.Duration(0))
70+
target.UseWatchList = true
71+
72+
t.Log("Waiting until the secret reflector synchronises to the store (call to the Replace method)")
73+
reflectorCtx, reflectorCtxCancel := context.WithCancel(context.Background())
74+
defer reflectorCtxCancel()
75+
store.setCancelOnReplace(reflectorCtxCancel)
76+
err = target.ListAndWatch(reflectorCtx.Done())
77+
require.NoError(t, err)
78+
79+
t.Log("Verifying if the secret reflector was properly synchronised")
80+
verifyStore(t, ctx, clientSet, store, ns)
81+
82+
t.Log("Starting the second server with the WatchList feature disabled")
83+
server2 := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--feature-gates=WatchList=false"}, &etcdOptions.StorageConfig)
84+
defer server2.TearDownFn()
85+
clientSet2, err := kubernetes.NewForConfig(server2.ClientConfig)
86+
require.NoError(t, err)
87+
88+
t.Log("Pointing the ListWatcher used by the secret reflector to the second server (with the WatchList feature disabled)")
89+
lw.SetClient(ctx, clientSet2, ns)
90+
reflectorCtx, reflectorCtxCancel = context.WithCancel(context.Background())
91+
defer reflectorCtxCancel()
92+
store.setCancelOnReplace(reflectorCtxCancel)
93+
err = target.ListAndWatch(reflectorCtx.Done())
94+
require.NoError(t, err)
95+
96+
t.Log("Verifying if the secret reflector was properly synchronised")
97+
verifyStore(t, ctx, clientSet, store, ns)
98+
}
99+
100+
// TODO(#115478): refactor with e2e/apimachinery/watchlist
101+
func verifyStore(t *testing.T, ctx context.Context, clientSet kubernetes.Interface, store cache.Store, namespace *v1.Namespace) {
102+
t.Logf("Listing secrets directly from the server from %s namespace", namespace.Name)
103+
expectedSecretsList, err := clientSet.CoreV1().Secrets(namespace.Name).List(ctx, metav1.ListOptions{})
104+
require.NoError(t, err)
105+
expectedSecrets := expectedSecretsList.Items
106+
107+
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
108+
t.Log("Comparing secrets retrieved directly from the server with the ones that have been streamed to the secret reflector")
109+
rawStreamedSecrets := store.List()
110+
streamedSecrets := make([]v1.Secret, 0, len(rawStreamedSecrets))
111+
for _, rawSecret := range rawStreamedSecrets {
112+
streamedSecrets = append(streamedSecrets, *rawSecret.(*v1.Secret))
113+
}
114+
sort.Sort(byName(expectedSecrets))
115+
sort.Sort(byName(streamedSecrets))
116+
return cmp.Equal(expectedSecrets, streamedSecrets), nil
117+
})
118+
require.NoError(t, err)
119+
}
120+
121+
type byName []v1.Secret
122+
123+
func (a byName) Len() int { return len(a) }
124+
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
125+
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
126+
127+
func newSecret(name string) *v1.Secret {
128+
return &v1.Secret{
129+
ObjectMeta: metav1.ObjectMeta{Name: name},
130+
}
131+
}
132+
133+
type wrappedStore struct {
134+
cache.Store
135+
ctxCancel context.CancelFunc
136+
}
137+
138+
func (s *wrappedStore) Replace(items []interface{}, rv string) error {
139+
s.ctxCancel()
140+
return s.Store.Replace(items, rv)
141+
}
142+
143+
func (s *wrappedStore) setCancelOnReplace(ctxCancel context.CancelFunc) {
144+
s.ctxCancel = ctxCancel
145+
}
146+
147+
type wrappedListWatch struct {
148+
*cache.ListWatch
149+
}
150+
151+
func (lw *wrappedListWatch) SetClient(ctx context.Context, clientSet kubernetes.Interface, ns *v1.Namespace) {
152+
lw.ListFunc = func(options metav1.ListOptions) (runtime.Object, error) {
153+
return clientSet.CoreV1().Secrets(ns.Name).List(ctx, options)
154+
}
155+
lw.WatchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
156+
return clientSet.CoreV1().Secrets(ns.Name).Watch(ctx, options)
157+
}
158+
}

0 commit comments

Comments
 (0)