Skip to content

Commit b11b7d3

Browse files
committed
WatchBasedManager stops watching immutable objects
1 parent 1bb68a2 commit b11b7d3

File tree

6 files changed

+226
-15
lines changed

6 files changed

+226
-15
lines changed

pkg/kubelet/configmap/configmap_manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,14 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
144144
newConfigMap := func() runtime.Object {
145145
return &v1.ConfigMap{}
146146
}
147+
isImmutable := func(object runtime.Object) bool {
148+
if configMap, ok := object.(*v1.ConfigMap); ok {
149+
return configMap.Immutable != nil && *configMap.Immutable
150+
}
151+
return false
152+
}
147153
gr := corev1.Resource("configmap")
148154
return &configMapManager{
149-
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, gr, getConfigMapNames),
155+
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames),
150156
}
151157
}

pkg/kubelet/secret/secret_manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,14 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
145145
newSecret := func() runtime.Object {
146146
return &v1.Secret{}
147147
}
148+
isImmutable := func(object runtime.Object) bool {
149+
if secret, ok := object.(*v1.Secret); ok {
150+
return secret.Immutable != nil && *secret.Immutable
151+
}
152+
return false
153+
}
148154
gr := corev1.Resource("secret")
149155
return &secretManager{
150-
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, gr, getSecretNames),
156+
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
151157
}
152158
}

pkg/kubelet/util/manager/BUILD

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
1111
visibility = ["//visibility:public"],
1212
deps = [
13+
"//pkg/features:go_default_library",
1314
"//pkg/kubelet/util:go_default_library",
1415
"//staging/src/k8s.io/api/core/v1:go_default_library",
1516
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@@ -22,7 +23,9 @@ go_library(
2223
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
2324
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
2425
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
26+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
2527
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
28+
"//vendor/k8s.io/klog:go_default_library",
2629
],
2730
)
2831

@@ -36,17 +39,21 @@ go_test(
3639
deps = [
3740
"//pkg/api/v1/pod:go_default_library",
3841
"//pkg/apis/core/v1:go_default_library",
42+
"//pkg/features:go_default_library",
3943
"//staging/src/k8s.io/api/core/v1:go_default_library",
44+
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
4045
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
4146
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
4247
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
4348
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
4449
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
4550
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
4651
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
52+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
4753
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
4854
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
4955
"//staging/src/k8s.io/client-go/testing:go_default_library",
56+
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
5057
"//vendor/github.com/stretchr/testify/assert:go_default_library",
5158
],
5259
)

pkg/kubelet/util/manager/watch_based_manager.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,6 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
// TODO: We did some scalability tests and using watchBasedManager
18-
// seems to help with apiserver performance at scale visibly.
19-
// No issues we also observed at the scale of ~200k watchers with a
20-
// single apiserver.
21-
// However, we need to perform more extensive testing before we
22-
// enable this in production setups.
23-
2417
package manager
2518

