Skip to content

Commit 6de748d

Browse files
authored
Merge pull request kubernetes#126968 from serathius/watchache-refactor-store
Watch cache refactor watch cache store and add tests
2 parents eebc897 + c93d2e8 commit 6de748d

File tree

4 files changed

+269
-65
lines changed

4 files changed

+269
-65
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cacher
18+
19+
import (
20+
"fmt"
21+
22+
"k8s.io/apimachinery/pkg/fields"
23+
"k8s.io/apimachinery/pkg/labels"
24+
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/client-go/tools/cache"
26+
)
27+
28+
type storeIndexer interface {
29+
Add(obj interface{}) error
30+
Update(obj interface{}) error
31+
Delete(obj interface{}) error
32+
List() []interface{}
33+
ListKeys() []string
34+
Get(obj interface{}) (item interface{}, exists bool, err error)
35+
GetByKey(key string) (item interface{}, exists bool, err error)
36+
Replace([]interface{}, string) error
37+
ByIndex(indexName, indexedValue string) ([]interface{}, error)
38+
}
39+
40+
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
41+
return cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
42+
}
43+
44+
// Computing a key of an object is generally non-trivial (it performs
45+
// e.g. validation underneath). Similarly computing object fields and
46+
// labels. To avoid computing them multiple times (to serve the event
47+
// in different List/Watch requests), in the underlying store we are
48+
// keeping structs (key, object, labels, fields).
49+
type storeElement struct {
50+
Key string
51+
Object runtime.Object
52+
Labels labels.Set
53+
Fields fields.Set
54+
}
55+
56+
func storeElementKey(obj interface{}) (string, error) {
57+
elem, ok := obj.(*storeElement)
58+
if !ok {
59+
return "", fmt.Errorf("not a storeElement: %v", obj)
60+
}
61+
return elem.Key, nil
62+
}
63+
64+
func storeElementObject(obj interface{}) (runtime.Object, error) {
65+
elem, ok := obj.(*storeElement)
66+
if !ok {
67+
return nil, fmt.Errorf("not a storeElement: %v", obj)
68+
}
69+
return elem.Object, nil
70+
}
71+
72+
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
73+
return func(obj interface{}) (strings []string, e error) {
74+
seo, err := storeElementObject(obj)
75+
if err != nil {
76+
return nil, err
77+
}
78+
return objIndexFunc(seo)
79+
}
80+
}
81+
82+
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
83+
if indexers == nil {
84+
return cache.Indexers{}
85+
}
86+
ret := cache.Indexers{}
87+
for indexName, indexFunc := range *indexers {
88+
ret[indexName] = storeElementIndexFunc(indexFunc)
89+
}
90+
return ret
91+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cacher
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
27+
"k8s.io/client-go/tools/cache"
28+
)
29+
30+
func TestStoreSingleKey(t *testing.T) {
31+
store := newStoreIndexer(nil)
32+
assertStoreEmpty(t, store, "foo")
33+
34+
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
35+
assertStoreSingleKey(t, store, "foo", "bar", 1)
36+
37+
require.NoError(t, store.Update(testStorageElement("foo", "baz", 2)))
38+
assertStoreSingleKey(t, store, "foo", "baz", 2)
39+
40+
require.NoError(t, store.Update(testStorageElement("foo", "baz", 3)))
41+
assertStoreSingleKey(t, store, "foo", "baz", 3)
42+
43+
require.NoError(t, store.Replace([]interface{}{testStorageElement("foo", "bar", 4)}, ""))
44+
assertStoreSingleKey(t, store, "foo", "bar", 4)
45+
46+
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
47+
assertStoreEmpty(t, store, "foo")
48+
49+
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
50+
}
51+
52+
func TestStoreIndexerSingleKey(t *testing.T) {
53+
store := newStoreIndexer(testStoreIndexers())
54+
items, err := store.ByIndex("by_val", "bar")
55+
require.NoError(t, err)
56+
assert.Empty(t, items)
57+
58+
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
59+
items, err = store.ByIndex("by_val", "bar")
60+
require.NoError(t, err)
61+
assert.Equal(t, []interface{}{
62+
testStorageElement("foo", "bar", 1),
63+
}, items)
64+
65+
require.NoError(t, store.Update(testStorageElement("foo", "baz", 2)))
66+
items, err = store.ByIndex("by_val", "bar")
67+
require.NoError(t, err)
68+
assert.Empty(t, items)
69+
items, err = store.ByIndex("by_val", "baz")
70+
require.NoError(t, err)
71+
assert.Equal(t, []interface{}{
72+
testStorageElement("foo", "baz", 2),
73+
}, items)
74+
75+
require.NoError(t, store.Update(testStorageElement("foo", "baz", 3)))
76+
items, err = store.ByIndex("by_val", "bar")
77+
require.NoError(t, err)
78+
assert.Empty(t, items)
79+
items, err = store.ByIndex("by_val", "baz")
80+
require.NoError(t, err)
81+
assert.Equal(t, []interface{}{
82+
testStorageElement("foo", "baz", 3),
83+
}, items)
84+
85+
require.NoError(t, store.Replace([]interface{}{
86+
testStorageElement("foo", "bar", 4),
87+
}, ""))
88+
items, err = store.ByIndex("by_val", "bar")
89+
require.NoError(t, err)
90+
assert.Equal(t, []interface{}{
91+
testStorageElement("foo", "bar", 4),
92+
}, items)
93+
items, err = store.ByIndex("by_val", "baz")
94+
require.NoError(t, err)
95+
assert.Empty(t, items)
96+
97+
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
98+
items, err = store.ByIndex("by_val", "baz")
99+
require.NoError(t, err)
100+
assert.Empty(t, items)
101+
102+
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
103+
}
104+
105+
func assertStoreEmpty(t *testing.T, store storeIndexer, nonExistingKey string) {
106+
item, ok, err := store.Get(testStorageElement(nonExistingKey, "", 0))
107+
require.NoError(t, err)
108+
assert.False(t, ok)
109+
assert.Nil(t, item)
110+
111+
item, ok, err = store.GetByKey(nonExistingKey)
112+
require.NoError(t, err)
113+
assert.False(t, ok)
114+
assert.Nil(t, item)
115+
116+
items := store.List()
117+
assert.Empty(t, items)
118+
}
119+
120+
func assertStoreSingleKey(t *testing.T, store storeIndexer, expectKey, expectValue string, expectRV int) {
121+
item, ok, err := store.Get(testStorageElement(expectKey, "", expectRV))
122+
require.NoError(t, err)
123+
assert.True(t, ok)
124+
assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value)
125+
126+
item, ok, err = store.GetByKey(expectKey)
127+
require.NoError(t, err)
128+
assert.True(t, ok)
129+
assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value)
130+
131+
items := store.List()
132+
assert.Equal(t, []interface{}{testStorageElement(expectKey, expectValue, expectRV)}, items)
133+
}
134+
135+
func testStorageElement(key, value string, rv int) *storeElement {
136+
return &storeElement{Key: key, Object: fakeObj{value: value, rv: rv}}
137+
}
138+
139+
type fakeObj struct {
140+
value string
141+
rv int
142+
}
143+
144+
func (f fakeObj) GetObjectKind() schema.ObjectKind { return nil }
145+
func (f fakeObj) DeepCopyObject() runtime.Object { return nil }
146+
147+
var _ runtime.Object = (*fakeObj)(nil)
148+
149+
func testStoreIndexFunc(obj interface{}) ([]string, error) {
150+
return []string{obj.(fakeObj).value}, nil
151+
}
152+
153+
func testStoreIndexers() *cache.Indexers {
154+
indexers := cache.Indexers{}
155+
indexers["by_val"] = testStoreIndexFunc
156+
return &indexers
157+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 20 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -83,55 +83,6 @@ type watchCacheEvent struct {
8383
RecordTime time.Time
8484
}
8585

86-
// Computing a key of an object is generally non-trivial (it performs
87-
// e.g. validation underneath). Similarly computing object fields and
88-
// labels. To avoid computing them multiple times (to serve the event
89-
// in different List/Watch requests), in the underlying store we are
90-
// keeping structs (key, object, labels, fields).
91-
type storeElement struct {
92-
Key string
93-
Object runtime.Object
94-
Labels labels.Set
95-
Fields fields.Set
96-
}
97-
98-
func storeElementKey(obj interface{}) (string, error) {
99-
elem, ok := obj.(*storeElement)
100-
if !ok {
101-
return "", fmt.Errorf("not a storeElement: %v", obj)
102-
}
103-
return elem.Key, nil
104-
}
105-
106-
func storeElementObject(obj interface{}) (runtime.Object, error) {
107-
elem, ok := obj.(*storeElement)
108-
if !ok {
109-
return nil, fmt.Errorf("not a storeElement: %v", obj)
110-
}
111-
return elem.Object, nil
112-
}
113-
114-
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
115-
return func(obj interface{}) (strings []string, e error) {
116-
seo, err := storeElementObject(obj)
117-
if err != nil {
118-
return nil, err
119-
}
120-
return objIndexFunc(seo)
121-
}
122-
}
123-
124-
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
125-
if indexers == nil {
126-
return cache.Indexers{}
127-
}
128-
ret := cache.Indexers{}
129-
for indexName, indexFunc := range *indexers {
130-
ret[indexName] = storeElementIndexFunc(indexFunc)
131-
}
132-
return ret
133-
}
134-
13586
// watchCache implements a Store interface.
13687
// However, it depends on the elements implementing runtime.Object interface.
13788
//
@@ -173,7 +124,7 @@ type watchCache struct {
173124
// history" i.e. from the moment just after the newest cached watched event.
174125
// It is necessary to effectively allow clients to start watching at now.
175126
// NOTE: We assume that <store> is thread-safe.
176-
store cache.Indexer
127+
store storeIndexer
177128

178129
// ResourceVersion up to which the watchCache is propagated.
179130
resourceVersion uint64
@@ -223,7 +174,7 @@ func newWatchCache(
223174
upperBoundCapacity: defaultUpperBoundCapacity,
224175
startIndex: 0,
225176
endIndex: 0,
226-
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
177+
store: newStoreIndexer(indexers),
227178
resourceVersion: 0,
228179
listResourceVersion: 0,
229180
eventHandler: eventHandler,
@@ -506,19 +457,10 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
506457
if err != nil {
507458
return nil, 0, "", err
508459
}
509-
510-
var result []interface{}
511-
for _, item := range items {
512-
elem, ok := item.(*storeElement)
513-
if !ok {
514-
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
515-
}
516-
if !hasPathPrefix(elem.Key, key) {
517-
continue
518-
}
519-
result = append(result, item)
460+
result, err := filterPrefix(key, items)
461+
if err != nil {
462+
return nil, 0, "", err
520463
}
521-
522464
sort.Sort(sortableStoreElements(result))
523465
return result, rv, index, nil
524466
}
@@ -554,6 +496,21 @@ func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVer
554496
return result, rv, index, err
555497
}
556498

499+
func filterPrefix(prefix string, items []interface{}) ([]interface{}, error) {
500+
var result []interface{}
501+
for _, item := range items {
502+
elem, ok := item.(*storeElement)
503+
if !ok {
504+
return nil, fmt.Errorf("non *storeElement returned from storage: %v", item)
505+
}
506+
if !hasPathPrefix(elem.Key, prefix) {
507+
continue
508+
}
509+
result = append(result, item)
510+
}
511+
return result, nil
512+
}
513+
557514
func (w *watchCache) notFresh(resourceVersion uint64) bool {
558515
w.RLock()
559516
defer w.RUnlock()

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"k8s.io/apimachinery/pkg/labels"
2626
"k8s.io/apimachinery/pkg/runtime"
2727
"k8s.io/apimachinery/pkg/watch"
28-
"k8s.io/client-go/tools/cache"
2928
)
3029

3130
// watchCacheInterval serves as an abstraction over a source
@@ -133,7 +132,7 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
133132
// returned by Next() need to be events from a List() done on the underlying store of
134133
// the watch cache.
135134
// The items returned in the interval will be sorted by Key.
136-
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
135+
func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
137136
buffer := &watchCacheIntervalBuffer{}
138137
var allItems []interface{}
139138

0 commit comments

Comments
 (0)