Skip to content

Commit a885e44

Browse files
authored
Merge pull request kubernetes#128501 from benluddy/watch-cbor-seq
KEP-4222: Use cbor-seq content-type for CBOR watch responses.
2 parents 0edef5a + 504f149 commit a885e44

File tree

6 files changed

+121
-5
lines changed

6 files changed

+121
-5
lines changed

staging/src/k8s.io/apimachinery/pkg/runtime/types.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ type TypeMeta struct {
4343
}
4444

4545
const (
46-
ContentTypeJSON string = "application/json"
47-
ContentTypeYAML string = "application/yaml"
48-
ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf"
49-
ContentTypeCBOR string = "application/cbor"
46+
ContentTypeJSON string = "application/json"
47+
ContentTypeYAML string = "application/yaml"
48+
ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf"
49+
ContentTypeCBOR string = "application/cbor" // RFC 8949
50+
ContentTypeCBORSequence string = "application/cbor-seq" // RFC 8742
5051
)
5152

5253
// RawExtension is used to hold extensions in external versions.

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation/negotiate.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,19 @@ import (
2626

2727
"k8s.io/apimachinery/pkg/runtime"
2828
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/apiserver/pkg/features"
30+
utilfeature "k8s.io/apiserver/pkg/util/feature"
2931
)
3032

3133
// MediaTypesForSerializer returns a list of media and stream media types for the server.
3234
func MediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, streamMediaTypes []string) {
3335
for _, info := range ns.SupportedMediaTypes() {
3436
mediaTypes = append(mediaTypes, info.MediaType)
3537
if info.StreamSerializer != nil {
38+
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) && info.MediaType == runtime.ContentTypeCBOR {
39+
streamMediaTypes = append(streamMediaTypes, runtime.ContentTypeCBORSequence)
40+
continue
41+
}
3642
// stream=watch is the existing mime-type parameter for watch
3743
streamMediaTypes = append(streamMediaTypes, info.MediaType+";stream=watch")
3844
}

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,17 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
8888
}
8989
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
9090
mediaType := serializer.MediaType
91-
if mediaType != runtime.ContentTypeJSON {
91+
switch mediaType {
92+
case runtime.ContentTypeJSON:
93+
// as-is
94+
case runtime.ContentTypeCBOR:
95+
// If a client indicated it accepts application/cbor (exactly one data item) on a
96+
// watch request, set the conformant application/cbor-seq media type the watch
97+
// response. RFC 9110 allows an origin server to deviate from the indicated
98+
// preference rather than send a 406 (Not Acceptable) response (see
99+
// https://www.rfc-editor.org/rfc/rfc9110.html#section-12.1-5).
100+
mediaType = runtime.ContentTypeCBORSequence
101+
default:
92102
mediaType += ";stream=watch"
93103
}
94104

staging/src/k8s.io/client-go/rest/client.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte
129129

130130
func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig {
131131
if clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) {
132+
content.Negotiator = clientNegotiatorWithCBORSequenceStreamDecoder{content.Negotiator}
132133
return content
133134
}
134135

@@ -294,3 +295,38 @@ func (p *requestClientContentConfigProvider) UnsupportedMediaType(requestContent
294295
p.sawUnsupportedMediaTypeForCBOR.Store(true)
295296
}
296297
}
298+
299+
// clientNegotiatorWithCBORSequenceStreamDecoder is a ClientNegotiator that delegates to another
300+
// ClientNegotiator to select the appropriate Encoder or Decoder for a given media type. As a
301+
// special case, it will resolve "application/cbor-seq" (a CBOR Sequence, the concatenation of zero
302+
// or more CBOR data items) as an alias for "application/cbor" (exactly one CBOR data item) when
303+
// selecting a stream decoder.
304+
type clientNegotiatorWithCBORSequenceStreamDecoder struct {
305+
negotiator runtime.ClientNegotiator
306+
}
307+
308+
func (n clientNegotiatorWithCBORSequenceStreamDecoder) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) {
309+
return n.negotiator.Encoder(contentType, params)
310+
}
311+
312+
func (n clientNegotiatorWithCBORSequenceStreamDecoder) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) {
313+
return n.negotiator.Decoder(contentType, params)
314+
}
315+
316+
func (n clientNegotiatorWithCBORSequenceStreamDecoder) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) {
317+
if !clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) {
318+
return n.negotiator.StreamDecoder(contentType, params)
319+
}
320+
321+
switch contentType {
322+
case runtime.ContentTypeCBORSequence:
323+
return n.negotiator.StreamDecoder(runtime.ContentTypeCBOR, params)
324+
case runtime.ContentTypeCBOR:
325+
// This media type is only appropriate for exactly one data item, not the zero or
326+
// more events of a watch stream.
327+
return nil, nil, nil, runtime.NegotiateError{ContentType: contentType, Stream: true}
328+
default:
329+
return n.negotiator.StreamDecoder(contentType, params)
330+
}
331+
332+
}

