Skip to content

Commit 3d9fcb7

Browse files
authored
Merge pull request kubernetes#130412 from serathius/watchcache-progress
Move watch progress to separate package.
2 parents 6ff0354 + 740db0f commit 3d9fcb7

File tree

6 files changed

+19
-23
lines changed

6 files changed

+19
-23
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"k8s.io/apiserver/pkg/features"
4444
"k8s.io/apiserver/pkg/storage"
4545
"k8s.io/apiserver/pkg/storage/cacher/metrics"
46+
"k8s.io/apiserver/pkg/storage/cacher/progress"
4647
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
4748
utilfeature "k8s.io/apiserver/pkg/util/feature"
4849
"k8s.io/client-go/tools/cache"
@@ -420,7 +421,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
420421
return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
421422
}
422423

423-
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
424+
progressRequester := progress.NewConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
424425
watchCache := newWatchCache(
425426
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
426427
config.Clock, eventFreshDuration, config.GroupResource, progressRequester)

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2815,13 +2815,6 @@ func TestWatchStreamSeparation(t *testing.T) {
28152815
expectBookmarkOnEtcd: true,
28162816
expectBookmarkOnWatchCache: true,
28172817
},
2818-
{
2819-
name: "common RPC & watch cache context > both get bookmarks",
2820-
separateCacheWatchRPC: false,
2821-
useWatchCacheContextMetadata: true,
2822-
expectBookmarkOnEtcd: true,
2823-
expectBookmarkOnWatchCache: true,
2824-
},
28252818
{
28262819
name: "separate RPC > only etcd gets bookmarks",
28272820
separateCacheWatchRPC: true,
@@ -2877,7 +2870,7 @@ func TestWatchStreamSeparation(t *testing.T) {
28772870

28782871
var contextMetadata metadata.MD
28792872
if tc.useWatchCacheContextMetadata {
2880-
contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
2873+
contextMetadata = metadata.New(map[string]string{"source": "cache"})
28812874
}
28822875
// For the first 100ms from watch creation, watch progress requests are ignored.
28832876
time.Sleep(200 * time.Millisecond)

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go renamed to staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package cacher
17+
package progress
1818

1919
import (
2020
"context"
@@ -36,8 +36,8 @@ const (
3636
progressRequestPeriod = 100 * time.Millisecond
3737
)
3838

39-
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
40-
pr := &conditionalProgressRequester{
39+
func NewConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *ConditionalProgressRequester {
40+
pr := &ConditionalProgressRequester{
4141
clock: clock,
4242
requestWatchProgress: requestWatchProgress,
4343
contextMetadata: contextMetadata,
@@ -52,9 +52,9 @@ type TickerFactory interface {
5252
NewTimer(time.Duration) clock.Timer
5353
}
5454

55-
// conditionalProgressRequester will request progress notification if there
55+
// ConditionalProgressRequester will request progress notification if there
5656
// is a request waiting for watch cache to be fresh.
57-
type conditionalProgressRequester struct {
57+
type ConditionalProgressRequester struct {
5858
clock TickerFactory
5959
requestWatchProgress WatchProgressRequester
6060
contextMetadata metadata.MD
@@ -65,7 +65,7 @@ type conditionalProgressRequester struct {
6565
stopped bool
6666
}
6767

68-
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
68+
func (pr *ConditionalProgressRequester) Run(stopCh <-chan struct{}) {
6969
ctx := wait.ContextForChannel(stopCh)
7070
if pr.contextMetadata != nil {
7171
ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
@@ -115,14 +115,14 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
115115
}
116116
}
117117

118-
func (pr *conditionalProgressRequester) Add() {
118+
func (pr *ConditionalProgressRequester) Add() {
119119
pr.mux.Lock()
120120
defer pr.mux.Unlock()
121121
pr.waiting += 1
122122
pr.cond.Signal()
123123
}
124124

125-
func (pr *conditionalProgressRequester) Remove() {
125+
func (pr *ConditionalProgressRequester) Remove() {
126126
pr.mux.Lock()
127127
defer pr.mux.Unlock()
128128
pr.waiting -= 1

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go renamed to staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package cacher
17+
package progress
1818

1919
import (
2020
"context"
@@ -115,12 +115,12 @@ func TestConditionalProgressRequester(t *testing.T) {
115115

116116
func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
117117
pr := &testConditionalProgressRequester{}
118-
pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
118+
pr.ConditionalProgressRequester = NewConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
119119
return pr
120120
}
121121

122122
type testConditionalProgressRequester struct {
123-
*conditionalProgressRequester
123+
*ConditionalProgressRequester
124124
progressRequestsSentCount atomic.Int32
125125
}
126126

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"k8s.io/apiserver/pkg/features"
3434
"k8s.io/apiserver/pkg/storage"
3535
"k8s.io/apiserver/pkg/storage/cacher/metrics"
36+
"k8s.io/apiserver/pkg/storage/cacher/progress"
3637
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
3738
utilfeature "k8s.io/apiserver/pkg/util/feature"
3839
"k8s.io/client-go/tools/cache"
@@ -150,7 +151,7 @@ type watchCache struct {
150151

151152
// Requests progress notification if there are requests waiting for watch
152153
// to be fresh
153-
waitingUntilFresh *conditionalProgressRequester
154+
waitingUntilFresh *progress.ConditionalProgressRequester
154155

155156
// Stores previous snapshots of orderedLister to allow serving requests from previous revisions.
156157
snapshots *storeSnapshotter
@@ -165,7 +166,7 @@ func newWatchCache(
165166
clock clock.WithTicker,
166167
eventFreshDuration time.Duration,
167168
groupResource schema.GroupResource,
168-
progressRequester *conditionalProgressRequester) *watchCache {
169+
progressRequester *progress.ConditionalProgressRequester) *watchCache {
169170
wc := &watchCache{
170171
capacity: defaultLowerBoundCapacity,
171172
keyFunc: keyFunc,

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"k8s.io/apiserver/pkg/features"
4141
"k8s.io/apiserver/pkg/storage"
4242
"k8s.io/apiserver/pkg/storage/cacher/metrics"
43+
"k8s.io/apiserver/pkg/storage/cacher/progress"
4344
utilfeature "k8s.io/apiserver/pkg/util/feature"
4445
"k8s.io/client-go/tools/cache"
4546
featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -128,7 +129,7 @@ func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers
128129
wc := &testWatchCache{}
129130
wc.bookmarkRevision = make(chan int64, 1)
130131
wc.stopCh = make(chan struct{})
131-
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
132+
pr := progress.NewConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
132133
go pr.Run(wc.stopCh)
133134
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr)
134135
// To preserve behavior of tests that assume a given capacity,

0 commit comments

Comments
 (0)