Skip to content

Commit c19d9ed

Browse files
authored
Merge pull request kubernetes#126329 from serathius/concurrent-transformation-chan-of-chan
[chan of chan] Make object transformation concurrent to remove watch cache scalability issue for conversion webhook
2 parents eb729d1 + bb686f2 commit c19d9ed

File tree

4 files changed

+119
-8
lines changed

4 files changed

+119
-8
lines changed

pkg/features/kube_features.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,6 +1264,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
12641264

12651265
genericfeatures.AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha},
12661266

1267+
genericfeatures.ConcurrentWatchObjectDecode: {Default: false, PreRelease: featuregate.Beta},
1268+
12671269
genericfeatures.ConsistentListFromCache: {Default: true, PreRelease: featuregate.Beta},
12681270

12691271
genericfeatures.CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha},

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ const (
101101
// Allows authorization to use field and label selectors.
102102
AuthorizeWithSelectors featuregate.Feature = "AuthorizeWithSelectors"
103103

104+
// owner: @serathius
105+
// beta: v1.31
106+
// Enables concurrent watch object decoding to avoid starving watch cache when conversion webhook is installed.
107+
ConcurrentWatchObjectDecode featuregate.Feature = "ConcurrentWatchObjectDecode"
108+
104109
// owner: @cici37 @jpbetz
105110
// kep: http://kep.k8s.io/3488
106111
// alpha: v1.26
@@ -365,6 +370,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
365370

366371
AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha},
367372

373+
ConcurrentWatchObjectDecode: {Default: false, PreRelease: featuregate.Beta},
374+
368375
ValidatingAdmissionPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
369376

370377
CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha},

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

Lines changed: 104 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ import (
4646

4747
const (
4848
// We have set a buffer in order to reduce times of context switches.
49-
incomingBufSize = 100
50-
outgoingBufSize = 100
49+
incomingBufSize = 100
50+
outgoingBufSize = 100
51+
processEventConcurrency = 10
5152
)
5253

5354
// defaultWatcherMaxLimit is used to facilitate construction tests
@@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
230231
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
231232

232233
var resultChanWG sync.WaitGroup
233-
resultChanWG.Add(1)
234-
go wc.processEvent(&resultChanWG)
234+
wc.processEvents(&resultChanWG)
235235

236236
select {
237237
case err := <-wc.errChan:
@@ -424,18 +424,25 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
424424
close(watchClosedCh)
425425
}
426426

427-
// processEvent processes events from etcd watcher and sends results to resultChan.
428-
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
427+
// processEvents processes events from etcd watcher and sends results to resultChan.
428+
func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
429+
if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
430+
wc.concurrentProcessEvents(wg)
431+
} else {
432+
wg.Add(1)
433+
go wc.serialProcessEvents(wg)
434+
}
435+
}
436+
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
429437
defer wg.Done()
430-
431438
for {
432439
select {
433440
case e := <-wc.incomingEventChan:
434441
res := wc.transform(e)
435442
if res == nil {
436443
continue
437444
}
438-
if len(wc.resultChan) == outgoingBufSize {
445+
if len(wc.resultChan) == cap(wc.resultChan) {
439446
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
440447
}
441448
// If user couldn't receive results fast enough, we also block incoming events from watcher.
@@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
452459
}
453460
}
454461

462+
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
463+
p := concurrentOrderedEventProcessing{
464+
input: wc.incomingEventChan,
465+
processFunc: wc.transform,
466+
output: wc.resultChan,
467+
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
468+
469+
objectType: wc.watcher.objectType,
470+
groupResource: wc.watcher.groupResource,
471+
}
472+
wg.Add(1)
473+
go func() {
474+
defer wg.Done()
475+
p.scheduleEventProcessing(wc.ctx, wg)
476+
}()
477+
wg.Add(1)
478+
go func() {
479+
defer wg.Done()
480+
p.collectEventProcessing(wc.ctx)
481+
}()
482+
}
483+
484+
type concurrentOrderedEventProcessing struct {
485+
input chan *event
486+
processFunc func(*event) *watch.Event
487+
output chan watch.Event
488+
489+
processingQueue chan chan *watch.Event
490+
// Metadata for logging
491+
objectType string
492+
groupResource schema.GroupResource
493+
}
494+
495+
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
496+
var e *event
497+
for {
498+
select {
499+
case <-ctx.Done():
500+
return
501+
case e = <-p.input:
502+
}
503+
processingResponse := make(chan *watch.Event, 1)
504+
select {
505+
case <-ctx.Done():
506+
return
507+
case p.processingQueue <- processingResponse:
508+
}
509+
wg.Add(1)
510+
go func(e *event, response chan<- *watch.Event) {
511+
defer wg.Done()
512+
select {
513+
case <-ctx.Done():
514+
case response <- p.processFunc(e):
515+
}
516+
}(e, processingResponse)
517+
}
518+
}
519+
520+
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
521+
var processingResponse chan *watch.Event
522+
var e *watch.Event
523+
for {
524+
select {
525+
case <-ctx.Done():
526+
return
527+
case processingResponse = <-p.processingQueue:
528+
}
529+
select {
530+
case <-ctx.Done():
531+
return
532+
case e = <-processingResponse:
533+
}
534+
if e == nil {
535+
continue
536+
}
537+
if len(p.output) == cap(p.output) {
538+
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
539+
}
540+
// If user couldn't receive results fast enough, we also block incoming events from watcher.
541+
// Because storing events in local will cause more memory usage.
542+
// The worst case would be closing the fast watcher.
543+
select {
544+
case <-ctx.Done():
545+
return
546+
case p.output <- *e:
547+
}
548+
}
549+
}
550+
455551
func (wc *watchChan) filter(obj runtime.Object) bool {
456552
if wc.internalPred.Empty() {
457553
return true

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ func TestEtcdWatchSemantics(t *testing.T) {
133133
storagetesting.RunWatchSemantics(ctx, t, store)
134134
}
135135

136+
func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) {
137+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
138+
ctx, store, _ := testSetup(t)
139+
storagetesting.RunWatchSemantics(ctx, t, store)
140+
}
141+
136142
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
137143
ctx, store, _ := testSetup(t)
138144
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)

0 commit comments

Comments
 (0)