From d7c6a6a75cc3adf4f61949111a11933aa16d63f6 Mon Sep 17 00:00:00 2001 From: nkeert <197718357+nkeert@users.noreply.github.com> Date: Sat, 15 Feb 2025 10:23:21 +0530 Subject: [PATCH 1/8] UPSTREAM: 130190: Add a test to validate deferredResponseWriteron multiple write calls Signed-off-by: nkeert <197718357+nkeert@users.noreply.github.com> --- .../handlers/responsewriters/writers_test.go | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 874dc1980ebd6..6ec17847e8343 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -19,6 +19,7 @@ package responsewriters import ( "bytes" "compress/gzip" + "context" "encoding/hex" "encoding/json" "errors" @@ -32,6 +33,7 @@ import ( "os" "reflect" "strconv" + "strings" "testing" "time" @@ -371,6 +373,124 @@ func TestSerializeObject(t *testing.T) { } } +func TestDeferredResponseWriter_Write(t *testing.T) { + smallChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes-1) + largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1) + + tests := []struct { + name string + chunks [][]byte + expectGzip bool + }{ + { + name: "one small chunk write", + chunks: [][]byte{smallChunk}, + expectGzip: false, + }, + { + name: "two small chunk writes", + chunks: [][]byte{smallChunk, smallChunk}, + expectGzip: false, + }, + { + name: "one large chunk writes", + chunks: [][]byte{largeChunk}, + expectGzip: true, + }, + { + name: "two large chunk writes", + chunks: [][]byte{largeChunk, largeChunk}, + expectGzip: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockResponseWriter := httptest.NewRecorder() + + drw := &deferredResponseWriter{ + mediaType: "text/plain", + 
statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + + fullPayload := []byte{} + + for _, chunk := range tt.chunks { + n, err := drw.Write(chunk) + + if err != nil { + t.Fatalf("unexpected error while writing chunk: %v", err) + } + if n != len(chunk) { + t.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n) + } + + fullPayload = append(fullPayload, chunk...) + } + + err := drw.Close() + if err != nil { + t.Fatalf("unexpected error when closing deferredResponseWriter: %v", err) + } + + res := mockResponseWriter.Result() + + if res.StatusCode != http.StatusOK { + t.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) + } + contentEncoding := res.Header.Get("Content-Encoding") + varyHeader := res.Header.Get("Vary") + + resBytes, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("unexpected error occurred while reading response body: %v", err) + } + + if tt.expectGzip { + if contentEncoding != "gzip" { + t.Fatalf("content-encoding is not set properly, expected: gzip, got: %s", contentEncoding) + } + + if !strings.Contains(varyHeader, "Accept-Encoding") { + t.Errorf("vary header doesn't have Accept-Encoding") + } + + gr, err := gzip.NewReader(bytes.NewReader(resBytes)) + if err != nil { + t.Fatalf("failed to create gzip reader: %v", err) + } + + decompressed, err := io.ReadAll(gr) + if err != nil { + t.Fatalf("failed to decompress: %v", err) + } + + if !bytes.Equal(fullPayload, decompressed) { + t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed) + } + + } else { + if contentEncoding != "" { + t.Errorf("content-encoding is set unexpectedly") + } + + if strings.Contains(varyHeader, "Accept-Encoding") { + t.Errorf("accept encoding is set unexpectedly") + } + + if !bytes.Equal(fullPayload, resBytes) { + t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes) + } + + } + + }) + } +} + func randTime(t 
*time.Time, r *rand.Rand) { *t = time.Unix(r.Int63n(1000*365*24*60*60), r.Int63()) } From c51558694338d13e57dec901cf35040bbb11373a Mon Sep 17 00:00:00 2001 From: z1cheng Date: Thu, 20 Feb 2025 18:36:47 +0800 Subject: [PATCH 2/8] UPSTREAM: 130281: Implement chunking for gzip encoder in deferredResponseWriter Signed-off-by: z1cheng --- .../handlers/responsewriters/writers.go | 60 +++++- .../handlers/responsewriters/writers_test.go | 185 +++++++++++++++--- 2 files changed, 215 insertions(+), 30 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index acd8f0357aaf8..acc95bdc65e59 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -156,6 +156,9 @@ const ( // (usually the entire object), and if the size is smaller no gzipping will be performed // if the client requests it. defaultGzipThresholdBytes = 128 * 1024 + // Use the length of the first write of streaming implementations. 
+ // TODO: Update when streaming proto is implemented + firstWriteStreamingThresholdBytes = 1 ) // negotiateContentEncoding returns a supported client-requested content encoding for the @@ -191,14 +194,53 @@ type deferredResponseWriter struct { statusCode int contentEncoding string - hasWritten bool - hw http.ResponseWriter - w io.Writer + hasBuffered bool + buffer []byte + hasWritten bool + hw http.ResponseWriter + w io.Writer ctx context.Context } func (w *deferredResponseWriter) Write(p []byte) (n int, err error) { + switch { + case w.hasWritten: + // already written, cannot buffer + return w.unbufferedWrite(p) + + case w.contentEncoding != "gzip": + // non-gzip, no need to buffer + return w.unbufferedWrite(p) + + case !w.hasBuffered && len(p) > defaultGzipThresholdBytes: + // not yet buffered, first write is long enough to trigger gzip, no need to buffer + return w.unbufferedWrite(p) + + case !w.hasBuffered && len(p) > firstWriteStreamingThresholdBytes: + // not yet buffered, first write is longer than expected for streaming scenarios that would require buffering, no need to buffer + return w.unbufferedWrite(p) + + default: + if !w.hasBuffered { + w.hasBuffered = true + // Start at 80 bytes to avoid rapid reallocation of the buffer. + // The minimum size of a 0-item serialized list object is 80 bytes: + // {"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"1"},"items":[]}\n + w.buffer = make([]byte, 0, max(80, len(p))) + } + w.buffer = append(w.buffer, p...) + var err error + if len(w.buffer) > defaultGzipThresholdBytes { + // we've accumulated enough to trigger gzip, write and clear buffer + _, err = w.unbufferedWrite(w.buffer) + w.buffer = nil + } + return len(p), err + } +} + +func (w *deferredResponseWriter) unbufferedWrite(p []byte) (n int, err error) { ctx := w.ctx span := tracing.SpanFromContext(ctx) // This Step usually wraps in-memory object serialization. 
@@ -244,11 +286,17 @@ func (w *deferredResponseWriter) Write(p []byte) (n int, err error) { return w.w.Write(p) } -func (w *deferredResponseWriter) Close() error { +func (w *deferredResponseWriter) Close() (err error) { if !w.hasWritten { - return nil + if !w.hasBuffered { + return nil + } + // never reached defaultGzipThresholdBytes, no need to do the gzip writer cleanup + _, err := w.unbufferedWrite(w.buffer) + w.buffer = nil + return err } - var err error + switch t := w.w.(type) { case *gzip.Writer: err = t.Close() diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 6ec17847e8343..195b161fe0a05 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -33,7 +33,6 @@ import ( "os" "reflect" "strconv" - "strings" "testing" "time" @@ -42,6 +41,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + rand2 "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -378,29 +378,94 @@ func TestDeferredResponseWriter_Write(t *testing.T) { largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1) tests := []struct { - name string - chunks [][]byte - expectGzip bool + name string + chunks [][]byte + expectGzip bool + expectHeaders http.Header }{ + { + name: "no writes", + chunks: nil, + expectGzip: false, + expectHeaders: http.Header{}, + }, + { + name: "one empty write", + chunks: [][]byte{{}}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + { + name: "one single byte write", + chunks: [][]byte{{'{'}}, + expectGzip: false, + expectHeaders: http.Header{ + 
"Content-Type": []string{"text/plain"}, + }, + }, { name: "one small chunk write", chunks: [][]byte{smallChunk}, expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, }, { name: "two small chunk writes", chunks: [][]byte{smallChunk, smallChunk}, expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + { + name: "one single byte and one small chunk write", + chunks: [][]byte{{'{'}, smallChunk}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + { + name: "two single bytes and one small chunk write", + chunks: [][]byte{{'{'}, {'{'}, smallChunk}, + expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, }, { name: "one large chunk writes", chunks: [][]byte{largeChunk}, expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, }, { name: "two large chunk writes", chunks: [][]byte{largeChunk, largeChunk}, expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + }, + { + name: "one small chunk and one large chunk write", + chunks: [][]byte{smallChunk, largeChunk}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, }, } @@ -441,8 +506,9 @@ func TestDeferredResponseWriter_Write(t *testing.T) { if res.StatusCode != http.StatusOK { t.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) } - contentEncoding := res.Header.Get("Content-Encoding") - varyHeader := res.Header.Get("Vary") + if !reflect.DeepEqual(res.Header, tt.expectHeaders) { + t.Fatal(cmp.Diff(tt.expectHeaders, res.Header)) + } resBytes, err := 
io.ReadAll(res.Body) if err != nil { @@ -450,14 +516,6 @@ func TestDeferredResponseWriter_Write(t *testing.T) { } if tt.expectGzip { - if contentEncoding != "gzip" { - t.Fatalf("content-encoding is not set properly, expected: gzip, got: %s", contentEncoding) - } - - if !strings.Contains(varyHeader, "Accept-Encoding") { - t.Errorf("vary header doesn't have Accept-Encoding") - } - gr, err := gzip.NewReader(bytes.NewReader(resBytes)) if err != nil { t.Fatalf("failed to create gzip reader: %v", err) @@ -471,22 +529,101 @@ func TestDeferredResponseWriter_Write(t *testing.T) { if !bytes.Equal(fullPayload, decompressed) { t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed) } - } else { - if contentEncoding != "" { - t.Errorf("content-encoding is set unexpectedly") - } - - if strings.Contains(varyHeader, "Accept-Encoding") { - t.Errorf("accept encoding is set unexpectedly") - } - if !bytes.Equal(fullPayload, resBytes) { t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes) } - } + }) + } +} + +func benchmarkChunkingGzip(b *testing.B, count int, chunk []byte) { + mockResponseWriter := httptest.NewRecorder() + mockResponseWriter.Body = nil + + drw := &deferredResponseWriter{ + mediaType: "text/plain", + statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + b.ResetTimer() + for i := 0; i < count; i++ { + n, err := drw.Write(chunk) + if err != nil { + b.Fatalf("unexpected error while writing chunk: %v", err) + } + if n != len(chunk) { + b.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n) + } + } + err := drw.Close() + if err != nil { + b.Fatalf("unexpected error when closing deferredResponseWriter: %v", err) + } + res := mockResponseWriter.Result() + if res.StatusCode != http.StatusOK { + b.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) + } +} + +func BenchmarkChunkingGzip(b *testing.B) { + tests := 
[]struct { + count int + size int + }{ + { + count: 100, + size: 1_000, + }, + { + count: 100, + size: 100_000, + }, + { + count: 1_000, + size: 100_000, + }, + { + count: 1_000, + size: 1_000_000, + }, + { + count: 10_000, + size: 100_000, + }, + { + count: 100_000, + size: 10_000, + }, + { + count: 1, + size: 100_000, + }, + { + count: 1, + size: 1_000_000, + }, + { + count: 1, + size: 10_000_000, + }, + { + count: 1, + size: 100_000_000, + }, + { + count: 1, + size: 1_000_000_000, + }, + } + for _, t := range tests { + b.Run(fmt.Sprintf("Count=%d/Size=%d", t.count, t.size), func(b *testing.B) { + chunk := []byte(rand2.String(t.size)) + benchmarkChunkingGzip(b, t.count, chunk) }) } } From 64d5177d5b61f0718114992336b525f8fd97433d Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 17 Feb 2025 16:50:34 +0100 Subject: [PATCH 3/8] UPSTREAM: 130220: Add tests for encoding collections in JSON for KEP-5116 Used test cases from: * Original PR https://github.com/kubernetes/kubernetes/pull/129334 * KEP https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests For now testing current serializer implementation to show encoder behavior and agree on set of tests. Having a separate PR should make review easier. In separate PR will add the implementation for streaming that should provide same response byte-to-byte. --- .../serializer/json/collections_test.go | 655 ++++++++++++++++++ 1 file changed, 655 insertions(+) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go new file mode 100644 index 0000000000000..6d8f34962a2e3 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go @@ -0,0 +1,655 @@ +/* +Copyright 2025 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package json + +import ( + "bytes" + "testing" + + "github.com/google/go-cmp/cmp" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestCollectionsEncoding(t *testing.T) { + t.Run("Normal", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{})) + }) + // Leave place for testing streaming collection serializer proposed as part of KEP-5116 +} + +// testCollectionsEncoding should provide comprehensive tests to validate streaming implementation of encoder. 
+func testCollectionsEncoding(t *testing.T, s *Serializer) { + var buf bytes.Buffer + var remainingItems int64 = 1 + // As defined in KEP-5116 we it should include the following scenarios: + // Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests + for _, tc := range []struct { + name string + in runtime.Object + expect string + }{ + // Preserving the distinction between integers and floating-point numbers + { + name: "Struct with floats", + in: &StructWithFloatsList{ + Items: []StructWithFloats{ + { + Int: 1, + Float32: float32(1), + Float64: 1.1, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"Int\":1,\"Float32\":1,\"Float64\":1.1}]}\n", + }, + { + name: "Unstructured object float", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "int": 1, + "float32": float32(1), + "float64": 1.1, + }, + }, + expect: "{\"float32\":1,\"float64\":1.1,\"int\":1,\"items\":[]}\n", + }, + { + name: "Unstructured items float", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "int": 1, + "float32": float32(1), + "float64": 1.1, + }, + }, + }, + }, + expect: "{\"items\":[{\"float32\":1,\"float64\":1.1,\"int\":1}]}\n", + }, + // Handling structs with duplicate field names (JSON tag names) without producing duplicate keys in the encoded output + { + name: "StructWithDuplicatedTags", + in: &StructWithDuplicatedTagsList{ + Items: []StructWithDuplicatedTags{ + { + Key1: "key1", + Key2: "key2", + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null}}]}\n", + }, + // Encoding Go strings containing invalid UTF-8 sequences without error + { + name: "UnstructuredList object invalid UTF-8 ", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "key": "\x80", // first byte is a continuation byte + }, + }, + expect: 
"{\"items\":[],\"key\":\"\\ufffd\"}\n", + }, + { + name: "UnstructuredList items invalid UTF-8 ", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "key": "\x80", // first byte is a continuation byte + }, + }, + }, + }, + expect: "{\"items\":[{\"key\":\"\\ufffd\"}]}\n", + }, + // Preserving the distinction between absent, present-but-null, and present-and-empty states for slices and maps + { + name: "CarpList items nil", + in: &testapigroupv1.CarpList{ + Items: nil, + }, + expect: "{\"metadata\":{},\"items\":null}\n", + }, + { + name: "CarpList slice nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: nil, + }, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n", + }, + { + name: "CarpList map nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: nil, + }, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n", + }, + { + name: "UnstructuredList items nil", + in: &unstructured.UnstructuredList{ + Items: nil, + }, + expect: "{\"items\":[]}\n", + }, + { + name: "UnstructuredList items slice nil", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": ([]string)(nil), + }, + }, + }, + }, + expect: "{\"items\":[{\"slice\":null}]}\n", + }, + { + name: "UnstructuredList items map nil", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "map": (map[string]string)(nil), + }, + }, + }, + }, + expect: "{\"items\":[{\"map\":null}]}\n", + }, + { + name: "UnstructuredList object nil", + in: &unstructured.UnstructuredList{ + Object: nil, + }, + expect: "{\"items\":[]}\n", + }, + { + name: 
"UnstructuredList object slice nil", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": ([]string)(nil), + }, + }, + expect: "{\"items\":[],\"slice\":null}\n", + }, + { + name: "UnstructuredList object map nil", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "map": (map[string]string)(nil), + }, + }, + expect: "{\"items\":[],\"map\":null}\n", + }, + { + name: "CarpList items empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{}, + }, + expect: "{\"metadata\":{},\"items\":[]}\n", + }, + { + name: "CarpList slice empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: []testapigroupv1.CarpCondition{}, + }, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n", + }, + { + name: "CarpList map empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: map[string]string{}, + }, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n", + }, + { + name: "UnstructuredList items empty", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{}, + }, + expect: "{\"items\":[]}\n", + }, + { + name: "UnstructuredList items slice empty", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": []string{}, + }, + }, + }, + }, + expect: "{\"items\":[{\"slice\":[]}]}\n", + }, + { + name: "UnstructuredList items map empty", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "map": map[string]string{}, + }, + }, + }, + }, + expect: "{\"items\":[{\"map\":{}}]}\n", + }, + { + name: "UnstructuredList object empty", + in: &unstructured.UnstructuredList{ + Object: 
map[string]interface{}{}, + }, + expect: "{\"items\":[]}\n", + }, + { + name: "UnstructuredList object slice empty", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": []string{}, + }, + }, + expect: "{\"items\":[],\"slice\":[]}\n", + }, + { + name: "UnstructuredList object map empty", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "map": map[string]string{}, + }, + }, + expect: "{\"items\":[],\"map\":{}}\n", + }, + // Handling structs implementing MarshallJSON method, especially built-in collection types. + { + name: "List with MarshallJSON", + in: &ListWithMarshalJSONList{}, + expect: "\"marshallJSON\"\n", + }, + { + name: "Struct with MarshallJSON", + in: &StructWithMarshalJSONList{ + Items: []StructWithMarshalJSON{ + {}, + }, + }, + expect: "{\"metadata\":{},\"items\":[\"marshallJSON\"]}\n", + }, + // Handling raw bytes. + { + name: "Struct with raw bytes", + in: &StructWithRawBytesList{ + Items: []StructWithRawBytes{ + { + Slice: []byte{0x01, 0x02, 0x03}, + Array: [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"Slice\":\"AQID\",\"Array\":[1,2,3]}]}\n", + }, + { + name: "UnstructuredList object raw bytes", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": []byte{0x01, 0x02, 0x03}, + "array": [3]byte{0x01, 0x02, 0x03}, + }, + }, + expect: "{\"array\":[1,2,3],\"items\":[],\"slice\":\"AQID\"}\n", + }, + { + name: "UnstructuredList items raw bytes", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": []byte{0x01, 0x02, 0x03}, + "array": [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + }, + expect: "{\"items\":[{\"array\":[1,2,3],\"slice\":\"AQID\"}]}\n", + }, + // Other scenarios: + { + name: "List just kind", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + }, + }, + expect: 
"{\"kind\":\"List\",\"metadata\":{},\"items\":null}\n", + }, + { + name: "List just apiVersion", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + }, + }, + expect: "{\"apiVersion\":\"v1\",\"metadata\":{},\"items\":null}\n", + }, + { + name: "List no elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[]}\n", + }, + { + name: "List one element with continue", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + Continue: "abc", + RemainingItemCount: &remainingItems, + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + }, + }, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\",\"continue\":\"abc\",\"remainingItemCount\":1},\"items\":[{\"kind\":\"Carp\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"pod\",\"namespace\":\"default\",\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n", + }, + { + name: "List two elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default2", + }}, + }, + }, + expect: 
`{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]} +`, + }, + { + name: "UnstructuredList empty", + in: &unstructured.UnstructuredList{}, + expect: "{\"items\":[]}\n", + }, + { + name: "UnstructuredList just kind", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List"}, + }, + expect: "{\"items\":[],\"kind\":\"List\"}\n", + }, + { + name: "UnstructuredList just apiVersion", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "v1"}, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[]}\n", + }, + { + name: "UnstructuredList no elements", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{"resourceVersion": "2345"}}, + Items: []unstructured.Unstructured{}, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[],\"kind\":\"List\",\"metadata\":{\"resourceVersion\":\"2345\"}}\n", + }, + { + name: "UnstructuredList one element with continue", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + "continue": "abc", + "remainingItemCount": "1", + }}, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod", + "namespace": "default", + }, + }, + }, + }, + }, + expect: 
"{\"apiVersion\":\"v1\",\"items\":[{\"apiVersion\":\"v1\",\"kind\":\"Carp\",\"metadata\":{\"name\":\"pod\",\"namespace\":\"default\"}}],\"kind\":\"List\",\"metadata\":{\"continue\":\"abc\",\"remainingItemCount\":\"1\",\"resourceVersion\":\"2345\"}}\n", + }, + { + name: "UnstructuredList two elements", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + }}, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod", + "namespace": "default", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod2", + "namespace": "default", + }, + }, + }, + }, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[{\"apiVersion\":\"v1\",\"kind\":\"Carp\",\"metadata\":{\"name\":\"pod\",\"namespace\":\"default\"}},{\"apiVersion\":\"v1\",\"kind\":\"Carp\",\"metadata\":{\"name\":\"pod2\",\"namespace\":\"default\"}}],\"kind\":\"List\",\"metadata\":{\"resourceVersion\":\"2345\"}}\n", + }, + { + name: "UnstructuredList conflict on items", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"items": []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod", + }, + }, + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod2", + }, + }, + }, + }, + expect: "{\"items\":[{\"name\":\"pod2\"}]}\n", + }, + } { + t.Run(tc.name, func(t *testing.T) { + buf.Reset() + if err := s.Encode(tc.in, &buf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("normal: %s", buf.String()) + if diff := cmp.Diff(buf.String(), tc.expect); diff != "" { + t.Errorf("not matching:\n%s", diff) + } + }) + } +} + +type StructWithFloatsList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta 
`json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithFloats `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *StructWithFloatsList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithFloats struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Int int + Float32 float32 + Float64 float64 +} + +func (s *StructWithFloats) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithDuplicatedTagsList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithDuplicatedTags `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *StructWithDuplicatedTagsList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithDuplicatedTags struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Key1 string `json:"key"` + Key2 string `json:"key"` //nolint:govet +} + +func (s *StructWithDuplicatedTags) DeepCopyObject() runtime.Object { + return nil +} + +type ListWithMarshalJSONList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []string `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *ListWithMarshalJSONList) DeepCopyObject() runtime.Object { + return nil +} + +func (l *ListWithMarshalJSONList) MarshalJSON() ([]byte, error) { + return []byte(`"marshallJSON"`), nil +} + +type StructWithMarshalJSONList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithMarshalJSON `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithMarshalJSONList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithMarshalJSON struct { + metav1.TypeMeta 
`json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` +} + +func (l *StructWithMarshalJSON) DeepCopyObject() runtime.Object { + return nil +} + +func (l *StructWithMarshalJSON) MarshalJSON() ([]byte, error) { + return []byte(`"marshallJSON"`), nil +} + +type StructWithRawBytesList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithRawBytes `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithRawBytesList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithRawBytes struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Slice []byte + Array [3]byte +} + +func (s *StructWithRawBytes) DeepCopyObject() runtime.Object { + return nil +} From b00d3d6fbd9dc5dae0ed664c6a1e821f25c8947b Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 19 Dec 2024 10:38:30 +0100 Subject: [PATCH 4/8] UPSTREAM: 129334: Streaming JSON encoder for List --- pkg/features/kube_features.go | 4 + pkg/features/versioned_kube_features.go | 4 + .../core/rest/storage_core_generic.go | 11 + .../pkg/apiserver/customresource_handler.go | 16 +- .../k8s.io/apimachinery/pkg/api/meta/help.go | 3 + .../pkg/runtime/serializer/codec_factory.go | 12 +- .../runtime/serializer/json/collections.go | 231 ++++++++++++++++++ .../serializer/json/collections_test.go | 161 +++++++++++- .../pkg/runtime/serializer/json/json.go | 16 +- .../handlers/responsewriters/writers_test.go | 80 ++++++ .../apiserver/pkg/features/kube_features.go | 8 + .../apiserver/pkg/server/genericapiserver.go | 9 + 12 files changed, 536 insertions(+), 19 deletions(-) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 44ab162a4b29c..d34db525fb1d8 100644 --- 
a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -817,6 +817,10 @@ const ( // Enables support for the StorageVersionMigrator controller. StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator" + // owner: @serathius + // Allow API server to encode collections item by item, instead of all at once. + StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @robscott // kep: https://kep.k8s.io/2433 // alpha: v1.21 diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 7ef53861822d9..06915cd808243 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -17,6 +17,7 @@ limitations under the License. package features import ( + "k8s.io/apimachinery/pkg/util/version" "k8s.io/component-base/featuregate" ) @@ -31,4 +32,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate // genericfeatures.EmulationVersion: { // {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, // }, + StreamingCollectionEncodingToJSON: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, } diff --git a/pkg/registry/core/rest/storage_core_generic.go b/pkg/registry/core/rest/storage_core_generic.go index 193b5b98f473e..364d4679c1c3f 100644 --- a/pkg/registry/core/rest/storage_core_generic.go +++ b/pkg/registry/core/rest/storage_core_generic.go @@ -33,6 +33,9 @@ import ( "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" configmapstore "k8s.io/kubernetes/pkg/registry/core/configmap/storage" @@ -69,6 +72,14 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API NegotiatedSerializer: 
legacyscheme.Codecs, } + opts := []serializer.CodecFactoryOptionsMutator{} + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) + } + if len(opts) != 0 { + apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...) + } + eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds())) if err != nil { return genericapiserver.APIGroupInfo{}, err diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go index 1b549610aa700..0eafff556d173 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go @@ -69,8 +69,10 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/generic" genericfilters "k8s.io/apiserver/pkg/server/filters" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/warning" "k8s.io/client-go/scale" @@ -848,6 +850,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped // CRDs explicitly do not support protobuf, but some objects returned by the API server do + streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) negotiatedSerializer := unstructuredNegotiatedSerializer{ typer: typer, creator: creator, @@ -861,10 +864,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd MediaTypeType: "application", MediaTypeSubType: "json", EncodesAsText: true, 
- Serializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, false), - PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, true), + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}), + PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}), StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{ - Strict: true, + Strict: true, + StreamingCollectionsEncoding: streamingCollections, }), StreamSerializer: &runtime.StreamSerializerInfo{ EncodesAsText: true, @@ -958,7 +962,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd scaleScope := *requestScopes[v.Name] scaleConverter := scale.NewScaleConverter() scaleScope.Subresource = "scale" - scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme()) + var opts []serializer.CodecFactoryOptionsMutator + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) + } + scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...) 
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale") scaleScope.Namer = handlers.ContextBasedNaming{ Namer: meta.NewAccessor(), diff --git a/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go b/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go index 1fdd32c4ba3e0..468afd0e9ee09 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go @@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) { if err != nil { return nil, err } + if items.IsNil() { + return nil, nil + } list := make([]runtime.Object, items.Len()) if len(list) == 0 { return list, nil diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go index ff98208420465..363ee2283b116 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go @@ -52,7 +52,7 @@ type serializerType struct { func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType { jsonSerializer := json.NewSerializerWithOptions( mf, scheme, scheme, - json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict}, + json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON}, ) jsonSerializerType := serializerType{ AcceptContentTypes: []string{runtime.ContentTypeJSON}, @@ -73,7 +73,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option strictJSONSerializer := json.NewSerializerWithOptions( mf, scheme, scheme, - json.SerializerOptions{Yaml: false, Pretty: false, Strict: true}, + json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON}, ) 
jsonSerializerType.StrictSerializer = strictJSONSerializer @@ -136,6 +136,8 @@ type CodecFactoryOptions struct { Strict bool // Pretty includes a pretty serializer along with the non-pretty one Pretty bool + + StreamingCollectionsEncodingToJSON bool } // CodecFactoryOptionsMutator takes a pointer to an options struct and then modifies it. @@ -162,6 +164,12 @@ func DisableStrict(options *CodecFactoryOptions) { options.Strict = false } +func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator { + return func(options *CodecFactoryOptions) { + options.StreamingCollectionsEncodingToJSON = true + } +} + // NewCodecFactory provides methods for retrieving serializers for the supported wire formats // and conversion wrappers to define preferred internal and external versions. In the future, // as the internal version is used less, callers may instead use a defaulting serializer and diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go new file mode 100644 index 0000000000000..63b4e5ccb5f57 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go @@ -0,0 +1,231 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package json + +import ( + "encoding/json" + "fmt" + "io" + "sort" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) { + list, ok := obj.(*unstructured.UnstructuredList) + if ok { + return true, streamingEncodeUnstructuredList(w, list) + } + if _, ok := obj.(json.Marshaler); ok { + return false, nil + } + typeMeta, listMeta, items, err := getListMeta(obj) + if err == nil { + return true, streamingEncodeList(w, typeMeta, listMeta, items) + } + return false, nil +} + +// getListMeta implements list extraction logic for json stream serialization. +// +// Reason for a custom logic instead of reusing accessors from meta package: +// * Validate json tags to prevent incompatibility with json standard package. +// * ListMetaAccessor doesn't distinguish empty from nil value. 
+// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}" +func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) { + listValue, err := conversion.EnforcePtr(list) + if err != nil { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err + } + listType := listValue.Type() + if listType.NumField() != 3 { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields") + } + // TypeMeta + typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta) + if !ok { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type") + } + if listType.Field(0).Tag.Get("json") != ",inline" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`) + } + // ListMeta + listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta) + if !ok { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type") + } + if listType.Field(1).Tag.Get("json") != "metadata,omitempty" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`) + } + // Items + items, err := meta.ExtractList(list) + if err != nil { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err + } + if listType.Field(2).Tag.Get("json") != "items" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`) + } + return typeMeta, listMeta, items, nil +} + +func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error { + // Start + if _, err := w.Write([]byte(`{`)); err != nil { + return err + } + + // TypeMeta + if typeMeta.Kind != "" { + if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil { + return err + } + } + if typeMeta.APIVersion != "" { + if err := 
encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil { + return err + } + } + + // ListMeta + if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil { + return err + } + + // Items + if err := encodeItemsObjectSlice(w, items); err != nil { + return err + } + + // End + _, err := w.Write([]byte("}\n")) + return err +} + +func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) { + if items == nil { + err := encodeKeyValuePair(w, "items", nil, nil) + return err + } + _, err = w.Write([]byte(`"items":[`)) + if err != nil { + return err + } + suffix := []byte(",") + for i, item := range items { + if i == len(items)-1 { + suffix = nil + } + err := encodeValue(w, item, suffix) + if err != nil { + return err + } + } + _, err = w.Write([]byte("]")) + if err != nil { + return err + } + return err +} + +func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error { + _, err := w.Write([]byte(`{`)) + if err != nil { + return err + } + keys := make([]string, 0, len(list.Object)) + for k := range list.Object { + keys = append(keys, k) + } + if _, exists := list.Object["items"]; !exists { + keys = append(keys, "items") + } + sort.Strings(keys) + + suffix := []byte(",") + for i, key := range keys { + if i == len(keys)-1 { + suffix = nil + } + if key == "items" { + err = encodeItemsUnstructuredSlice(w, list.Items, suffix) + } else { + err = encodeKeyValuePair(w, key, list.Object[key], suffix) + } + if err != nil { + return err + } + } + _, err = w.Write([]byte("}\n")) + return err +} + +func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) { + _, err = w.Write([]byte(`"items":[`)) + if err != nil { + return err + } + comma := []byte(",") + for i, item := range items { + if i == len(items)-1 { + comma = nil + } + err := encodeValue(w, item.Object, comma) + if err != nil { + return err + } + } + _, err = w.Write([]byte("]")) 
+ if err != nil { + return err + } + if len(suffix) > 0 { + _, err = w.Write(suffix) + } + return err +} + +func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) { + err = encodeValue(w, key, []byte(":")) + if err != nil { + return err + } + err = encodeValue(w, value, suffix) + if err != nil { + return err + } + return err +} + +func encodeValue(w io.Writer, value any, suffix []byte) error { + data, err := json.Marshal(value) + if err != nil { + return err + } + _, err = w.Write(data) + if err != nil { + return err + } + if len(suffix) > 0 { + _, err = w.Write(suffix) + } + return err +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go index 6d8f34962a2e3..03ec4d88f2236 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go @@ -18,9 +18,11 @@ package json import ( "bytes" + "fmt" "testing" "github.com/google/go-cmp/cmp" + fuzz "github.com/google/gofuzz" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -30,21 +32,24 @@ import ( func TestCollectionsEncoding(t *testing.T) { t.Run("Normal", func(t *testing.T) { - testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{})) + testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}), false) + }) + t.Run("Streaming", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true) }) - // Leave place for testing streaming collection serializer proposed as part of KEP-5116 } // testCollectionsEncoding should provide comprehensive tests to validate streaming implementation of encoder. 
-func testCollectionsEncoding(t *testing.T, s *Serializer) { - var buf bytes.Buffer +func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) { + var buf writeCountingBuffer var remainingItems int64 = 1 // As defined in KEP-5116 we it should include the following scenarios: // Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests for _, tc := range []struct { - name string - in runtime.Object - expect string + name string + in runtime.Object + cannotStream bool + expect string }{ // Preserving the distinction between integers and floating-point numbers { @@ -307,9 +312,10 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { }, // Handling structs implementing MarshallJSON method, especially built-in collection types. { - name: "List with MarshallJSON", - in: &ListWithMarshalJSONList{}, - expect: "\"marshallJSON\"\n", + name: "List with MarshallJSON cannot be streamed", + in: &ListWithMarshalJSONList{}, + expect: "\"marshallJSON\"\n", + cannotStream: true, }, { name: "Struct with MarshallJSON", @@ -435,6 +441,32 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { expect: `{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]} `, }, + { + name: "List with extra field cannot be streamed", + in: &ListWithAdditionalFields{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + cannotStream: true, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[],\"AdditionalField\":0}\n", + }, + { + name: "Not a 
collection cannot be streamed", + in: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + }, + cannotStream: true, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}\n", + }, { name: "UnstructuredList empty", in: &unstructured.UnstructuredList{}, @@ -543,10 +575,17 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { if err := s.Encode(tc.in, &buf); err != nil { t.Fatalf("unexpected error: %v", err) } - t.Logf("normal: %s", buf.String()) + t.Logf("encoded: %s", buf.String()) if diff := cmp.Diff(buf.String(), tc.expect); diff != "" { t.Errorf("not matching:\n%s", diff) } + expectStreaming := !tc.cannotStream && streamingEnabled + if expectStreaming && buf.writeCount <= 1 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !expectStreaming && buf.writeCount > 1 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) + } }) } } @@ -653,3 +692,103 @@ type StructWithRawBytes struct { func (s *StructWithRawBytes) DeepCopyObject() runtime.Object { return nil } + +type ListWithAdditionalFields struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []testapigroupv1.Carp `json:"items" protobuf:"bytes,2,rep,name=items"` + AdditionalField int +} + +func (s *ListWithAdditionalFields) DeepCopyObject() runtime.Object { + return nil +} + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + +func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollectionsEncoding(t *testing.T) { + disableFuzzFieldsV1 := func(field *metav1.FieldsV1, c fuzz.Continue) {} + fuzzUnstructuredList := func(list *unstructured.UnstructuredList, c 
fuzz.Continue) { + list.Object = map[string]interface{}{ + "kind": "List", + "apiVersion": "v1", + c.RandString(): c.RandString(), + c.RandString(): c.RandUint64(), + c.RandString(): c.RandBool(), + "metadata": map[string]interface{}{ + "resourceVersion": fmt.Sprintf("%d", c.RandUint64()), + "continue": c.RandString(), + "remainingItemCount": fmt.Sprintf("%d", c.RandUint64()), + c.RandString(): c.RandString(), + }} + c.Fuzz(&list.Items) + } + fuzzMap := func(kvs map[string]interface{}, c fuzz.Continue) { + kvs[c.RandString()] = c.RandBool() + kvs[c.RandString()] = c.RandUint64() + kvs[c.RandString()] = c.RandString() + } + f := fuzz.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap) + streamingBuffer := &bytes.Buffer{} + normalSerializer := NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) + normalBuffer := &bytes.Buffer{} + t.Run("CarpList", func(t *testing.T) { + for i := 0; i < 1000; i++ { + list := &testapigroupv1.CarpList{} + f.Fuzz(list) + streamingBuffer.Reset() + normalBuffer.Reset() + ok, err := streamEncodeCollections(list, streamingBuffer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Errorf("not matching:\n%s", diff) + } + } + }) + t.Run("UnstructuredList", func(t *testing.T) { + for i := 0; i < 1000; i++ { + list := &unstructured.UnstructuredList{} + f.Fuzz(list) + streamingBuffer.Reset() + normalBuffer.Reset() + ok, err := streamEncodeCollections(list, streamingBuffer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := 
normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Errorf("not matching:\n%s", diff) + } + } + }) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go index 1ae4a32eb720c..24f66a10174b0 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go @@ -36,7 +36,7 @@ import ( // is not nil, the object has the group, version, and kind fields set. // Deprecated: use NewSerializerWithOptions instead. func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper, pretty bool) *Serializer { - return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false}) + return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false, false}) } // NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer @@ -44,7 +44,7 @@ func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtim // matches JSON, and will error if constructs are used that do not serialize to JSON. // Deprecated: use NewSerializerWithOptions instead. 
func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer { - return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false}) + return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false, false}) } // NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML @@ -93,6 +93,9 @@ type SerializerOptions struct { // Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML. // Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths. Strict bool + + // StreamingCollectionsEncoding enables encoding collection, one item at the time, drastically reducing memory needed. + StreamingCollectionsEncoding bool } // Serializer handles encoding versioned objects into the proper JSON form @@ -242,6 +245,15 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { _, err = w.Write(data) return err } + if s.options.StreamingCollectionsEncoding { + ok, err := streamEncodeCollections(obj, w) + if err != nil { + return err + } + if ok { + return nil + } + } encoder := json.NewEncoder(w) return encoder.Encode(obj) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 195b161fe0a05..979014b37d43b 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -39,8 +39,11 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" 
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json" rand2 "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" @@ -804,3 +807,80 @@ func gzipContent(data []byte, level int) []byte { } return buf.Bytes() } + +func TestStreamingGzipIntegration(t *testing.T) { + largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1) + tcs := []struct { + name string + serializer runtime.Encoder + object runtime.Object + expectGzip bool + expectStreaming bool + }{ + { + name: "JSON, small object, default -> no gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: false, + }, + { + name: "JSON, small object, streaming -> no gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: true, + }, + { + name: "JSON, large object, default -> gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}), + object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: false, + }, + { + name: "JSON, large object, streaming -> gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: true, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + mockResponseWriter := 
httptest.NewRecorder() + drw := &deferredResponseWriter{ + mediaType: "text/plain", + statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + counter := &writeCounter{Writer: drw} + err := tc.serializer.Encode(tc.object, counter) + if err != nil { + t.Fatal(err) + } + encoding := mockResponseWriter.Header().Get("Content-Encoding") + if (encoding == "gzip") != tc.expectGzip { + t.Errorf("Expect gzip: %v, got: %q", tc.expectGzip, encoding) + } + if counter.writeCount < 1 { + t.Fatalf("Expect at least 1 write") + } + if (counter.writeCount > 1) != tc.expectStreaming { + t.Errorf("Expect streaming: %v, got write count: %d", tc.expectStreaming, counter.writeCount) + } + }) + } +} + +type writeCounter struct { + writeCount int + io.Writer +} + +func (b *writeCounter) Write(data []byte) (int, error) { + b.writeCount++ + return b.Writer.Write(data) +} diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 4f51aef76852b..d63bb96c65170 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -18,6 +18,7 @@ package features import ( "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/version" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/featuregate" ) @@ -265,6 +266,10 @@ const ( // document. StorageVersionHash featuregate.Feature = "StorageVersionHash" + // owner: @serathius + // Allow API server to encode collections item by item, instead of all at once. 
+ StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @aramase, @enj, @nabokihms // kep: https://kep.k8s.io/3331 // alpha: v1.29 @@ -345,6 +350,9 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate // EmulationVersion: { // {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, // }, + StreamingCollectionEncodingToJSON: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, } // defaultKubernetesFeatureGates consists of all known Kubernetes-specific feature keys. diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 27a3a110520b2..9047240f104f2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -27,6 +27,7 @@ import ( "time" systemd "github.com/coreos/go-systemd/v22/daemon" + utilfeature "k8s.io/apiserver/pkg/util/feature" "golang.org/x/time/rate" apidiscoveryv2 "k8s.io/api/apidiscovery/v2" @@ -1021,6 +1022,14 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV // NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values // exposed for easier composition from other packages func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo { + opts := []serializer.CodecFactoryOptionsMutator{} + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) + } + if len(opts) != 0 { + codecs = serializer.NewCodecFactory(scheme, opts...) 
+ } + return APIGroupInfo{ PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group), VersionedResourcesStorageMap: map[string]map[string]rest.Storage{}, From dbe6d7fd0cf3f1e818b4993e668d98cb0ca699d5 Mon Sep 17 00:00:00 2001 From: z1cheng Date: Sat, 1 Mar 2025 13:42:32 +0800 Subject: [PATCH 5/8] UPSTREAM: 130511: Implement tests for encoding collections in Proto Signed-off-by: z1cheng --- .../serializer/protobuf/collections_test.go | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go new file mode 100644 index 0000000000000..fa10e290931ad --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go @@ -0,0 +1,228 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package protobuf + +import ( + "bytes" + "encoding/base64" + "io" + "os/exec" + "testing" + + "github.com/google/go-cmp/cmp" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestCollectionsEncoding(t *testing.T) { + t.Run("Normal", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializer(nil, nil)) + }) + // Leave place for testing streaming collection serializer proposed as part of KEP-5116 +} + +func testCollectionsEncoding(t *testing.T, s *Serializer) { + var remainingItems int64 = 1 + testCases := []struct { + name string + in runtime.Object + // expect is base64 encoded protobuf bytes + expect string + }{ + { + name: "CarpList items nil", + in: &testapigroupv1.CarpList{ + Items: nil, + }, + expect: "azhzAAoECgASABIICgYKABIAGgAaACIA", + }, + { + name: "CarpList slice nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: nil, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "CarpList map nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: nil, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "CarpList items empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{}, + }, + expect: "azhzAAoECgASABIICgYKABIAGgAaACIA", + }, + { + name: "CarpList slice empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: []testapigroupv1.CarpCondition{}, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: 
"CarpList map empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: map[string]string{}, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "List just kind", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + }, + }, + expect: "azhzAAoICgASBExpc3QSCAoGCgASABoAGgAiAA==", + }, + { + name: "List just apiVersion", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + }, + }, + expect: "azhzAAoGCgJ2MRIAEggKBgoAEgAaABoAIgA=", + }, + { + name: "List no elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + expect: "azhzAAoKCgJ2MRIETGlzdBIMCgoKABIEMjM0NRoAGgAiAA==", + }, + { + name: "List one element with continue", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + Continue: "abc", + RemainingItemCount: &remainingItems, + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + }, + }, + expect: "azhzAAoKCgJ2MRIETGlzdBJUCg8KABIEMjM0NRoDYWJjIAESQQoaCgNwb2QSABoHZGVmYXVsdCIAKgAyADgAQgASFxoAQgBKAFIAWABgAGgAggEAigEAmgEAGgoKABoAIgAqADIAGgAiAA==", + }, + { + name: "List two elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: 
"Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default2", + }}, + }, + }, + expect: "azhzAAoKCgJ2MRIETGlzdBKUAQoKCgASBDIzNDUaABJBChoKA3BvZBIAGgdkZWZhdWx0IgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgASQwocCgRwb2QyEgAaCGRlZmF1bHQyIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + if err := s.Encode(tc.in, &buf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + actualBytes := buf.Bytes() + expectBytes, err := io.ReadAll(base64.NewDecoder(base64.StdEncoding, bytes.NewBufferString(tc.expect))) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(expectBytes, actualBytes) { + t.Errorf("expected:\n%s\ngot:\n%s", tc.expect, base64.StdEncoding.EncodeToString(actualBytes)) + t.Log(cmp.Diff(dumpProto(t, actualBytes[4:]), dumpProto(t, expectBytes[4:]))) + } + }) + } +} + +// dumpProto does a best-effort dump of the given proto bytes using protoc if it can be found in the path. +// This is only used when the test has already failed, to try to give more visibility into the diff of the failure. 
+func dumpProto(t *testing.T, data []byte) string { + t.Helper() + protoc, err := exec.LookPath("protoc") + if err != nil { + t.Logf("cannot find protoc in path to dump proto contents: %v", err) + return "" + } + cmd := exec.Command(protoc, "--decode_raw") + cmd.Stdin = bytes.NewBuffer(data) + d, err := cmd.CombinedOutput() + if err != nil { + t.Logf("protoc invocation failed: %v", err) + return "" + } + return string(d) +} From f346c26c0c95b57285798fce657144a8d2621676 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 19 Dec 2024 12:30:39 +0100 Subject: [PATCH 6/8] UPSTREAM: 129407: Implement streaming proto encoding --- pkg/features/kube_features.go | 6 +- pkg/features/versioned_kube_features.go | 4 + .../core/rest/storage_core_generic.go | 3 + .../pkg/apiserver/customresource_handler.go | 7 +- .../pkg/runtime/serializer/codec_factory.go | 13 +- .../serializer/protobuf/collections.go | 174 ++++++++++++++++++ .../serializer/protobuf/collections_test.go | 117 +++++++++++- .../runtime/serializer/protobuf/protobuf.go | 87 +++++++-- .../apimachinery/pkg/runtime/types_proto.go | 127 ++++++++++++- .../pkg/runtime/types_proto_test.go | 107 +++++++++++ .../handlers/responsewriters/writers.go | 6 +- .../handlers/responsewriters/writers_test.go | 29 +++ .../apiserver/pkg/features/kube_features.go | 10 +- .../apiserver/pkg/server/genericapiserver.go | 3 + 14 files changed, 661 insertions(+), 32 deletions(-) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index d34db525fb1d8..dbb2384b6a2b1 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -818,9 +818,13 @@ const ( StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator" // owner: @serathius - // Allow API server to encode collections item by item, instead of all at 
once. + // Allow API server JSON encoder to encode collections item by item, instead of all at once. StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @serathius + // Allow API server Protobuf encoder to encode collections item by item, instead of all at once. + StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf" + // owner: @robscott // kep: https://kep.k8s.io/2433 // alpha: v1.21 diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 06915cd808243..517b652f39fc0 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -35,4 +35,8 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate StreamingCollectionEncodingToJSON: { {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, }, + + StreamingCollectionEncodingToProtobuf: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, } diff --git a/pkg/registry/core/rest/storage_core_generic.go b/pkg/registry/core/rest/storage_core_generic.go index 364d4679c1c3f..5e81e048d7f9e 100644 --- a/pkg/registry/core/rest/storage_core_generic.go +++ b/pkg/registry/core/rest/storage_core_generic.go @@ -76,6 +76,9 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf()) + } if len(opts) != 0 { apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go index 0eafff556d173..e698c0c08d8cf 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go @@ -891,7 +891,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd MediaType: "application/vnd.kubernetes.protobuf", MediaTypeType: "application", MediaTypeSubType: "vnd.kubernetes.protobuf", - Serializer: protobuf.NewSerializer(creator, typer), + Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{ + StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf), + }), StreamSerializer: &runtime.StreamSerializerInfo{ Serializer: protobuf.NewRawSerializer(creator, typer), Framer: protobuf.LengthDelimitedFramer, @@ -966,6 +968,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf()) + } scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...) 
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale") scaleScope.Namer = handlers.ContextBasedNaming{ diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go index 363ee2283b116..805e9b68bf3d9 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go @@ -85,7 +85,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option mf, scheme, scheme, json.SerializerOptions{Yaml: true, Pretty: false, Strict: true}, ) - protoSerializer := protobuf.NewSerializer(scheme, scheme) + protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{ + StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf, + }) protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme) serializers := []serializerType{ @@ -137,7 +139,8 @@ type CodecFactoryOptions struct { // Pretty includes a pretty serializer along with the non-pretty one Pretty bool - StreamingCollectionsEncodingToJSON bool + StreamingCollectionsEncodingToJSON bool + StreamingCollectionsEncodingToProtobuf bool } // CodecFactoryOptionsMutator takes a pointer to an options struct and then modifies it. @@ -170,6 +173,12 @@ func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator { } } +func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator { + return func(options *CodecFactoryOptions) { + options.StreamingCollectionsEncodingToProtobuf = true + } +} + // NewCodecFactory provides methods for retrieving serializers for the supported wire formats // and conversion wrappers to define preferred internal and external versions. 
In the future, // as the internal version is used less, callers may instead use a defaulting serializer and diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go new file mode 100644 index 0000000000000..754a80820b06c --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go @@ -0,0 +1,174 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package protobuf + +import ( + "errors" + "io" + "math/bits" + + "github.com/gogo/protobuf/proto" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" +) + +var ( + errFieldCount = errors.New("expected ListType to have 3 fields") + errTypeMetaField = errors.New("expected TypeMeta field to have TypeMeta type") + errTypeMetaProtobufTag = errors.New(`expected TypeMeta protobuf field tag to be ""`) + errListMetaField = errors.New("expected ListMeta field to have ListMeta type") + errListMetaProtobufTag = errors.New(`expected ListMeta protobuf field tag to be "bytes,1,opt,name=metadata"`) + errItemsProtobufTag = errors.New(`expected Items protobuf field tag to be "bytes,2,rep,name=items"`) + errItemsSizer = errors.New(`expected Items elements to implement proto.Sizer`) +) + +// getStreamingListData implements list extraction logic for protobuf stream serialization. +// +// Reason for a custom logic instead of reusing accessors from meta package: +// * Validate proto tags to prevent incompatibility with proto standard package. +// * ListMetaAccessor doesn't distinguish empty from nil value. +// * TypeAccessor reparsing "apiVersion" and serializing it with "{group}/{version}" +func getStreamingListData(list runtime.Object) (data streamingListData, err error) { + listValue, err := conversion.EnforcePtr(list) + if err != nil { + return data, err + } + listType := listValue.Type() + if listType.NumField() != 3 { + return data, errFieldCount + } + // TypeMeta: validated, but not returned as is not serialized. 
+ _, ok := listValue.Field(0).Interface().(metav1.TypeMeta) + if !ok { + return data, errTypeMetaField + } + if listType.Field(0).Tag.Get("protobuf") != "" { + return data, errTypeMetaProtobufTag + } + // ListMeta + listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta) + if !ok { + return data, errListMetaField + } + // if we were ever to relax the protobuf tag check we should update the hardcoded `0xa` below when writing ListMeta. + if listType.Field(1).Tag.Get("protobuf") != "bytes,1,opt,name=metadata" { + return data, errListMetaProtobufTag + } + data.listMeta = listMeta + // Items; if we were ever to relax the protobuf tag check we should update the hardcoded `0x12` below when writing Items. + if listType.Field(2).Tag.Get("protobuf") != "bytes,2,rep,name=items" { + return data, errItemsProtobufTag + } + items, err := meta.ExtractList(list) + if err != nil { + return data, err + } + data.items = items + data.totalSize, data.listMetaSize, data.itemsSizes, err = listSize(listMeta, items) + return data, err +} + +type streamingListData struct { + // totalSize is the total size of the serialized List object, including their proto headers/size bytes + totalSize int + + // listMetaSize caches results from .Size() call to listMeta, doesn't include header bytes (field identifier, size) + listMetaSize int + listMeta metav1.ListMeta + + // itemsSizes caches results from .Size() call to items, doesn't include header bytes (field identifier, size) + itemsSizes []int + items []runtime.Object +} + +// listSize return size of ListMeta and items to be later used for preallocations. +// listMetaSize and itemSizes do not include header bytes (field identifier, size). 
+func listSize(listMeta metav1.ListMeta, items []runtime.Object) (totalSize, listMetaSize int, itemSizes []int, err error) { + // ListMeta + listMetaSize = listMeta.Size() + totalSize += 1 + sovGenerated(uint64(listMetaSize)) + listMetaSize + // Items + itemSizes = make([]int, len(items)) + for i, item := range items { + sizer, ok := item.(proto.Sizer) + if !ok { + return totalSize, listMetaSize, nil, errItemsSizer + } + n := sizer.Size() + itemSizes[i] = n + totalSize += 1 + sovGenerated(uint64(n)) + n + } + return totalSize, listMetaSize, itemSizes, nil +} + +func streamingEncodeUnknownList(w io.Writer, unk runtime.Unknown, listData streamingListData, memAlloc runtime.MemoryAllocator) error { + _, err := w.Write(protoEncodingPrefix) + if err != nil { + return err + } + // encodeList is responsible for encoding the List into the unknown Raw. + encodeList := func(writer io.Writer) (int, error) { + return streamingEncodeList(writer, listData, memAlloc) + } + _, err = unk.MarshalToWriter(w, listData.totalSize, encodeList) + return err +} + +func streamingEncodeList(w io.Writer, listData streamingListData, memAlloc runtime.MemoryAllocator) (size int, err error) { + // ListMeta; 0xa = (1 << 3) | 2; field number: 1, type: 2 (LEN). https://protobuf.dev/programming-guides/encoding/#structure + n, err := doEncodeWithHeader(&listData.listMeta, w, 0xa, listData.listMetaSize, memAlloc) + size += n + if err != nil { + return size, err + } + // Items; 0x12 = (2 << 3) | 2; field number: 2, type: 2 (LEN). 
https://protobuf.dev/programming-guides/encoding/#structure + for i, item := range listData.items { + n, err := doEncodeWithHeader(item, w, 0x12, listData.itemsSizes[i], memAlloc) + size += n + if err != nil { + return size, err + } + } + return size, nil +} + +func writeVarintGenerated(w io.Writer, v int) (int, error) { + buf := make([]byte, sovGenerated(uint64(v))) + encodeVarintGenerated(buf, len(buf), uint64(v)) + return w.Write(buf) +} + +// sovGenerated is copied from `generated.pb.go` returns size of varint. +func sovGenerated(v uint64) int { + return (bits.Len64(v|1) + 6) / 7 +} + +// encodeVarintGenerated is copied from `generated.pb.go` encodes varint. +func encodeVarintGenerated(dAtA []byte, offset int, v uint64) int { + offset -= sovGenerated(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go index fa10e290931ad..c48d416b5dc53 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go @@ -23,21 +23,26 @@ import ( "os/exec" "testing" + "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" + "sigs.k8s.io/randfill" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ) func TestCollectionsEncoding(t *testing.T) { t.Run("Normal", func(t *testing.T) { - testCollectionsEncoding(t, NewSerializer(nil, nil)) + testCollectionsEncoding(t, NewSerializer(nil, nil), false) + }) + t.Run("Streaming", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: 
true}), true) }) - // Leave place for testing streaming collection serializer proposed as part of KEP-5116 } -func testCollectionsEncoding(t *testing.T, s *Serializer) { +func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) { var remainingItems int64 = 1 testCases := []struct { name string @@ -191,7 +196,7 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - var buf bytes.Buffer + var buf writeCountingBuffer if err := s.Encode(tc.in, &buf); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -201,8 +206,25 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { t.Fatal(err) } if !bytes.Equal(expectBytes, actualBytes) { - t.Errorf("expected:\n%s\ngot:\n%s", tc.expect, base64.StdEncoding.EncodeToString(actualBytes)) - t.Log(cmp.Diff(dumpProto(t, actualBytes[4:]), dumpProto(t, expectBytes[4:]))) + expectedBytes, err := base64.StdEncoding.DecodeString(tc.expect) + if err == nil { + t.Errorf("expected:\n%v\ngot:\n%v", expectedBytes, actualBytes) + } else { + t.Errorf("expected:\n%v\ngot:\n%v", tc.expect, base64.StdEncoding.EncodeToString(actualBytes)) + } + actualProto := dumpProto(t, actualBytes[4:]) + expectedProto := dumpProto(t, expectBytes[4:]) + if actualProto != "" && expectedProto != "" { + t.Log(cmp.Diff(actualProto, expectedProto)) + } else { + t.Log(cmp.Diff(actualBytes, expectBytes)) + } + } + if streamingEnabled && buf.writeCount <= 1 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !streamingEnabled && buf.writeCount > 1 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) } }) } @@ -226,3 +248,86 @@ func dumpProto(t *testing.T, data []byte) string { } return string(d) } + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + 
+func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollection(t *testing.T) { + f := randfill.New() + streamingEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}) + streamingBuffer := &bytes.Buffer{} + normalEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) + normalBuffer := &bytes.Buffer{} + for i := 0; i < 1000; i++ { + list := &testapigroupv1.CarpList{} + f.FillNoCustom(list) + streamingBuffer.Reset() + normalBuffer.Reset() + if err := streamingEncoder.Encode(list, streamingBuffer); err != nil { + t.Fatal(err) + } + if err := normalEncoder.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(streamingBuffer.String(), normalBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Fatalf("unexpected output:\n%s", diff) + } + } +} + +func TestCallsToSize(t *testing.T) { + counter := &countingSizer{data: []byte("abba")} + listMeta := metav1.ListMeta{} + listData := streamingListData{ + totalSize: 14, + listMeta: listMeta, + listMetaSize: listMeta.Size(), + itemsSizes: []int{counter.Size()}, + items: []runtime.Object{counter}, + } + err := streamingEncodeUnknownList(io.Discard, runtime.Unknown{}, listData, &runtime.Allocator{}) + if err != nil { + t.Fatal(err) + } + if counter.count != 1 { + t.Errorf("Expected only 1 call to sizer, got %d", counter.count) + } +} + +type countingSizer struct { + data []byte + count int +} + +var _ proto.Sizer = (*countingSizer)(nil) +var _ runtime.ProtobufMarshaller = (*countingSizer)(nil) + +func (s *countingSizer) MarshalTo(data []byte) (int, error) { + return copy(data, s.data), nil +} +func (s *countingSizer) Size() int { + s.count++ + return len(s.data) +} + +func (s *countingSizer) DeepCopyObject() runtime.Object { + return nil +} + +func (s *countingSizer) GetObjectKind() 
schema.ObjectKind { + return nil +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go index c63e6dc63f6bb..c66c49ac4c2d2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go @@ -72,10 +72,18 @@ func IsNotMarshalable(err error) bool { // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written // as-is (any type info passed with the object will be used). func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer { + return NewSerializerWithOptions(creater, typer, SerializerOptions{}) +} + +// NewSerializerWithOptions creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer +// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written +// as-is (any type info passed with the object will be used). +func NewSerializerWithOptions(creater runtime.ObjectCreater, typer runtime.ObjectTyper, opts SerializerOptions) *Serializer { return &Serializer{ prefix: protoEncodingPrefix, creater: creater, typer: typer, + options: opts, } } @@ -84,6 +92,14 @@ type Serializer struct { prefix []byte creater runtime.ObjectCreater typer runtime.ObjectTyper + + options SerializerOptions +} + +// SerializerOptions holds the options which are used to configure a Proto serializer. +type SerializerOptions struct { + // StreamingCollectionsEncoding enables encoding collections, one item at a time, drastically reducing memory needed. + StreamingCollectionsEncoding bool } var _ runtime.Serializer = &Serializer{} @@ -209,6 +225,13 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.
}, } } + if s.options.StreamingCollectionsEncoding { + listData, err := getStreamingListData(obj) + if err == nil { + // Doesn't honor custom proto marshaling methods (like json streaming), because all proto objects implement proto methods. + return streamingEncodeUnknownList(w, unk, listData, memAlloc) + } + } switch t := obj.(type) { case bufferedMarshaller: @@ -428,6 +451,39 @@ func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime } func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + _, err := doEncode(obj, w, nil, memAlloc) + return err +} + +func doEncodeWithHeader(obj any, w io.Writer, field byte, precomputedSize int, memAlloc runtime.MemoryAllocator) (size int, err error) { + // Field identifier + n, err := w.Write([]byte{field}) + size += n + if err != nil { + return size, err + } + // Size + n, err = writeVarintGenerated(w, precomputedSize) + size += n + if err != nil { + return size, err + } + // Obj + n, err = doEncode(obj, w, &precomputedSize, memAlloc) + size += n + if err != nil { + return size, err + } + if n != precomputedSize { + return size, fmt.Errorf("the size value was %d, but doEncode wrote %d bytes to data", precomputedSize, n) + } + return size, nil +} + +// doEncode encodes provided object into writer using an allocator if possible. +// Avoids call by object Size if precomputedObjSize is provided. +// precomputedObjSize should not include header bytes (field identifier, size).
+func doEncode(obj any, w io.Writer, precomputedObjSize *int, memAlloc runtime.MemoryAllocator) (int, error) { if memAlloc == nil { klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") memAlloc = &runtime.SimpleAllocator{} @@ -436,40 +492,43 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runti case bufferedReverseMarshaller: // this path performs a single allocation during write only when the Allocator wasn't provided // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods - encodedSize := uint64(t.Size()) - data := memAlloc.Allocate(encodedSize) + if precomputedObjSize == nil { + s := t.Size() + precomputedObjSize = &s + } + data := memAlloc.Allocate(uint64(*precomputedObjSize)) n, err := t.MarshalToSizedBuffer(data) if err != nil { - return err + return 0, err } - _, err = w.Write(data[:n]) - return err + return w.Write(data[:n]) case bufferedMarshaller: // this path performs a single allocation during write only when the Allocator wasn't provided // it also requires the caller to implement the more efficient Size and MarshalTo methods - encodedSize := uint64(t.Size()) - data := memAlloc.Allocate(encodedSize) + if precomputedObjSize == nil { + s := t.Size() + precomputedObjSize = &s + } + data := memAlloc.Allocate(uint64(*precomputedObjSize)) n, err := t.MarshalTo(data) if err != nil { - return err + return 0, err } - _, err = w.Write(data[:n]) - return err + return w.Write(data[:n]) case proto.Marshaler: // this path performs extra allocations data, err := t.Marshal() if err != nil { - return err + return 0, err } - _, err = w.Write(data) - return err + return w.Write(data) default: - return errNotMarshalable{reflect.TypeOf(obj)} + return 0, errNotMarshalable{reflect.TypeOf(obj)} } } diff --git 
a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go index a82227b239af5..27a2064c4163e 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go @@ -18,6 +18,7 @@ package runtime import ( "fmt" + "io" ) type ProtobufMarshaller interface { @@ -28,6 +29,124 @@ type ProtobufReverseMarshaller interface { MarshalToSizedBuffer(data []byte) (int, error) } +const ( + typeMetaTag = 0xa + rawTag = 0x12 + contentEncodingTag = 0x1a + contentTypeTag = 0x22 + + // max length of a varint for a uint64 + maxUint64VarIntLength = 10 +) + +// MarshalToWriter allows a caller to provide a streaming writer for raw bytes, +// instead of populating them inside the Unknown struct. +// rawSize is the number of bytes rawWriter will write in a success case. +// writeRaw is called when it is time to write the raw bytes. It must return `rawSize, nil` or an error. +func (m *Unknown) MarshalToWriter(w io.Writer, rawSize int, writeRaw func(io.Writer) (int, error)) (int, error) { + size := 0 + + // reuse the buffer for varint marshaling + varintBuffer := make([]byte, maxUint64VarIntLength) + writeVarint := func(i int) (int, error) { + offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), uint64(i)) + return w.Write(varintBuffer[offset:]) + } + + // TypeMeta + { + n, err := w.Write([]byte{typeMetaTag}) + size += n + if err != nil { + return size, err + } + + typeMetaBytes, err := m.TypeMeta.Marshal() + if err != nil { + return size, err + } + + n, err = writeVarint(len(typeMetaBytes)) + size += n + if err != nil { + return size, err + } + + n, err = w.Write(typeMetaBytes) + size += n + if err != nil { + return size, err + } + } + + // Raw, delegating write to writeRaw() + { + n, err := w.Write([]byte{rawTag}) + size += n + if err != nil { + return size, err + } + + n, err = writeVarint(rawSize) + size += n + if err != nil { + return size, err 
+ } + + n, err = writeRaw(w) + size += n + if err != nil { + return size, err + } + if n != int(rawSize) { + return size, fmt.Errorf("the size value was %d, but encoding wrote %d bytes to data", rawSize, n) + } + } + + // ContentEncoding + { + n, err := w.Write([]byte{contentEncodingTag}) + size += n + if err != nil { + return size, err + } + + n, err = writeVarint(len(m.ContentEncoding)) + size += n + if err != nil { + return size, err + } + + n, err = w.Write([]byte(m.ContentEncoding)) + size += n + if err != nil { + return size, err + } + } + + // ContentType + { + n, err := w.Write([]byte{contentTypeTag}) + size += n + if err != nil { + return size, err + } + + n, err = writeVarint(len(m.ContentType)) + size += n + if err != nil { + return size, err + } + + n, err = w.Write([]byte(m.ContentType)) + size += n + if err != nil { + return size, err + } + } + return size, nil +} + // NestedMarshalTo allows a caller to avoid extra allocations during serialization of an Unknown // that will contain an object that implements ProtobufMarshaller or ProtobufReverseMarshaller.
func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64) (int, error) { @@ -43,12 +162,12 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64 copy(data[i:], m.ContentType) i = encodeVarintGenerated(data, i, uint64(len(m.ContentType))) i-- - data[i] = 0x22 + data[i] = contentTypeTag i -= len(m.ContentEncoding) copy(data[i:], m.ContentEncoding) i = encodeVarintGenerated(data, i, uint64(len(m.ContentEncoding))) i-- - data[i] = 0x1a + data[i] = contentEncodingTag if b != nil { if r, ok := b.(ProtobufReverseMarshaller); ok { n1, err := r.MarshalToSizedBuffer(data[:i]) @@ -75,7 +194,7 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64 } i = encodeVarintGenerated(data, i, size) i-- - data[i] = 0x12 + data[i] = rawTag } n2, err := m.TypeMeta.MarshalToSizedBuffer(data[:i]) if err != nil { @@ -84,6 +203,6 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64 i -= n2 i = encodeVarintGenerated(data, i, uint64(n2)) i-- - data[i] = 0xa + data[i] = typeMetaTag return msgSize - i, nil } diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go new file mode 100644 index 0000000000000..39fb709426dcc --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package runtime + +import ( + "bytes" + "io" + "math" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestVarint(t *testing.T) { + varintBuffer := make([]byte, maxUint64VarIntLength) + offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), math.MaxUint64) + used := len(varintBuffer) - offset + if used != maxUint64VarIntLength { + t.Fatalf("expected encodeVarintGenerated to use %d bytes to encode MaxUint64, got %d", maxUint64VarIntLength, used) + } +} + +func TestNestedMarshalToWriter(t *testing.T) { + testcases := []struct { + name string + raw []byte + }{ + { + name: "zero-length", + raw: []byte{}, + }, + { + name: "simple", + raw: []byte{0x00, 0x01, 0x02, 0x03}, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + u := &Unknown{ + ContentType: "ct", + ContentEncoding: "ce", + TypeMeta: TypeMeta{ + APIVersion: "v1", + Kind: "k", + }, + } + + // Marshal normally with Raw inlined + u.Raw = tc.raw + marshalData, err := u.Marshal() + if err != nil { + t.Fatal(err) + } + u.Raw = nil + + // Marshal with NestedMarshalTo + nestedMarshalData := make([]byte, len(marshalData)) + n, err := u.NestedMarshalTo(nestedMarshalData, copyMarshaler(tc.raw), uint64(len(tc.raw))) + if err != nil { + t.Fatal(err) + } + if n != len(marshalData) { + t.Errorf("NestedMarshalTo returned %d, expected %d", n, len(marshalData)) + } + if e, a := marshalData, nestedMarshalData; !bytes.Equal(e, a) { + t.Errorf("NestedMarshalTo and Marshal differ:\n%s", cmp.Diff(e, a)) + } + + // Streaming marshal with MarshalToWriter + buf := bytes.NewBuffer(nil) + n, err = u.MarshalToWriter(buf, len(tc.raw), func(w io.Writer) (int, error) { + return w.Write(tc.raw) + }) + if err != nil { + t.Fatal(err) + } + if n != len(marshalData) { + t.Errorf("MarshalToWriter returned %d, expected %d", n, len(marshalData)) + } + if e, a := marshalData, buf.Bytes(); !bytes.Equal(e, a) { + t.Errorf("MarshalToWriter and Marshal differ:\n%s", cmp.Diff(e, a)) + } + }) + } +} 
+ +type copyMarshaler []byte + +func (c copyMarshaler) MarshalTo(dest []byte) (int, error) { + n := copy(dest, []byte(c)) + return n, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index acc95bdc65e59..e6f566aa3a32a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -156,9 +156,9 @@ const ( // (usually the entire object), and if the size is smaller no gzipping will be performed // if the client requests it. defaultGzipThresholdBytes = 128 * 1024 - // Use the length of the first write of streaming implementations. - // TODO: Update when streaming proto is implemented - firstWriteStreamingThresholdBytes = 1 + // Use the length of the first write to recognize streaming implementations. + // When streaming JSON, the first write is "{", while Kubernetes protobuf starts with a unique 4 byte header. 
+ firstWriteStreamingThresholdBytes = 4 ) // negotiateContentEncoding returns a supported client-requested content encoding for the diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 979014b37d43b..862d6d9601312 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" rand2 "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" @@ -845,6 +846,34 @@ func TestStreamingGzipIntegration(t *testing.T) { expectGzip: true, expectStreaming: true, }, + { + name: "Protobuf, small object, default -> no gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: false, + }, + { + name: "Protobuf, small object, streaming -> no gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: true, + }, + { + name: "Protobuf, large object, default -> gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}), + object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: false, + }, + { + name: "Protobuf, large object, streaming -> gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}), + object: 
&testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: true, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index d63bb96c65170..942c79d8bb1f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -267,9 +267,13 @@ const ( StorageVersionHash featuregate.Feature = "StorageVersionHash" // owner: @serathius - // Allow API server to encode collections item by item, instead of all at once. + // Allow API server JSON encoder to encode collections item by item, instead of all at once. StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @serathius + // Allow API server Protobuf encoder to encode collections item by item, instead of all at once. + StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf" + // owner: @aramase, @enj, @nabokihms // kep: https://kep.k8s.io/3331 // alpha: v1.29 @@ -353,6 +357,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate StreamingCollectionEncodingToJSON: { {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, }, + + StreamingCollectionEncodingToProtobuf: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, } // defaultKubernetesFeatureGates consists of all known Kubernetes-specific feature keys. 
diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 9047240f104f2..86e179f60f626 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -1026,6 +1026,9 @@ func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf()) + } if len(opts) != 0 { codecs = serializer.NewCodecFactory(scheme, opts...) } From 1b3519c72e2adbe00b44e8adde120d4147c38811 Mon Sep 17 00:00:00 2001 From: Roman Feldman Date: Tue, 30 Sep 2025 10:30:07 -0700 Subject: [PATCH 7/8] UPSTREAM: : Switch test to gofuzz randfill isn't available in 4.18 --- .../pkg/runtime/serializer/protobuf/collections_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go index c48d416b5dc53..d6e3a6d444daf 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go @@ -25,7 +25,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" - "sigs.k8s.io/randfill" + fuzz "github.com/google/gofuzz" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" @@ -265,14 +265,14 @@ func (b *writeCountingBuffer) Reset() { } func TestFuzzCollection(t *testing.T) { - f := randfill.New() + f := fuzz.New() streamingEncoder := 
NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}) streamingBuffer := &bytes.Buffer{} normalEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) normalBuffer := &bytes.Buffer{} for i := 0; i < 1000; i++ { list := &testapigroupv1.CarpList{} - f.FillNoCustom(list) + f.FuzzNoCustom(list) streamingBuffer.Reset() normalBuffer.Reset() if err := streamingEncoder.Encode(list, streamingBuffer); err != nil { From 2c61e8e79990fdd1603b294e01dc8bbb42aff0ec Mon Sep 17 00:00:00 2001 From: Roman Feldman Date: Fri, 10 Oct 2025 13:15:39 -0700 Subject: [PATCH 8/8] UPSTREAM: : Make streaming collection encoding off by default Require an explicit opt in for special handling of collection encoding. --- pkg/features/versioned_kube_features.go | 4 ++-- staging/src/k8s.io/apiserver/pkg/features/kube_features.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 517b652f39fc0..90dad73f7a3db 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -33,10 +33,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate // {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, // }, StreamingCollectionEncodingToJSON: { - {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta}, }, StreamingCollectionEncodingToProtobuf: { - {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta}, }, } diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 942c79d8bb1f6..9a119ba9410d2 100644 --- 
a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -355,11 +355,11 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate // {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, // }, StreamingCollectionEncodingToJSON: { - {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta}, }, StreamingCollectionEncodingToProtobuf: { - {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta}, }, }