Skip to content

Commit a454563

Browse files
authored
Merge pull request kubernetes#127812 from p0lyn0mial/upstream-decode-list-blueprint
client-go/rest/request: decodes initialEventsListBlueprint for watchlist requests
2 parents faf89fe + 7be192a commit a454563

File tree

4 files changed

+158
-100
lines changed

4 files changed

+158
-100
lines changed

staging/src/k8s.io/client-go/dynamic/client_test.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dynamic
1919
import (
2020
"bytes"
2121
"context"
22+
"encoding/base64"
2223
"fmt"
2324
"io"
2425
"net/http"
@@ -183,7 +184,10 @@ func TestWatchList(t *testing.T) {
183184
{Type: watch.Bookmark, Object: func() runtime.Object {
184185
obj := getObject("gtest/vTest", "rTest", "item2")
185186
obj.SetResourceVersion("10")
186-
obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"})
187+
obj.SetAnnotations(map[string]string{
188+
metav1.InitialEventsAnnotationKey: "true",
189+
metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")),
190+
})
187191
return obj
188192
}()},
189193
},
@@ -195,9 +199,10 @@ func TestWatchList(t *testing.T) {
195199
},
196200
expectedList: &unstructured.UnstructuredList{
197201
Object: map[string]interface{}{
198-
"apiVersion": "",
199-
"kind": "UnstructuredList",
202+
"apiVersion": "vTest",
203+
"kind": "rTests",
200204
"metadata": map[string]interface{}{
205+
"name": "",
201206
"resourceVersion": "10",
202207
},
203208
},
@@ -215,7 +220,10 @@ func TestWatchList(t *testing.T) {
215220
{Type: watch.Bookmark, Object: func() runtime.Object {
216221
obj := getObject("gtest/vTest", "rTest", "item2")
217222
obj.SetResourceVersion("39")
218-
obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"})
223+
obj.SetAnnotations(map[string]string{
224+
metav1.InitialEventsAnnotationKey: "true",
225+
metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")),
226+
})
219227
return obj
220228
}()},
221229
},
@@ -227,9 +235,10 @@ func TestWatchList(t *testing.T) {
227235
},
228236
expectedList: &unstructured.UnstructuredList{
229237
Object: map[string]interface{}{
230-
"apiVersion": "",
231-
"kind": "UnstructuredList",
238+
"apiVersion": "vTest",
239+
"kind": "rTests",
232240
"metadata": map[string]interface{}{
241+
"name": "",
233242
"resourceVersion": "39",
234243
},
235244
},

staging/src/k8s.io/client-go/rest/request.go

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package rest
1919
import (
2020
"bytes"
2121
"context"
22+
"encoding/base64"
2223
"encoding/hex"
2324
"fmt"
2425
"io"
@@ -701,14 +702,19 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
701702
// Watch attempts to begin watching the requested location.
702703
// Returns a watch.Interface, or an error.
703704
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
705+
w, _, e := r.watchInternal(ctx)
706+
return w, e
707+
}
708+
709+
func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) {
704710
if r.body == nil {
705711
logBody(ctx, 2, "Request Body", r.bodyBytes)
706712
}
707713

708714
// We specifically don't want to rate limit watches, so we
709715
// don't use r.rateLimiter here.
710716
if r.err != nil {
711-
return nil, r.err
717+
return nil, nil, r.err
712718
}
713719

714720
client := r.c.Client
@@ -728,12 +734,12 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
728734
url := r.URL().String()
729735
for {
730736
if err := retry.Before(ctx, r); err != nil {
731-
return nil, retry.WrapPreviousError(err)
737+
return nil, nil, retry.WrapPreviousError(err)
732738
}
733739

734740
req, err := r.newHTTPRequest(ctx)
735741
if err != nil {
736-
return nil, err
742+
return nil, nil, err
737743
}
738744

739745
resp, err := client.Do(req)
@@ -761,14 +767,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
761767
}()
762768
if done {
763769
if isErrRetryableFunc(req, err) {
764-
return watch.NewEmptyWatch(), nil
770+
return watch.NewEmptyWatch(), nil, nil
765771
}
766772
if err == nil {
767773
// if the server sent us an HTTP Response object,
768774
// we need to return the error object from that.
769775
err = transformErr
770776
}
771-
return nil, retry.WrapPreviousError(err)
777+
return nil, nil, retry.WrapPreviousError(err)
772778
}
773779
}
774780
}
@@ -786,29 +792,52 @@ type WatchListResult struct {
786792
// the end of the stream.
787793
initialEventsEndBookmarkRV string
788794

789-
// gv represents the API version
790-
// it is used to construct the final list response
791-
// normally this information is filled by the server
792-
gv schema.GroupVersion
795+
// negotiatedObjectDecoder knows how to decode
796+
// the initialEventsListBlueprint
797+
negotiatedObjectDecoder runtime.Decoder
798+
799+
// base64EncodedInitialEventsListBlueprint contains an empty,
800+
// versioned list encoded in the requested format
801+
// (e.g., protobuf, JSON, CBOR) and stored as a base64-encoded string
802+
base64EncodedInitialEventsListBlueprint string
793803
}
794804

805+
// Into stores the result into obj. The passed obj parameter must be a pointer to a list type.
806+
//
807+
// Note:
808+
//
809+
// Special attention should be given to the type *unstructured.Unstructured,
810+
// which represents a list type but does not have an "Items" field.
811+
// Users who directly use RESTClient may store the response in such an object.
812+
// This particular case is not handled by the current implementation of this function,
813+
// but may be considered for future updates.
795814
func (r WatchListResult) Into(obj runtime.Object) error {
796815
if r.err != nil {
797816
return r.err
798817
}
799818

800-
listPtr, err := meta.GetItemsPtr(obj)
819+
listItemsPtr, err := meta.GetItemsPtr(obj)
801820
if err != nil {
802821
return err
803822
}
804-
listVal, err := conversion.EnforcePtr(listPtr)
823+
listVal, err := conversion.EnforcePtr(listItemsPtr)
805824
if err != nil {
806825
return err
807826
}
808827
if listVal.Kind() != reflect.Slice {
809828
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
810829
}
811830

831+
encodedInitialEventsListBlueprint, err := base64.StdEncoding.DecodeString(r.base64EncodedInitialEventsListBlueprint)
832+
if err != nil {
833+
return fmt.Errorf("failed to decode the received blueprint list, err %w", err)
834+
}
835+
836+
err = runtime.DecodeInto(r.negotiatedObjectDecoder, encodedInitialEventsListBlueprint, obj)
837+
if err != nil {
838+
return err
839+
}
840+
812841
if len(r.items) == 0 {
813842
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
814843
} else {
@@ -826,15 +855,6 @@ func (r WatchListResult) Into(obj runtime.Object) error {
826855
return err
827856
}
828857
listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
829-
830-
typeMeta, err := meta.TypeAccessor(obj)
831-
if err != nil {
832-
return err
833-
}
834-
version := r.gv.String()
835-
typeMeta.SetAPIVersion(version)
836-
typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name())
837-
838858
return nil
839859
}
840860

@@ -857,16 +877,16 @@ func (r *Request) WatchList(ctx context.Context) WatchListResult {
857877
// Most users use the generated client, which handles the proper setting of parameters.
858878
// We don't have validation for other methods (e.g., the Watch)
859879
// thus, for symmetry, we haven't added additional checks for the WatchList method.
860-
w, err := r.Watch(ctx)
880+
w, d, err := r.watchInternal(ctx)
861881
if err != nil {
862882
return WatchListResult{err: err}
863883
}
864-
return r.handleWatchList(ctx, w)
884+
return r.handleWatchList(ctx, w, d)
865885
}
866886

867887
// handleWatchList holds the actual logic for easier unit testing.
868888
// Note that this function will close the passed watch.
869-
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult {
889+
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult {
870890
defer w.Stop()
871891
var lastKey string
872892
var items []runtime.Object
@@ -900,10 +920,15 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL
900920
lastKey = key
901921
case watch.Bookmark:
902922
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
923+
base64EncodedInitialEventsListBlueprint := meta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
924+
if len(base64EncodedInitialEventsListBlueprint) == 0 {
925+
return WatchListResult{err: fmt.Errorf("%q annotation is missing content", metav1.InitialEventsListBlueprintAnnotationKey)}
926+
}
903927
return WatchListResult{
904-
items: items,
905-
initialEventsEndBookmarkRV: meta.GetResourceVersion(),
906-
gv: r.c.content.GroupVersion,
928+
items: items,
929+
initialEventsEndBookmarkRV: meta.GetResourceVersion(),
930+
negotiatedObjectDecoder: negotiatedObjectDecoder,
931+
base64EncodedInitialEventsListBlueprint: base64EncodedInitialEventsListBlueprint,
907932
}
908933
}
909934
default:
@@ -913,15 +938,15 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL
913938
}
914939
}
915940

916-
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
941+
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, runtime.Decoder, error) {
917942
contentType := resp.Header.Get("Content-Type")
918943
mediaType, params, err := mime.ParseMediaType(contentType)
919944
if err != nil {
920945
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
921946
}
922947
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
923948
if err != nil {
924-
return nil, err
949+
return nil, nil, err
925950
}
926951

927952
handleWarnings(resp.Header, r.warningHandler)
@@ -934,7 +959,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
934959
// use 500 to indicate that the cause of the error is unknown - other error codes
935960
// are more specific to HTTP interactions, and set a reason
936961
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
937-
), nil
962+
), objectDecoder, nil
938963
}
939964

940965
// updateRequestResultMetric increments the RequestResult metric counter,

0 commit comments

Comments
 (0)