Skip to content

Commit 25dc6c9

Browse files
authored
Merge pull request kubernetes#130281 from z1cheng/issue_130264
Implement chunking for gzip encoder in deferredResponseWriter
2 parents b38bf6c + 2472f49 commit 25dc6c9

File tree

2 files changed

+215
-30
lines changed

2 files changed

+215
-30
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ const (
157157
// (usually the entire object), and if the size is smaller no gzipping will be performed
158158
// if the client requests it.
159159
defaultGzipThresholdBytes = 128 * 1024
160+
// Use the length of the first write of streaming implementations.
161+
// TODO: Update when streaming proto is implemented
162+
firstWriteStreamingThresholdBytes = 1
160163
)
161164

162165
// negotiateContentEncoding returns a supported client-requested content encoding for the
@@ -192,14 +195,53 @@ type deferredResponseWriter struct {
192195
statusCode int
193196
contentEncoding string
194197

195-
hasWritten bool
196-
hw http.ResponseWriter
197-
w io.Writer
198+
hasBuffered bool
199+
buffer []byte
200+
hasWritten bool
201+
hw http.ResponseWriter
202+
w io.Writer
198203

199204
ctx context.Context
200205
}
201206

202207
func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
208+
switch {
209+
case w.hasWritten:
210+
// already written, cannot buffer
211+
return w.unbufferedWrite(p)
212+
213+
case w.contentEncoding != "gzip":
214+
// non-gzip, no need to buffer
215+
return w.unbufferedWrite(p)
216+
217+
case !w.hasBuffered && len(p) > defaultGzipThresholdBytes:
218+
// not yet buffered, first write is long enough to trigger gzip, no need to buffer
219+
return w.unbufferedWrite(p)
220+
221+
case !w.hasBuffered && len(p) > firstWriteStreamingThresholdBytes:
222+
// not yet buffered, first write is longer than expected for streaming scenarios that would require buffering, no need to buffer
223+
return w.unbufferedWrite(p)
224+
225+
default:
226+
if !w.hasBuffered {
227+
w.hasBuffered = true
228+
// Start at 80 bytes to avoid rapid reallocation of the buffer.
229+
// The minimum size of a 0-item serialized list object is 80 bytes:
230+
// {"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"1"},"items":[]}\n
231+
w.buffer = make([]byte, 0, max(80, len(p)))
232+
}
233+
w.buffer = append(w.buffer, p...)
234+
var err error
235+
if len(w.buffer) > defaultGzipThresholdBytes {
236+
// we've accumulated enough to trigger gzip, write and clear buffer
237+
_, err = w.unbufferedWrite(w.buffer)
238+
w.buffer = nil
239+
}
240+
return len(p), err
241+
}
242+
}
243+
244+
func (w *deferredResponseWriter) unbufferedWrite(p []byte) (n int, err error) {
203245
ctx := w.ctx
204246
span := tracing.SpanFromContext(ctx)
205247
// This Step usually wraps in-memory object serialization.
@@ -245,11 +287,17 @@ func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
245287
return w.w.Write(p)
246288
}
247289

