Skip to content

Commit e7c743b

Browse files
committed
Streaming JSON encoder for List
1 parent a18b4a8 commit e7c743b

File tree

13 files changed

+532
-19
lines changed

13 files changed

+532
-19
lines changed

pkg/features/kube_features.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,10 @@ const (
667667
// Enables support for the StorageVersionMigrator controller.
668668
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
669669

670+
// owner: @serathius
671+
// Allow API server to encode collections item by item, instead of all at once.
672+
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
673+
670674
// owner: @robscott
671675
// kep: https://kep.k8s.io/2433
672676
//

pkg/features/versioned_kube_features.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
741741
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
742742
},
743743

744+
StreamingCollectionEncodingToJSON: {
745+
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
746+
},
747+
744748
SupplementalGroupsPolicy: {
745749
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
746750
},

pkg/registry/core/rest/storage_core_generic.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,15 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
7373
ParameterCodec: legacyscheme.ParameterCodec,
7474
NegotiatedSerializer: legacyscheme.Codecs,
7575
}
76+
opts := []serializer.CodecFactoryOptionsMutator{}
7677
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
77-
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
78+
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
79+
}
80+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
81+
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
82+
}
83+
if len(opts) != 0 {
84+
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
7885
}
7986

8087
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
851851
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
852852

