Skip to content

Commit fbf1a0d

Browse files
authored
apiserver/handlers/watch: encode initialEventsListBlueprint with watchEncoder (kubernetes#127587)
* apiserver/handlers/get: construct versionedList * storage/cacher: document caching the serialization of bookmark events * endpoints/handlers/response: add watchListTransformer * endpoints/handlers/watch: wire watchListTransformer
1 parent 22a30e7 commit fbf1a0d

File tree

5 files changed

+262
-19
lines changed

5 files changed

+262
-19
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,16 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
265265
if timeout == 0 && minRequestTimeout > 0 {
266266
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
267267
}
268+
269+
var emptyVersionedList runtime.Object
270+
if isListWatchRequest(opts) {
271+
emptyVersionedList, err = scope.Convertor.ConvertToVersion(r.NewList(), scope.Kind.GroupVersion())
272+
if err != nil {
273+
scope.err(errors.NewInternalError(err), w, req)
274+
return
275+
}
276+
}
277+
268278
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
269279
ctx, cancel := context.WithTimeout(ctx, timeout)
270280
defer func() { cancel() }()
@@ -273,7 +283,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
273283
scope.err(err, w, req)
274284
return
275285
}
276-
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts))
286+
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts), emptyVersionedList)
277287
if err != nil {
278288
scope.err(err, w, req)
279289
return

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go

Lines changed: 111 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handlers
1818

1919
import (
2020
"context"
21+
"encoding/base64"
2122
"encoding/json"
2223
"fmt"
2324
"io"
@@ -38,8 +39,9 @@ import (
3839
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
3940
"k8s.io/apiserver/pkg/endpoints/metrics"
4041
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
42+
"k8s.io/apiserver/pkg/storage"
4143

42-
klog "k8s.io/klog/v2"
44+
"k8s.io/klog/v2"
4345
)
4446

4547
// watchEmbeddedEncoder performs encoding of the embedded object.
@@ -147,22 +149,25 @@ type watchEncoder struct {
147149
encoder runtime.Encoder
148150
framer io.Writer
149151

152+
watchListTransformerFn watchListTransformerFunction
153+
150154
buffer runtime.Splice
151155
eventBuffer runtime.Splice
152156

153157
currentEmbeddedIdentifier runtime.Identifier
154158
identifiers map[watch.EventType]runtime.Identifier
155159
}
156160

157-
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder {
161+
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer, watchListTransformerFn watchListTransformerFunction) *watchEncoder {
158162
return &watchEncoder{
159-
ctx: ctx,
160-
kind: kind,
161-
embeddedEncoder: embeddedEncoder,
162-
encoder: encoder,
163-
framer: framer,
164-
buffer: runtime.NewSpliceBuffer(),
165-
eventBuffer: runtime.NewSpliceBuffer(),
163+
ctx: ctx,
164+
kind: kind,
165+
embeddedEncoder: embeddedEncoder,
166+
encoder: encoder,
167+
framer: framer,
168+
watchListTransformerFn: watchListTransformerFn,
169+
buffer: runtime.NewSpliceBuffer(),
170+
eventBuffer: runtime.NewSpliceBuffer(),
166171
}
167172
}
168173

@@ -174,6 +179,12 @@ func (e *watchEncoder) Encode(event watch.Event) error {
174179
encodeFunc := func(obj runtime.Object, w io.Writer) error {
175180
return e.doEncode(obj, event, w)
176181
}
182+
if event.Type == watch.Bookmark {
183+
// Bookmark objects are small, and we don't yet support serialization for them.
184+
// Additionally, we need to additionally transform them to support watch-list feature
185+
event = e.watchListTransformerFn(event)
186+
return encodeFunc(event.Object, e.framer)
187+
}
177188
if co, ok := event.Object.(runtime.CacheableObject); ok {
178189
return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
179190
}
@@ -479,3 +490,94 @@ func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.Grou
479490
return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion))
480491
}
481492
}
493+
494+
// watchListTransformerFunction an optional function
495+
// applied to watchlist bookmark events that transforms
496+
// the embedded object before sending it to a client.
497+
type watchListTransformerFunction func(watch.Event) watch.Event
498+
499+
// watchListTransformer performs transformation of
500+
// a special watchList bookmark event.
501+
//
502+
// The bookmark is annotated with InitialEventsListBlueprintAnnotationKey
503+
// and contains an empty, versioned list that we must encode in the requested format
504+
// (e.g., protobuf, JSON, CBOR) and then store as a base64-encoded string.
505+
type watchListTransformer struct {
506+
initialEventsListBlueprint runtime.Object
507+
targetGVK *schema.GroupVersionKind
508+
negotiatedEncoder runtime.Encoder
509+
buffer runtime.Splice
510+
}
511+
512+
// createWatchListTransformerIfRequested returns a transformer function for watchlist bookmark event.
513+
func newWatchListTransformer(initialEventsListBlueprint runtime.Object, targetGVK *schema.GroupVersionKind, negotiatedEncoder runtime.Encoder) *watchListTransformer {
514+
return &watchListTransformer{
515+
initialEventsListBlueprint: initialEventsListBlueprint,
516+
targetGVK: targetGVK,
517+
negotiatedEncoder: negotiatedEncoder,
518+
buffer: runtime.NewSpliceBuffer(),
519+
}
520+
}
521+
522+
func (e *watchListTransformer) transform(event watch.Event) watch.Event {
523+
if e.initialEventsListBlueprint == nil {
524+
return event
525+
}
526+
hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
527+
if err != nil {
528+
return newWatchEventErrorFor(err)
529+
}
530+
if !hasAnnotation {
531+
return event
532+
}
533+
534+
if err = e.encodeInitialEventsListBlueprint(event.Object); err != nil {
535+
return newWatchEventErrorFor(err)
536+
}
537+
538+
return event
539+
}
540+
541+
func (e *watchListTransformer) encodeInitialEventsListBlueprint(object runtime.Object) error {
542+
initialEventsListBlueprint, err := e.transformInitialEventsListBlueprint()
543+
if err != nil {
544+
return err
545+
}
546+
547+
defer e.buffer.Reset()
548+
if err = e.negotiatedEncoder.Encode(initialEventsListBlueprint, e.buffer); err != nil {
549+
return err
550+
}
551+
encodedInitialEventsListBlueprint := e.buffer.Bytes()
552+
553+
// the storage layer creates a deep copy of the obj before modifying it.
554+
// since the object has the annotation, we can modify it directly.
555+
objectMeta, err := meta.Accessor(object)
556+
if err != nil {
557+
return err
558+
}
559+
annotations := objectMeta.GetAnnotations()
560+
annotations[metav1.InitialEventsListBlueprintAnnotationKey] = base64.StdEncoding.EncodeToString(encodedInitialEventsListBlueprint)
561+
objectMeta.SetAnnotations(annotations)
562+
563+
return nil
564+
}
565+
566+
func (e *watchListTransformer) transformInitialEventsListBlueprint() (runtime.Object, error) {
567+
if e.targetGVK != nil && e.targetGVK.Kind == "PartialObjectMetadata" {
568+
return asPartialObjectMetadataList(e.initialEventsListBlueprint, e.targetGVK.GroupVersion())
569+
}
570+
return e.initialEventsListBlueprint, nil
571+
}
572+
573+
func newWatchEventErrorFor(err error) watch.Event {
574+
return watch.Event{
575+
Type: watch.Error,
576+
Object: &metav1.Status{
577+
Status: metav1.StatusFailure,
578+
Message: err.Error(),
579+
Reason: metav1.StatusReasonInternalError,
580+
Code: http.StatusInternalServerError,
581+
},
582+
}
583+
}

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,28 @@ limitations under the License.
1717
package handlers
1818

