Skip to content

Commit 52fb02f

Browse files
authored
Merge pull request kubernetes#87718 from wojtek-t/kubelet_not_watching_immutable_secret_configmaps
WatchBasedManager stops watching immutable objects
2 parents ba10669 + b11b7d3 commit 52fb02f

File tree

6 files changed

+226
-15
lines changed

6 files changed

+226
-15
lines changed

pkg/kubelet/configmap/configmap_manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,14 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
145145
newConfigMap := func() runtime.Object {
146146
return &v1.ConfigMap{}
147147
}
148+
isImmutable := func(object runtime.Object) bool {
149+
if configMap, ok := object.(*v1.ConfigMap); ok {
150+
return configMap.Immutable != nil && *configMap.Immutable
151+
}
152+
return false
153+
}
148154
gr := corev1.Resource("configmap")
149155
return &configMapManager{
150-
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, gr, getConfigMapNames),
156+
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames),
151157
}
152158
}

pkg/kubelet/secret/secret_manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,14 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
146146
newSecret := func() runtime.Object {
147147
return &v1.Secret{}
148148
}
149+
isImmutable := func(object runtime.Object) bool {
150+
if secret, ok := object.(*v1.Secret); ok {
151+
return secret.Immutable != nil && *secret.Immutable
152+
}
153+
return false
154+
}
149155
gr := corev1.Resource("secret")
150156
return &secretManager{
151-
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, gr, getSecretNames),
157+
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
152158
}
153159
}

pkg/kubelet/util/manager/BUILD

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
1111
visibility = ["//visibility:public"],
1212
deps = [
13+
"//pkg/features:go_default_library",
1314
"//pkg/kubelet/util:go_default_library",
1415
"//staging/src/k8s.io/api/core/v1:go_default_library",
1516
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@@ -22,7 +23,9 @@ go_library(
2223
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
2324
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
2425
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
26+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
2527
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
28+
"//vendor/k8s.io/klog:go_default_library",
2629
],
2730
)
2831

@@ -36,17 +39,21 @@ go_test(
3639
deps = [
3740
"//pkg/api/v1/pod:go_default_library",
3841
"//pkg/apis/core/v1:go_default_library",
42+
"//pkg/features:go_default_library",
3943
"//staging/src/k8s.io/api/core/v1:go_default_library",
44+
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
4045
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
4146
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
4247
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
4348
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
4449
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
4550
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
4651
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
52+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
4753
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
4854
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
4955
"//staging/src/k8s.io/client-go/testing:go_default_library",
56+
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
5057
"//vendor/github.com/stretchr/testify/assert:go_default_library",
5158
],
5259
)

pkg/kubelet/util/manager/watch_based_manager.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,6 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
// TODO: We did some scalability tests and using watchBasedManager
18-
// seems to help with apiserver performance at scale visibly.
19-
// No issues we also observed at the scale of ~200k watchers with a
20-
// single apiserver.
21-
// However, we need to perform more extensive testing before we
22-
// enable this in production setups.
23-
2417
package manager
2518