2619
import (
@@ -31,6 +24,8 @@ import (
3124
"k8s.io/api/core/v1"
3225
"k8s.io/client-go/tools/cache"
3326

27+
"k8s.io/klog"
28+
3429
apierrors "k8s.io/apimachinery/pkg/api/errors"
3530
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3631
"k8s.io/apimachinery/pkg/fields"
@@ -39,18 +34,37 @@ import (
3934
"k8s.io/apimachinery/pkg/util/sets"
4035
"k8s.io/apimachinery/pkg/util/wait"
4136
"k8s.io/apimachinery/pkg/watch"
37+
utilfeature "k8s.io/apiserver/pkg/util/feature"
38+
"k8s.io/kubernetes/pkg/features"
4239
)
4340

4441
type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
4542
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
4643
type newObjectFunc func() runtime.Object
44+
type isImmutableFunc func(runtime.Object) bool
4745

4846
// objectCacheItem is a single item stored in objectCache.
4947
type objectCacheItem struct {
5048
refCount int
5149
store cache.Store
5250
hasSynced func() (bool, error)
53-
stopCh chan struct{}
51+
52+
// lock is protecting from closing stopCh multiple times.
53+
lock sync.Mutex
54+
stopCh chan struct{}
55+
}
56+
57+
func (i *objectCacheItem) stop() bool {
58+
i.lock.Lock()
59+
defer i.lock.Unlock()
60+
select {
61+
case <-i.stopCh:
62+
// This means that channel is already closed.
63+
return false
64+
default:
65+
close(i.stopCh)
66+
return true
67+
}
5468
}
5569

5670
// objectCache is a local cache of objects propagated via
@@ -59,18 +73,25 @@ type objectCache struct {
5973
listObject listObjectFunc
6074
watchObject watchObjectFunc
6175
newObject newObjectFunc
76+
isImmutable isImmutableFunc
6277
groupResource schema.GroupResource
6378

6479
lock sync.RWMutex
6580
items map[objectKey]*objectCacheItem
6681
}
6782

6883
// NewObjectCache returns a new watch-based instance of Store interface.
69-
func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
84+
func NewObjectCache(
85+
listObject listObjectFunc,
86+
watchObject watchObjectFunc,
87+
newObject newObjectFunc,
88+
isImmutable isImmutableFunc,
89+
groupResource schema.GroupResource) Store {
7090
return &objectCache{
7191
listObject: listObject,
7292
watchObject: watchObject,
7393
newObject: newObject,
94+
isImmutable: isImmutable,
7495
groupResource: groupResource,
7596
items: make(map[objectKey]*objectCacheItem),
7697
}
@@ -140,7 +161,7 @@ func (c *objectCache) DeleteReference(namespace, name string) {
140161
item.refCount--
141162
if item.refCount == 0 {
142163
// Stop the underlying reflector.
143-
close(item.stopCh)
164+
item.stop()
144165
delete(c.items, key)
145166
}
146167
}
@@ -177,6 +198,21 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
177198
return nil, apierrors.NewNotFound(c.groupResource, name)
178199
}
179200
if object, ok := obj.(runtime.Object); ok {
201+
// If the returned object is immutable, stop the reflector.
202+
//
203+
// NOTE: we may potentially not even start the reflector if the object is
204+
// already immutable. However, given that:
205+
// - we want to also handle the case when object is marked as immutable later
206+
// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
207+
// - doing that wouldn't provide visible scalability/performance gain - we
208+
// already have it from here
209+
// - doing that would require significant refactoring to reflector
210+
// we limit ourselves to just quickly stop the reflector here.
211+
if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) {
212+
if item.stop() {
213+
klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
214+
}
215+
}
180216
return object, nil
181217
}
182218
return nil, fmt.Errorf("unexpected object type: %v", obj)
@@ -188,7 +224,13 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
188224
// - whenever a pod is created or updated, we start individual watches for all
189225
// referenced objects that aren't referenced from other registered pods
190226
// - every GetObject() returns a value from local cache propagated via watches
191-
func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
192-
objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
227+
func NewWatchBasedManager(
228+
listObject listObjectFunc,
229+
watchObject watchObjectFunc,
230+
newObject newObjectFunc,
231+
isImmutable isImmutableFunc,
232+
groupResource schema.GroupResource,
233+
getReferencedObjects func(*v1.Pod) sets.String) Manager {
234+
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
193235
return NewCacheBasedManager(objectStore, getReferencedObjects)
194236
}

pkg/kubelet/util/manager/watch_based_manager_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@ import (
2323
"time"
2424

2525
"k8s.io/api/core/v1"
26+
apiequality "k8s.io/apimachinery/pkg/api/equality"
2627
apierrors "k8s.io/apimachinery/pkg/api/errors"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/runtime"
2930
"k8s.io/apimachinery/pkg/util/wait"
3031
"k8s.io/apimachinery/pkg/watch"
3132

33+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3234
clientset "k8s.io/client-go/kubernetes"
3335
"k8s.io/client-go/kubernetes/fake"
3436
core "k8s.io/client-go/testing"
37+
featuregatetesting "k8s.io/component-base/featuregate/testing"
3538

3639
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
40+
"k8s.io/kubernetes/pkg/features"
3741

3842
"github.com/stretchr/testify/assert"
3943
)
@@ -50,11 +54,19 @@ func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
5054
}
5155
}
5256

