Skip to content

Commit 56e7284

Browse files
committed
Implement etcd3 progress-notify feature in etcd3 layer
1 parent fbd65a2 commit 56e7284

File tree

7 files changed

+112
-17
lines changed

7 files changed

+112
-17
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ import (
2323
)
2424

2525
type event struct {
26-
key string
27-
value []byte
28-
prevValue []byte
29-
rev int64
30-
isDeleted bool
31-
isCreated bool
26+
key string
27+
value []byte
28+
prevValue []byte
29+
rev int64
30+
isDeleted bool
31+
isCreated bool
32+
isProgressNotify bool
3233
}
3334

3435
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
@@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) {
6162
}
6263
return ret, nil
6364
}
65+
66+
func progressNotifyEvent(rev int64) *event {
67+
return &event{
68+
rev: rev,
69+
isProgressNotify: true,
70+
}
71+
}

staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ var (
6161
},
6262
[]string{"endpoint"},
6363
)
64+
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
65+
&compbasemetrics.GaugeOpts{
66+
Name: "etcd_bookmark_counts",
67+
Help: "Number of etcd bookmarks (progress notify events) split by kind.",
68+
StabilityLevel: compbasemetrics.ALPHA,
69+
},
70+
[]string{"resource"},
71+
)
6472
)
6573

6674
var registerMetrics sync.Once
@@ -72,6 +80,7 @@ func Register() {
7280
legacyregistry.MustRegister(etcdRequestLatency)
7381
legacyregistry.MustRegister(objectCounts)
7482
legacyregistry.MustRegister(dbTotalSize)
83+
legacyregistry.MustRegister(etcdBookmarkCounts)
7584
})
7685
}
7786

@@ -85,6 +94,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
8594
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
8695
}
8796

97+
// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
98+
func RecordEtcdBookmark(resource string) {
99+
etcdBookmarkCounts.WithLabelValues(resource).Inc()
100+
}
101+
88102
// Reset resets the etcd_request_duration_seconds metric.
89103
func Reset() {
90104
etcdRequestLatency.Reset()

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object,
8787
return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
8888
}
8989

90-
func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
90+
func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
9191
versioner := APIObjectVersioner{}
9292
result := &store{
9393
client: c,
@@ -99,7 +99,7 @@ func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, c
9999
// no-op for default prefix of '/registry'.
100100
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
101101
pathPrefix: path.Join("/", prefix),
102-
watcher: newWatcher(c, codec, versioner, transformer),
102+
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
103103
leaseManager: newDefaultLeaseManager(c),
104104
}
105105
return result
@@ -776,7 +776,7 @@ func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions,
776776
return nil, err
777777
}
778778
key = path.Join(s.pathPrefix, key)
779-
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate)
779+
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate)
780780
}
781781