2619
import (
@@ -31,6 +24,8 @@ import (
3124
"k8s.io/api/core/v1"
3225
"k8s.io/client-go/tools/cache"
3326

27+
"k8s.io/klog"
28+
3429
apierrors "k8s.io/apimachinery/pkg/api/errors"
3530
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3631
"k8s.io/apimachinery/pkg/fields"
@@ -39,18 +34,37 @@ import (
3934
"k8s.io/apimachinery/pkg/util/sets"
4035
"k8s.io/apimachinery/pkg/util/wait"
4136
"k8s.io/apimachinery/pkg/watch"
37+
utilfeature "k8s.io/apiserver/pkg/util/feature"
38+
"k8s.io/kubernetes/pkg/features"
4239
)
4340

4441
type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
4542
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
4643
type newObjectFunc func() runtime.Object
44+
type isImmutableFunc func(runtime.Object) bool
4745

4846
// objectCacheItem is a single item stored in objectCache.
4947
type objectCacheItem struct {
5048
refCount int
5149
store cache.Store
5250
hasSynced func() (bool, error)
53-
stopCh chan struct{}
51+
52+
// lock is protecting from closing stopCh multiple times.
53+
lock sync.Mutex
54+
stopCh chan struct{}
55+
}
56+
57+
func (i *objectCacheItem) stop() bool {
58+
i.lock.Lock()
59+
defer i.lock.Unlock()
60+
select {
61+
case <-i.stopCh:
62+
// This means that channel is already closed.
63+
return false
64+
default:
65+
close(i.stopCh)
66+
return true
67+
}
5468
}
5569

5670
// objectCache is a local cache of objects propagated via
@@ -59,18 +73,25 @@ type objectCache struct {
5973
listObject listObjectFunc
6074
watchObject watchObjectFunc
6175
newObject newObjectFunc
76+
isImmutable isImmutableFunc
6277
groupResource schema.GroupResource
6378

6479
lock sync.RWMutex
6580
items map[objectKey]*objectCacheItem
6681
}
6782

6883
// NewObjectCache returns a new watch-based instance of Store interface.
69-
func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
84+
func NewObjectCache(
85+
listObject listObjectFunc,
86+
watchObject watchObjectFunc,
87+
newObject newObjectFunc,
88+
isImmutable isImmutableFunc,
89+
groupResource schema.GroupResource) Store {
7090
return &objectCache{
7191
listObject: listObject,
7292
watchObject: watchObject,
7393
newObject: newObject,
94+
isImmutable: isImmutable,
7495
groupResource: groupResource,
7596
items: make(map[objectKey]*objectCacheItem),
7697
}
@@ -140,7 +161,7 @@ func (c *objectCache) DeleteReference(namespace, name string) {
140161
item.refCount--
141162
if item.refCount == 0 {
142163
// Stop the underlying reflector.
143-
close(item.stopCh)
164+
item.stop()
144165
delete(c.items, key)
145166
}
146167
}
@@ -177,6 +198,21 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
177198
return nil, apierrors.NewNotFound(c.groupResource, name)
178199
}
179200
if object, ok := obj.(runtime.Object); ok {
201+
// If the returned object is immutable, stop the reflector.
202+
//
203+
// NOTE: we may potentially not even start the reflector if the object is
204+
// already immutable. However, given that:
205+
// - we want to also handle the case when object is marked as immutable later
206+
// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
207+
// - doing that wouldn't provide visible scalability/performance gain - we
208+
// already have it from here
209+
// - doing that would require significant refactoring to reflector
210+
// we limit ourselves to just quickly stop the reflector here.
211+
if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) {
212+
if item.stop() {
213+
klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
214+
}
215+
}
180216
return object, nil
181217
}
182218
return nil, fmt.Errorf("unexpected object type: %v", obj)
@@ -188,7 +224,13 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
188224
// - whenever a pod is created or updated, we start individual watches for all
189225
// referenced objects that aren't referenced from other registered pods
190226
// - every GetObject() returns a value from local cache propagated via watches
191-
func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
192-
objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
227+
func NewWatchBasedManager(
228+
listObject listObjectFunc,
229+
watchObject watchObjectFunc,
230+
newObject newObjectFunc,
231+
isImmutable isImmutableFunc,
232+
groupResource schema.GroupResource,
233+
getReferencedObjects func(*v1.Pod) sets.String) Manager {
234+
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
193235
return NewCacheBasedManager(objectStore, getReferencedObjects)
194236
}

pkg/kubelet/util/manager/watch_based_manager_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,21 @@ import (
2424
"time"
2525

2626
"k8s.io/api/core/v1"
27+
apiequality "k8s.io/apimachinery/pkg/api/equality"
2728
apierrors "k8s.io/apimachinery/pkg/api/errors"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/runtime"
3031
"k8s.io/apimachinery/pkg/util/wait"
3132
"k8s.io/apimachinery/pkg/watch"
3233

34+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3335
clientset "k8s.io/client-go/kubernetes"
3436
"k8s.io/client-go/kubernetes/fake"
3537
core "k8s.io/client-go/testing"
38+
featuregatetesting "k8s.io/component-base/featuregate/testing"
3639

3740
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
41+
"k8s.io/kubernetes/pkg/features"
3842

3943
"github.com/stretchr/testify/assert"
4044
)
@@ -51,11 +55,19 @@ func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
5155
}
5256
}
5357