248-
func (w *deferredResponseWriter) Close() error {
290+
func (w *deferredResponseWriter) Close() (err error) {
249291
if !w.hasWritten {
250-
return nil
292+
if !w.hasBuffered {
293+
return nil
294+
}
295+
// never reached defaultGzipThresholdBytes, no need to do the gzip writer cleanup
296+
_, err := w.unbufferedWrite(w.buffer)
297+
w.buffer = nil
298+
return err
251299
}
252-
var err error
300+
253301
switch t := w.w.(type) {
254302
case *gzip.Writer:
255303
err = t.Close()

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go

Lines changed: 161 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"os"
3434
"reflect"
3535
"strconv"
36-
"strings"
3736
"testing"
3837
"time"
3938

@@ -42,6 +41,7 @@ import (
4241
kerrors "k8s.io/apimachinery/pkg/api/errors"
4342
"k8s.io/apimachinery/pkg/runtime"
4443
"k8s.io/apimachinery/pkg/runtime/schema"
44+
rand2 "k8s.io/apimachinery/pkg/util/rand"
4545
"k8s.io/apimachinery/pkg/util/uuid"
4646
"k8s.io/apiserver/pkg/features"
4747
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -378,29 +378,94 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
378378
largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
379379

380380
tests := []struct {
381-
name string
382-
chunks [][]byte
383-
expectGzip bool
381+
name string
382+
chunks [][]byte
383+
expectGzip bool
384+
expectHeaders http.Header
384385
}{
386+
{
387+
name: "no writes",
388+
chunks: nil,
389+
expectGzip: false,
390+
expectHeaders: http.Header{},
391+
},
392+
{
393+
name: "one empty write",
394+
chunks: [][]byte{{}},
395+
expectGzip: false,
396+
expectHeaders: http.Header{
397+
"Content-Type": []string{"text/plain"},
398+
},
399+
},
400+
{
401+
name: "one single byte write",
402+
chunks: [][]byte{{'{'}},
403+
expectGzip: false,
404+
expectHeaders: http.Header{
405+
"Content-Type": []string{"text/plain"},
406+
},
407+
},
385408
{
386409
name: "one small chunk write",
387410
chunks: [][]byte{smallChunk},
388411
expectGzip: false,
412+
expectHeaders: http.Header{
413+
"Content-Type": []string{"text/plain"},
414+
},
389415
},
390416
{
391417
name: "two small chunk writes",
392418
chunks: [][]byte{smallChunk, smallChunk},
393419
expectGzip: false,
420+
expectHeaders: http.Header{
421+
"Content-Type": []string{"text/plain"},
422+
},
423+
},
424+
{
425+
name: "one single byte and one small chunk write",
426+
chunks: [][]byte{{'{'}, smallChunk},
427+
expectGzip: false,
428+
expectHeaders: http.Header{
429+
"Content-Type": []string{"text/plain"},
430+
},
431+
},
432+
{
433+
name: "two single bytes and one small chunk write",
434+
chunks: [][]byte{{'{'}, {'{'}, smallChunk},
435+
expectGzip: true,
436+
expectHeaders: http.Header{
437+
"Content-Type": []string{"text/plain"},
438+
"Content-Encoding": []string{"gzip"},
439+
"Vary": []string{"Accept-Encoding"},
440+
},
394441
},
395442
{
396443
name: "one large chunk writes",
397444
chunks: [][]byte{largeChunk},
398445
expectGzip: true,
446+
expectHeaders: http.Header{
447+
"Content-Type": []string{"text/plain"},
448+
"Content-Encoding": []string{"gzip"},
449+
"Vary": []string{"Accept-Encoding"},
450+
},
399451
},
400452
{
401453
name: "two large chunk writes",
402454
chunks: [][]byte{largeChunk, largeChunk},
403455
expectGzip: true,
456+
expectHeaders: http.Header{
457+
"Content-Type": []string{"text/plain"},
458+
"Content-Encoding": []string{"gzip"},
459+
"Vary": []string{"Accept-Encoding"},
460+
},
461+
},
462+
{
463+
name: "one small chunk and one large chunk write",
464+
chunks: [][]byte{smallChunk, largeChunk},
465+
expectGzip: false,
466+
expectHeaders: http.Header{
467+
"Content-Type": []string{"text/plain"},
468+
},
404469
},
405470
}
406471

@@ -441,23 +506,16 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
441506
if res.StatusCode != http.StatusOK {
442507
t.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode)
443508
}
444-
contentEncoding := res.Header.Get("Content-Encoding")
445-
varyHeader := res.Header.Get("Vary")
509+
if !reflect.DeepEqual(res.Header, tt.expectHeaders) {
510+
t.Fatal(cmp.Diff(tt.expectHeaders, res.Header))
511+
}
446512

447513
resBytes, err := io.ReadAll(res.Body)
448514
if err != nil {
449515
t.Fatalf("unexpected error occurred while reading response body: %v", err)
450516
}
451517

452518
if tt.expectGzip {
453-
if contentEncoding != "gzip" {
454-
t.Fatalf("content-encoding is not set properly, expected: gzip, got: %s", contentEncoding)
455-
}
456-
457-
if !strings.Contains(varyHeader, "Accept-Encoding") {
458-
t.Errorf("vary header doesn't have Accept-Encoding")
459-
}
460-
461519
gr, err := gzip.NewReader(bytes.NewReader(resBytes))
462520
if err != nil {
463521
t.Fatalf("failed to create gzip reader: %v", err)
@@ -471,22 +529,101 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
471529
if !bytes.Equal(fullPayload, decompressed) {
472530
t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed)
473531
}
474-
475532
} else {
476-
if contentEncoding != "" {
477-
t.Errorf("content-encoding is set unexpectedly")
478-
}
479-
480-
if strings.Contains(varyHeader, "Accept-Encoding") {
481-
t.Errorf("accept encoding is set unexpectedly")
482-
}
483-
484533
if !bytes.Equal(fullPayload, resBytes) {
485534
t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes)
486535
}
487-
488536
}
537+
})
538+
}
539+
}
540+
541+
func benchmarkChunkingGzip(b *testing.B, count int, chunk []byte) {
542+
mockResponseWriter := httptest.NewRecorder()
543+
mockResponseWriter.Body = nil
544+
545+
drw := &deferredResponseWriter{
546+
mediaType: "text/plain",
547+
statusCode: 200,
548+
contentEncoding: "gzip",
549+
hw: mockResponseWriter,
550+
ctx: context.Background(),
551+
}
552+
b.ResetTimer()
553+
for i := 0; i < count; i++ {
554+
n, err := drw.Write(chunk)
555+
if err != nil {
556+
b.Fatalf("unexpected error while writing chunk: %v", err)
557+
}
558+
if n != len(chunk) {
559+
b.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n)
560+
}
561+
}
562+
err := drw.Close()
563+
if err != nil {
564+
b.Fatalf("unexpected error when closing deferredResponseWriter: %v", err)
565+
}
566+
res := mockResponseWriter.Result()
567+
if res.StatusCode != http.StatusOK {
568+
b.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode)
569+
}
570+
}
571+
572+
func BenchmarkChunkingGzip(b *testing.B) {
573+
tests := []struct {
574+
count int
575+
size int
576+
}{
577+
{
578+
count: 100,
579+
size: 1_000,
580+
},
581+
{
582+
count: 100,
583+
size: 100_000,
584+
},
585+
{
586+
count: 1_000,
587+
size: 100_000,
588+
},
589+
{
590+
count: 1_000,
591+
size: 1_000_000,
592+
},
593+
{
594+
count: 10_000,
595+
size: 100_000,
596+
},
597+
{
598+
count: 100_000,
599+
size: 10_000,
600+
},
601+
{
602+
count: 1,
603+
size: 100_000,
604+
},
605+
{
606+
count: 1,
607+
size: 1_000_000,
608+
},
609+
{
610+
count: 1,
611+
size: 10_000_000,
612+
},
613+
{
614+
count: 1,
615+
size: 100_000_000,
616+
},
617+
{
618+
count: 1,
619+
size: 1_000_000_000,
620+
},
621+
}
489622

623+
for _, t := range tests {
624+
b.Run(fmt.Sprintf("Count=%d/Size=%d", t.count, t.size), func(b *testing.B) {
625+
chunk := []byte(rand2.String(t.size))
626+
benchmarkChunkingGzip(b, t.count, chunk)
490627
})
491628
}
492629
}

0 commit comments

Comments
 (0)