Skip to content

Commit 2173a0f

Browse files
committed
Add watch cache capacity upper bound adjusting logic
1 parent fa03b93 commit 2173a0f

File tree

2 files changed

+68
-1
lines changed

2 files changed

+68
-1
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func newWatchCache(
169169
getAttrsFunc: getAttrsFunc,
170170
cache: make([]*watchCacheEvent, defaultLowerBoundCapacity),
171171
lowerBoundCapacity: defaultLowerBoundCapacity,
172-
upperBoundCapacity: defaultUpperBoundCapacity,
172+
upperBoundCapacity: capacityUpperBound(eventFreshDuration),
173173
startIndex: 0,
174174
endIndex: 0,
175175
store: newStoreIndexer(indexers),
@@ -189,6 +189,30 @@ func newWatchCache(
189189
return wc
190190
}
191191

192+
// capacityUpperBound denotes the maximum possible capacity of the watch cache
193+
// to which it can resize.
194+
func capacityUpperBound(eventFreshDuration time.Duration) int {
195+
if eventFreshDuration <= DefaultEventFreshDuration {
196+
return defaultUpperBoundCapacity
197+
}
198+
// eventFreshDuration determines how long the watch events are supposed
199+
// to be stored in the watch cache.
200+
// In very high churn situations, there is a need to store more events
201+
// in the watch cache, hence it would have to be upsized accordingly.
202+
// Because of that, for larger values of eventFreshDuration, we set the
203+
// upper bound of the watch cache's capacity proportionally to the ratio
204+
// between eventFreshDuration and DefaultEventFreshDuration.
205+
// Given that the watch cache size can only double, we round up that
206+
// proportion to the next power of two.
207+
exponent := int(math.Ceil((math.Log2(eventFreshDuration.Seconds() / DefaultEventFreshDuration.Seconds()))))
208+
if maxExponent := int(math.Floor((math.Log2(math.MaxInt32 / defaultUpperBoundCapacity)))); exponent > maxExponent {
209+
// Making sure that the capacity's upper bound fits in a 32-bit integer.
210+
exponent = maxExponent
211+
klog.Warningf("Capping watch cache capacity upper bound to %v", defaultUpperBoundCapacity<<exponent)
212+
}
213+
return defaultUpperBoundCapacity << exponent
214+
}
215+
192216
// Add takes runtime.Object as an argument.
193217
func (w *watchCache) Add(obj interface{}) error {
194218
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,49 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
11631163
}
11641164
}
11651165

1166+
func TestCapacityUpperBound(t *testing.T) {
1167+
testCases := []struct {
1168+
name string
1169+
eventFreshDuration time.Duration
1170+
expected int
1171+
}{
1172+
{
1173+
name: "default eventFreshDuration",
1174+
eventFreshDuration: DefaultEventFreshDuration, // 75s
1175+
expected: defaultUpperBoundCapacity, // 100 * 1024
1176+
},
1177+
{
1178+
name: "lower eventFreshDuration, capacity limit unchanged",
1179+
eventFreshDuration: 45 * time.Second, // 45s
1180+
expected: defaultUpperBoundCapacity, // 100 * 1024
1181+
},
1182+
{
1183+
name: "higher eventFreshDuration, capacity limit scaled up",
1184+
eventFreshDuration: 4 * DefaultEventFreshDuration, // 4 * 75s
1185+
expected: 4 * defaultUpperBoundCapacity, // 4 * 100 * 1024
1186+
},
1187+
{
1188+
name: "higher eventFreshDuration, capacity limit scaled and rounded up",
1189+
eventFreshDuration: 3 * DefaultEventFreshDuration, // 3 * 75s
1190+
expected: 4 * defaultUpperBoundCapacity, // 4 * 100 * 1024
1191+
},
1192+
{
1193+
name: "higher eventFreshDuration, capacity limit scaled up and capped",
1194+
eventFreshDuration: DefaultEventFreshDuration << 20, // 2^20 * 75s
1195+
expected: defaultUpperBoundCapacity << 14, // 2^14 * 100 * 1024
1196+
},
1197+
}
1198+
1199+
for _, test := range testCases {
1200+
t.Run(test.name, func(t *testing.T) {
1201+
capacity := capacityUpperBound(test.eventFreshDuration)
1202+
if test.expected != capacity {
1203+
t.Errorf("expected %v, got %v", test.expected, capacity)
1204+
}
1205+
})
1206+
}
1207+
}
1208+
11661209
func BenchmarkWatchCache_updateCache(b *testing.B) {
11671210
store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{})
11681211
defer store.Stop()

0 commit comments

Comments
 (0)