58+
func isSecretImmutable(object runtime.Object) bool {
59+
if secret, ok := object.(*v1.Secret); ok {
60+
return secret.Immutable != nil && *secret.Immutable
61+
}
62+
return false
63+
}
64+
5465
func newSecretCache(fakeClient clientset.Interface) *objectCache {
5566
return &objectCache{
5667
listObject: listSecret(fakeClient),
5768
watchObject: watchSecret(fakeClient),
5869
newObject: func() runtime.Object { return &v1.Secret{} },
70+
isImmutable: isSecretImmutable,
5971
groupResource: corev1.Resource("secret"),
6072
items: make(map[objectKey]*objectCacheItem),
6173
}
@@ -183,3 +195,138 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
183195
actions = fakeClient.Actions()
184196
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
185197
}
198+
199+
func TestImmutableSecretStopsTheReflector(t *testing.T) {
200+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ImmutableEphemeralVolumes, true)()
201+
202+
secret := func(rv string, immutable bool) *v1.Secret {
203+
result := &v1.Secret{
204+
ObjectMeta: metav1.ObjectMeta{
205+
Name: "name",
206+
Namespace: "ns",
207+
ResourceVersion: rv,
208+
},
209+
}
210+
if immutable {
211+
trueVal := true
212+
result.Immutable = &trueVal
213+
}
214+
return result
215+
}
216+
217+
tests := []struct {
218+
desc string
219+
initial *v1.Secret
220+
eventual *v1.Secret
221+
}{
222+
{
223+
desc: "secret doesn't exist, created as mutable",
224+
initial: nil,
225+
eventual: secret("200", false),
226+
},
227+
{
228+
desc: "secret doesn't exist, created as immutable",
229+
initial: nil,
230+
eventual: secret("200", true),
231+
},
232+
{
233+
desc: "mutable secret modified to mutable",
234+
initial: secret("100", false),
235+
eventual: secret("200", false),
236+
},
237+
{
238+
desc: "mutable secret modified to immutable",
239+
initial: secret("100", false),
240+
eventual: secret("200", true),
241+
},
242+
{
243+
desc: "immutable secret",
244+
initial: secret("100", true),
245+
eventual: nil,
246+
},
247+
}
248+
249+
for _, tc := range tests {
250+
t.Run(tc.desc, func(t *testing.T) {
251+
fakeClient := &fake.Clientset{}
252+
listReactor := func(a core.Action) (bool, runtime.Object, error) {
253+
result := &v1.SecretList{
254+
ListMeta: metav1.ListMeta{
255+
ResourceVersion: "100",
256+
},
257+
}
258+
if tc.initial != nil {
259+
result.Items = []v1.Secret{*tc.initial}
260+
}
261+
return true, result, nil
262+
}
263+
fakeClient.AddReactor("list", "secrets", listReactor)
264+
fakeWatch := watch.NewFake()
265+
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
266+
267+
store := newSecretCache(fakeClient)
268+
269+
key := objectKey{namespace: "ns", name: "name"}
270+
itemExists := func() (bool, error) {
271+
store.lock.Lock()
272+
defer store.lock.Unlock()
273+
_, ok := store.items[key]
274+
return ok, nil
275+
}
276+
reflectorRunning := func() bool {
277+
store.lock.Lock()
278+
defer store.lock.Unlock()
279+
item := store.items[key]
280+
281+
item.lock.Lock()
282+
defer item.lock.Unlock()
283+
select {
284+
case <-item.stopCh:
285+
return false
286+
default:
287+
return true
288+
}
289+
}
290+
291+
// AddReference should start reflector.
292+
store.AddReference("ns", "name")
293+
if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
294+
t.Errorf("item wasn't added to cache")
295+
}
296+
297+
obj, err := store.Get("ns", "name")
298+
if tc.initial != nil {
299+
assert.True(t, apiequality.Semantic.DeepEqual(tc.initial, obj))
300+
} else {
301+
assert.True(t, apierrors.IsNotFound(err))
302+
}
303+
304+
// Reflector should already be stopped for immutable secrets.
305+
assert.Equal(t, tc.initial == nil || !isSecretImmutable(tc.initial), reflectorRunning())
306+
307+
if tc.eventual == nil {
308+
return
309+
}
310+
fakeWatch.Add(tc.eventual)
311+
312+
// Eventually Get should return that secret.
313+
getFn := func() (bool, error) {
314+
object, err := store.Get("ns", "name")
315+
if err != nil {
316+
if apierrors.IsNotFound(err) {
317+
return false, nil
318+
}
319+
return false, err
320+
}
321+
secret := object.(*v1.Secret)
322+
return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil
323+
}
324+
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
325+
t.Errorf("unexpected error: %v", err)
326+
}
327+
328+
// Reflector should already be stopped for immutable secrets.
329+
assert.Equal(t, tc.eventual == nil || !isSecretImmutable(tc.eventual), reflectorRunning())
330+
})
331+
}
332+
}

0 commit comments

Comments
 (0)