57+
func isSecretImmutable(object runtime.Object) bool {
58+
if secret, ok := object.(*v1.Secret); ok {
59+
return secret.Immutable != nil && *secret.Immutable
60+
}
61+
return false
62+
}
63+
5364
func newSecretCache(fakeClient clientset.Interface) *objectCache {
5465
return &objectCache{
5566
listObject: listSecret(fakeClient),
5667
watchObject: watchSecret(fakeClient),
5768
newObject: func() runtime.Object { return &v1.Secret{} },
69+
isImmutable: isSecretImmutable,
5870
groupResource: corev1.Resource("secret"),
5971
items: make(map[objectKey]*objectCacheItem),
6072
}
@@ -182,3 +194,138 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
182194
actions = fakeClient.Actions()
183195
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
184196
}
197+
198+
func TestImmutableSecretStopsTheReflector(t *testing.T) {
199+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ImmutableEphemeralVolumes, true)()
200+
201+
secret := func(rv string, immutable bool) *v1.Secret {
202+
result := &v1.Secret{
203+
ObjectMeta: metav1.ObjectMeta{
204+
Name: "name",
205+
Namespace: "ns",
206+
ResourceVersion: rv,
207+
},
208+
}
209+
if immutable {
210+
trueVal := true
211+
result.Immutable = &trueVal
212+
}
213+
return result
214+
}
215+
216+
tests := []struct {
217+
desc string
218+
initial *v1.Secret
219+
eventual *v1.Secret
220+
}{
221+
{
222+
desc: "secret doesn't exist, created as mutable",
223+
initial: nil,
224+
eventual: secret("200", false),
225+
},
226+
{
227+
desc: "secret doesn't exist, created as immutable",
228+
initial: nil,
229+
eventual: secret("200", true),
230+
},
231+
{
232+
desc: "mutable secret modified to mutable",
233+
initial: secret("100", false),
234+
eventual: secret("200", false),
235+
},
236+
{
237+
desc: "mutable secret modified to immutable",
238+
initial: secret("100", false),
239+
eventual: secret("200", true),
240+
},
241+
{
242+
desc: "immutable secret",
243+
initial: secret("100", true),
244+
eventual: nil,
245+
},
246+
}
247+
248+
for _, tc := range tests {
249+
t.Run(tc.desc, func(t *testing.T) {
250+
fakeClient := &fake.Clientset{}
251+
listReactor := func(a core.Action) (bool, runtime.Object, error) {
252+
result := &v1.SecretList{
253+
ListMeta: metav1.ListMeta{
254+
ResourceVersion: "100",
255+
},
256+
}
257+
if tc.initial != nil {
258+
result.Items = []v1.Secret{*tc.initial}
259+
}
260+
return true, result, nil
261+
}
262+
fakeClient.AddReactor("list", "secrets", listReactor)
263+
fakeWatch := watch.NewFake()
264+
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
265+
266+
store := newSecretCache(fakeClient)
267+
268+
key := objectKey{namespace: "ns", name: "name"}
269+
itemExists := func() (bool, error) {
270+
store.lock.Lock()
271+
defer store.lock.Unlock()
272+
_, ok := store.items[key]
273+
return ok, nil
274+
}
275+
reflectorRunning := func() bool {
276+
store.lock.Lock()
277+
defer store.lock.Unlock()
278+
item := store.items[key]
279+
280+
item.lock.Lock()
281+
defer item.lock.Unlock()
282+
select {
283+
case <-item.stopCh:
284+
return false
285+
default:
286+
return true
287+
}
288+
}
289+
290+
// AddReference should start reflector.
291+
store.AddReference("ns", "name")
292+
if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
293+
t.Errorf("item wasn't added to cache")
294+
}
295+
296+
obj, err := store.Get("ns", "name")
297+
if tc.initial != nil {
298+
assert.True(t, apiequality.Semantic.DeepEqual(tc.initial, obj))
299+
} else {
300+
assert.True(t, apierrors.IsNotFound(err))
301+
}
302+
303+
// Reflector should already be stopped for immutable secrets.
304+
assert.Equal(t, tc.initial == nil || !isSecretImmutable(tc.initial), reflectorRunning())
305+
306+
if tc.eventual == nil {
307+
return
308+
}
309+
fakeWatch.Add(tc.eventual)
310+
311+
// Eventually Get should return that secret.
312+
getFn := func() (bool, error) {
313+
object, err := store.Get("ns", "name")
314+
if err != nil {
315+
if apierrors.IsNotFound(err) {
316+
return false, nil
317+
}
318+
return false, err
319+
}
320+
secret := object.(*v1.Secret)
321+
return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil
322+
}
323+
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
324+
t.Errorf("unexpected error: %v", err)
325+
}
326+
327+
// Reflector should already be stopped for immutable secrets.
328+
assert.Equal(t, tc.eventual == nil || !isSecretImmutable(tc.eventual), reflectorRunning())
329+
})
330+
}
331+
}

0 commit comments

Comments
 (0)