782782
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2077,5 +2077,4 @@ func TestConsistentList(t *testing.T) {
20772077
if !reflect.DeepEqual(result3, result4) {
20782078
t.Errorf("inconsistent lists: %#v, %#v", result3, result4)
20792079
}
2080-
20812080
}

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"os"
24+
"reflect"
2425
"strconv"
2526
"strings"
2627
"sync"
@@ -29,6 +30,7 @@ import (
2930
"k8s.io/apimachinery/pkg/runtime"
3031
"k8s.io/apimachinery/pkg/watch"
3132
"k8s.io/apiserver/pkg/storage"
33+
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
3234
"k8s.io/apiserver/pkg/storage/value"
3335

3436
"go.etcd.io/etcd/clientv3"
@@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) {
6870
type watcher struct {
6971
client *clientv3.Client
7072
codec runtime.Codec
73+
newFunc func() runtime.Object
74+
objectType string
7175
versioner storage.Versioner
7276
transformer value.Transformer
7377
}
@@ -78,6 +82,7 @@ type watchChan struct {
7882
key string
7983
initialRev int64
8084
recursive bool
85+
progressNotify bool
8186
internalPred storage.SelectionPredicate
8287
ctx context.Context
8388
cancel context.CancelFunc
@@ -86,13 +91,20 @@ type watchChan struct {
8691
errChan chan error
8792
}
8893

89-
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
90-
return &watcher{
94+
func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
95+
res := &watcher{
9196
client: client,
9297
codec: codec,
98+
newFunc: newFunc,
9399
versioner: versioner,
94100
transformer: transformer,
95101
}
102+
if newFunc == nil {
103+
res.objectType = "<unknown>"
104+
} else {
105+
res.objectType = reflect.TypeOf(newFunc()).String()
106+
}
107+
return res
96108
}
97109

98110
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
102114
// If recursive is false, it watches on given key.
103115
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
104116
// pred must be non-nil. Only if pred matches the change, it will be returned.
105-
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
117+
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
106118
if recursive && !strings.HasSuffix(key, "/") {
107119
key += "/"
108120
}
109-
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
121+
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
110122
go wc.run()
111123
return wc, nil
112124
}
113125

114-
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
126+
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
115127
wc := &watchChan{
116128
watcher: w,
117129
key: key,
118130
initialRev: rev,
119131
recursive: recursive,
132+
progressNotify: progressNotify,
120133
internalPred: pred,
121134
incomingEventChan: make(chan *event, incomingBufSize),
122135
resultChan: make(chan watch.Event, outgoingBufSize),
@@ -223,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
223236
if wc.recursive {
224237
opts = append(opts, clientv3.WithPrefix())
225238
}
239+
if wc.progressNotify {
240+
opts = append(opts, clientv3.WithProgressNotify())
241+
}
226242
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
227243
for wres := range wch {
228244
if wres.Err() != nil {
@@ -232,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
232248
wc.sendError(err)
233249
return
234250
}
251+
if wres.IsProgressNotify() {
252+
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
253+
metrics.RecordEtcdBookmark(wc.watcher.objectType)
254+
continue
255+
}
256+
235257
for _, e := range wres.Events {
236258
parsedEvent, err := parseEvent(e)
237259
if err != nil {
@@ -299,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
299321
}
300322

301323
switch {
324+
case e.isProgressNotify:
325+
if wc.watcher.newFunc == nil {
326+
return nil
327+
}
328+
object := wc.watcher.newFunc()
329+
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
330+
klog.Errorf("failed to propagate object version: %v", err)
331+
return nil
332+
}
333+
res = &watch.Event{
334+
Type: watch.Bookmark,
335+
Object: object,
336+
}
302337
case e.isDeleted:
303338
if !wc.filter(oldObj) {
304339
return nil
@@ -376,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) {
376411
}
377412

378413
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
414+
if e.isProgressNotify {
415+
// progressNotify events doesn't contain neither current nor previous object version,
416+
return nil, nil, nil
417+
}
418+
379419
if !e.isDeleted {
380420
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
381421
if err != nil {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func TestWatchContextCancel(t *testing.T) {
246246
cancel()
247247
// When we watch with a canceled context, we should detect that it's context canceled.
248248
// We won't take it as error and also close the watcher.
249-
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
249+
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything)
250250
if err != nil {
251251
t.Fatal(err)
252252
}
@@ -265,7 +265,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
265265
origCtx, store, cluster := testSetup(t)
266266
defer cluster.Terminate(t)
267267
ctx, cancel := context.WithCancel(origCtx)
268-
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
268+
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
269269
// make resutlChan and errChan blocking to ensure ordering.
270270
w.resultChan = make(chan watch.Event)
271271
w.errChan = make(chan error)
@@ -314,6 +314,37 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
314314
}
315315
}
316316

317+
func TestProgressNotify(t *testing.T) {
318+
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
319+
clusterConfig := &integration.ClusterConfig{
320+
Size: 1,
321+
WatchProgressNotifyInterval: time.Second,
322+
}
323+
cluster := integration.NewClusterV3(t, clusterConfig)
324+
defer cluster.Terminate(t)
325+
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
326+
ctx := context.Background()
327+
328+
key := "/somekey"
329+
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}
330+
out := &example.Pod{}
331+
if err := store.Create(ctx, key, input, out, 0); err != nil {
332+
t.Fatalf("Create failed: %v", err)
333+
}
334+
335+
opts := storage.ListOptions{
336+
ResourceVersion: out.ResourceVersion,
337+
Predicate: storage.Everything,
338+
ProgressNotify: true,
339+
}
340+
w, err := store.Watch(ctx, key, opts)
341+
if err != nil {
342+
t.Fatalf("Watch failed: %v", err)
343+
}
344+
result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}}
345+
testCheckResult(t, 0, watch.Bookmark, w, result)
346+
}
347+
317348
type testWatchStruct struct {
318349
obj *example.Pod
319350
expectEvent bool

staging/src/k8s.io/apiserver/pkg/storage/interfaces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,4 +269,7 @@ type ListOptions struct {
269269
ResourceVersionMatch metav1.ResourceVersionMatch
270270
// Predicate provides the selection rules for the list operation.
271271
Predicate SelectionPredicate
272+
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
273+
// be delivered to the users. The option is ignored for non-watch requests.
274+
ProgressNotify bool
272275
}

0 commit comments

Comments
 (0)