test/integration/client/client_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,6 +1430,30 @@ func TestClientCBOREnablement(t *testing.T) {
14301430
return err
14311431
}
14321432

1433+
DoWatchRequestWithGenericTypedClient := func(t *testing.T, config *rest.Config) error {
1434+
clientset, err := kubernetes.NewForConfig(config)
1435+
if err != nil {
1436+
t.Fatal(err)
1437+
}
1438+
1439+
// Generated clients for built-in types include the PreferProtobuf option, which
1440+
// forces Protobuf encoding on a per-request basis.
1441+
client := gentype.NewClientWithListAndApply[*v1.Namespace, *v1.NamespaceList, *corev1ac.NamespaceApplyConfiguration](
1442+
"namespaces",
1443+
clientset.CoreV1().RESTClient(),
1444+
clientscheme.ParameterCodec,
1445+
"",
1446+
func() *v1.Namespace { return &v1.Namespace{} },
1447+
func() *v1.NamespaceList { return &v1.NamespaceList{} },
1448+
)
1449+
w, err := client.Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"})
1450+
if err != nil {
1451+
return err
1452+
}
1453+
w.Stop()
1454+
return nil
1455+
}
1456+
14331457
type testCase struct {
14341458
name string
14351459
served bool
@@ -1650,6 +1674,20 @@ func TestClientCBOREnablement(t *testing.T) {
16501674
wantStatusError: false,
16511675
doRequest: DoRequestWithGenericTypedClient,
16521676
},
1677+
{
1678+
name: "generated client watch accept cbor and json get cbor-seq",
1679+
served: true,
1680+
allowed: true,
1681+
preferred: false,
1682+
configuredContentType: "application/json",
1683+
configuredAccept: "application/cbor;q=1,application/json;q=0.9",
1684+
wantRequestContentType: "",
1685+
wantRequestAccept: "application/cbor;q=1,application/json;q=0.9",
1686+
wantResponseContentType: "application/cbor-seq",
1687+
wantResponseStatus: http.StatusOK,
1688+
wantStatusError: false,
1689+
doRequest: DoWatchRequestWithGenericTypedClient,
1690+
},
16531691
{
16541692
name: "generated client accept cbor and json get json cbor not served",
16551693
served: false,

test/integration/client/dynamic_client_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,20 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
408408
return err
409409
}
410410

411+
DoWatch := func(t *testing.T, config *rest.Config) error {
412+
client, err := dynamic.NewForConfig(config)
413+
if err != nil {
414+
t.Fatal(err)
415+
}
416+
417+
w, err := client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"})
418+
if err != nil {
419+
return err
420+
}
421+
w.Stop()
422+
return nil
423+
}
424+
411425
testCases := []struct {
412426
name string
413427
serving bool
@@ -540,6 +554,17 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
540554
wantStatusError: true,
541555
doRequest: DoApply,
542556
},
557+
{
558+
name: "watch accepts both gets cbor-seq",
559+
serving: true,
560+
allowed: true,
561+
preferred: false,
562+
wantRequestAccept: "application/json;q=0.9,application/cbor;q=1",
563+
wantResponseContentType: "application/cbor-seq",
564+
wantResponseStatus: http.StatusOK,
565+
wantStatusError: false,
566+
doRequest: DoWatch,
567+
},
543568
}
544569

545570
for _, serving := range []bool{true, false} {

0 commit comments

Comments
 (0)