853853
// CRDs explicitly do not support protobuf, but some objects returned by the API server do
854+
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
854855
negotiatedSerializer := unstructuredNegotiatedSerializer{
855856
typer: typer,
856857
creator: creator,
@@ -864,10 +865,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
864865
MediaTypeType: "application",
865866
MediaTypeSubType: "json",
866867
EncodesAsText: true,
867-
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}),
868+
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
868869
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
869870
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
870-
Strict: true,
871+
Strict: true,
872+
StreamingCollectionsEncoding: streamingCollections,
871873
}),
872874
StreamSerializer: &runtime.StreamSerializerInfo{
873875
EncodesAsText: true,
@@ -970,6 +972,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
970972
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
971973
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
972974
}
975+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
976+
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
977+
}
973978
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
974979
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
975980
scaleScope.Namer = handlers.ContextBasedNaming{

staging/src/k8s.io/apimachinery/pkg/api/meta/help.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
221221
if err != nil {
222222
return nil, err
223223
}
224+
if items.IsNil() {
225+
return nil, nil
226+
}
224227
list := make([]runtime.Object, items.Len())
225228
if len(list) == 0 {
226229
return list, nil

staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
2929
jsonSerializer := json.NewSerializerWithOptions(
3030
mf, scheme, scheme,
31-
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
31+
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
3232
)
3333
jsonSerializerType := runtime.SerializerInfo{
3434
MediaType: runtime.ContentTypeJSON,
@@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
3838
Serializer: jsonSerializer,
3939
StrictSerializer: json.NewSerializerWithOptions(
4040
mf, scheme, scheme,
41-
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
41+
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
4242
),
4343
StreamSerializer: &runtime.StreamSerializerInfo{
4444
EncodesAsText: true,
@@ -113,6 +113,8 @@ type CodecFactoryOptions struct {
113113
// Pretty includes a pretty serializer along with the non-pretty one
114114
Pretty bool
115115

116+
StreamingCollectionsEncodingToJSON bool
117+
116118
serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
117119
}
118120

@@ -147,6 +149,12 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S
147149
}
148150
}
149151

152+
func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
153+
return func(options *CodecFactoryOptions) {
154+
options.StreamingCollectionsEncodingToJSON = true
155+
}
156+
}
157+
150158
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
151159
// and conversion wrappers to define preferred internal and external versions. In the future,
152160
// as the internal version is used less, callers may instead use a defaulting serializer and
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package json
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"io"
23+
"maps"
24+
"slices"
25+
"sort"
26+
27+
"k8s.io/apimachinery/pkg/api/meta"
28+
"k8s.io/apimachinery/pkg/conversion"
29+
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32+
"k8s.io/apimachinery/pkg/runtime"
33+
)
34+
35+
func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
36+
list, ok := obj.(*unstructured.UnstructuredList)
37+
if ok {
38+
return true, streamingEncodeUnstructuredList(w, list)
39+
}
40+
if _, ok := obj.(json.Marshaler); ok {
41+
return false, nil
42+
}
43+
typeMeta, listMeta, items, err := getListMeta(obj)
44+
if err == nil {
45+
return true, streamingEncodeList(w, typeMeta, listMeta, items)
46+
}
47+
return false, nil
48+
}
49+
50+
// getListMeta implements list extraction logic for json stream serialization.
51+
//
52+
// Reason for a custom logic instead of reusing accessors from meta package:
53+
// * Validate json tags to prevent incompatibility with json standard package.
54+
// * ListMetaAccessor doesn't distinguish empty from nil value.
55+
// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}"
56+
func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
57+
listValue, err := conversion.EnforcePtr(list)
58+
if err != nil {
59+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
60+
}
61+
listType := listValue.Type()
62+
if listType.NumField() != 3 {
63+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
64+
}
65+
// TypeMeta
66+
typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
67+
if !ok {
68+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
69+
}
70+
if listType.Field(0).Tag.Get("json") != ",inline" {
71+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
72+
}
73+
// ListMeta
74+
listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
75+
if !ok {
76+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
77+
}
78+
if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
79+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
80+
}
81+
// Items
82+
items, err := meta.ExtractList(list)
83+
if err != nil {
84+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
85+
}
86+
if listType.Field(2).Tag.Get("json") != "items" {
87+
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
88+
}
89+
return typeMeta, listMeta, items, nil
90+
}
91+
92+
func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
93+
// Start
94+
if _, err := w.Write([]byte(`{`)); err != nil {
95+
return err
96+
}
97+
98+
// TypeMeta
99+
if typeMeta.Kind != "" {
100+
if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil {
101+
return err
102+
}
103+
}
104+
if typeMeta.APIVersion != "" {
105+
if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil {
106+
return err
107+
}
108+
}
109+
110+
// ListMeta
111+
if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil {
112+
return err
113+
}
114+
115+
// Items
116+
if err := encodeItemsObjectSlice(w, items); err != nil {
117+
return err
118+
}
119+
120+
// End
121+
_, err := w.Write([]byte("}\n"))
122+
return err
123+
}
124+
125+
func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) {
126+
if items == nil {
127+
err := encodeKeyValuePair(w, "items", nil, nil)
128+
return err
129+
}
130+
_, err = w.Write([]byte(`"items":[`))
131+
if err != nil {
132+
return err
133+
}
134+
suffix := []byte(",")
135+
for i, item := range items {
136+
if i == len(items)-1 {
137+
suffix = nil
138+
}
139+
err := encodeValue(w, item, suffix)
140+
if err != nil {
141+
return err
142+
}
143+
}
144+
_, err = w.Write([]byte("]"))
145+
if err != nil {
146+
return err
147+
}
148+
return err
149+
}
150+
151+
func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
152+
_, err := w.Write([]byte(`{`))
153+
if err != nil {
154+
return err
155+
}
156+
keys := slices.Collect(maps.Keys(list.Object))
157+
if _, exists := list.Object["items"]; !exists {
158+
keys = append(keys, "items")
159+
}
160+
sort.Strings(keys)
161+
162+
suffix := []byte(",")
163+
for i, key := range keys {
164+
if i == len(keys)-1 {
165+
suffix = nil
166+
}
167+
if key == "items" {
168+
err = encodeItemsUnstructuredSlice(w, list.Items, suffix)
169+
} else {
170+
err = encodeKeyValuePair(w, key, list.Object[key], suffix)
171+
}
172+
if err != nil {
173+
return err
174+
}
175+
}
176+
_, err = w.Write([]byte("}\n"))
177+
return err
178+
}
179+
180+
func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) {
181+
_, err = w.Write([]byte(`"items":[`))
182+
if err != nil {
183+
return err
184+
}
185+
comma := []byte(",")
186+
for i, item := range items {
187+
if i == len(items)-1 {
188+
comma = nil
189+
}
190+
err := encodeValue(w, item.Object, comma)
191+
if err != nil {
192+
return err
193+
}
194+
}
195+
_, err = w.Write([]byte("]"))
196+
if err != nil {
197+
return err
198+
}
199+
if len(suffix) > 0 {
200+
_, err = w.Write(suffix)
201+
}
202+
return err
203+
}
204+
205+
func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) {
206+
err = encodeValue(w, key, []byte(":"))
207+
if err != nil {
208+
return err
209+
}
210+
err = encodeValue(w, value, suffix)
211+
if err != nil {
212+
return err
213+
}
214+
return err
215+
}
216+
217+
func encodeValue(w io.Writer, value any, suffix []byte) error {
218+
data, err := json.Marshal(value)
219+
if err != nil {
220+
return err
221+
}
222+
_, err = w.Write(data)
223+
if err != nil {
224+
return err
225+
}
226+
if len(suffix) > 0 {
227+
_, err = w.Write(suffix)
228+
}
229+
return err
230+
}

0 commit comments

Comments
 (0)