1919
import (
20+
"bytes"
2021
"context"
22+
"encoding/base64"
2123
"fmt"
2224
"io"
2325
"net/http"
2426
"reflect"
2527
"testing"
2628
"time"
2729

30+
v1 "k8s.io/api/core/v1"
31+
"k8s.io/apimachinery/pkg/api/meta"
2832
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2933
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
3034
"k8s.io/apimachinery/pkg/runtime"
3135
"k8s.io/apimachinery/pkg/runtime/schema"
36+
runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json"
37+
"k8s.io/apimachinery/pkg/watch"
3238
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
3339
"k8s.io/apiserver/pkg/endpoints/request"
3440
"k8s.io/apiserver/pkg/registry/rest"
41+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3542
)
3643

3744
var _ runtime.CacheableObject = &mockCacheableObject{}
@@ -222,3 +229,118 @@ func TestWatchEncoderIdentifier(t *testing.T) {
222229
t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
223230
}
224231
}
232+
233+
func TestWatchListEncoder(t *testing.T) {
234+
makePartialObjectMetadataListWithoutKind := func(rv string) *metav1.PartialObjectMetadataList {
235+
return &metav1.PartialObjectMetadataList{
236+
// do not set the type info to match
237+
// newWatchListTransformer
238+
ListMeta: metav1.ListMeta{ResourceVersion: rv},
239+
}
240+
}
241+
makePodListWithKind := func(rv string) *v1.PodList {
242+
return &v1.PodList{
243+
TypeMeta: metav1.TypeMeta{
244+
// set the type info so
245+
// that it differs from
246+
// PartialObjectMetadataList
247+
Kind: "PodList",
248+
},
249+
ListMeta: metav1.ListMeta{
250+
ResourceVersion: rv,
251+
},
252+
}
253+
}
254+
makeBookmarkEventFor := func(pod *v1.Pod) watch.Event {
255+
return watch.Event{
256+
Type: watch.Bookmark,
257+
Object: pod,
258+
}
259+
}
260+
makePod := func(name string) *v1.Pod {
261+
return &v1.Pod{
262+
ObjectMeta: metav1.ObjectMeta{
263+
Name: name,
264+
Namespace: "ns",
265+
Annotations: map[string]string{},
266+
},
267+
}
268+
}
269+
makePodWithInitialEventsAnnotation := func(name string) *v1.Pod {
270+
p := makePod(name)
271+
p.Annotations[metav1.InitialEventsAnnotationKey] = "true"
272+
return p
273+
}
274+
275+
scenarios := []struct {
276+
name string
277+
negotiatedEncoder runtime.Serializer
278+
targetGVK *schema.GroupVersionKind
279+
280+
actualEvent watch.Event
281+
listBlueprint runtime.Object
282+
283+
expectedBase64ListBlueprint string
284+
}{
285+
{
286+
name: "pass through, an obj without the annotation received",
287+
actualEvent: makeBookmarkEventFor(makePod("1")),
288+
negotiatedEncoder: newJSONSerializer(),
289+
},
290+
{
291+
name: "encodes the initialEventsListBlueprint if an obj with the annotation is passed",
292+
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("1")),
293+
listBlueprint: makePodListWithKind("100"),
294+
expectedBase64ListBlueprint: encodeObjectToBase64String(makePodListWithKind("100"), t),
295+
negotiatedEncoder: newJSONSerializer(),
296+
},
297+
{
298+
name: "encodes the initialEventsListBlueprint as PartialObjectMetadata when requested",
299+
targetGVK: &schema.GroupVersionKind{Group: "meta.k8s.io", Version: "v1", Kind: "PartialObjectMetadata"},
300+
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("2")),
301+
listBlueprint: makePodListWithKind("101"),
302+
expectedBase64ListBlueprint: encodeObjectToBase64String(makePartialObjectMetadataListWithoutKind("101"), t),
303+
negotiatedEncoder: newJSONSerializer(),
304+
},
305+
}
306+
307+
for _, scenario := range scenarios {
308+
t.Run(scenario.name, func(t *testing.T) {
309+
target := newWatchListTransformer(scenario.listBlueprint, scenario.targetGVK, scenario.negotiatedEncoder)
310+
transformedEvent := target.transform(scenario.actualEvent)
311+
312+
actualObjectMeta, err := meta.Accessor(transformedEvent.Object)
313+
if err != nil {
314+
t.Fatal(err)
315+
}
316+
317+
base64ListBlueprint, ok := actualObjectMeta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
318+
if !ok && len(scenario.expectedBase64ListBlueprint) != 0 {
319+
t.Fatalf("the encoded obj doesn't have %q", metav1.InitialEventsListBlueprintAnnotationKey)
320+
}
321+
if base64ListBlueprint != scenario.expectedBase64ListBlueprint {
322+
t.Fatalf("unexpected base64ListBlueprint = %s, expected = %s", base64ListBlueprint, scenario.expectedBase64ListBlueprint)
323+
}
324+
})
325+
}
326+
}
327+
328+
func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string {
329+
e := newJSONSerializer()
330+
331+
var buf bytes.Buffer
332+
err := e.Encode(obj, &buf)
333+
if err != nil {
334+
t.Fatal(err)
335+
}
336+
return base64.StdEncoding.EncodeToString(buf.Bytes())
337+
}
338+
339+
func newJSONSerializer() runtime.Serializer {
340+
return runtimejson.NewSerializerWithOptions(
341+
runtimejson.DefaultMetaFactory,
342+
clientgoscheme.Scheme,
343+
clientgoscheme.Scheme,
344+
runtimejson.SerializerOptions{},
345+
)
346+
}

0 commit comments

Comments
 (0)