Skip to content

Commit 861c90c

Browse files
AlexanderYastrebovgopherbot
authored andcommitted
net/http: pool transport gzip readers
goos: linux goarch: amd64 pkg: net/http │ HEAD~1 │ HEAD │ │ sec/op │ sec/op vs base │ ClientGzip-8 621.0µ ± 2% 616.3µ ± 10% ~ (p=0.971 n=10) │ HEAD~1 │ HEAD │ │ B/op │ B/op vs base │ ClientGzip-8 49.765Ki ± 0% 9.514Ki ± 2% -80.88% (p=0.000 n=10) │ HEAD~1 │ HEAD │ │ allocs/op │ allocs/op vs base │ ClientGzip-8 57.00 ± 0% 52.00 ± 0% -8.77% (p=0.000 n=10) Allocation saving comes from absent compress/flate.(*dictDecoder).init This change also improves concurrent body read detection by returning an explicit error. Updates golang#61353 Change-Id: I380acfca912dc009b3b9c8283e27b3526cedd546 GitHub-Last-Rev: df12f6a GitHub-Pull-Request: golang#61390 Reviewed-on: https://go-review.googlesource.com/c/go/+/510255 Reviewed-by: Sean Liao <[email protected]> Auto-Submit: Michael Pratt <[email protected]> Reviewed-by: Michael Pratt <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Cherry Mui <[email protected]>
1 parent 57769b5 commit 861c90c

File tree

2 files changed

+151
-40
lines changed

2 files changed

+151
-40
lines changed

src/net/http/serve_test.go

Lines changed: 75 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"compress/gzip"
1313
"compress/zlib"
1414
"context"
15+
crand "crypto/rand"
1516
"crypto/tls"
1617
"crypto/x509"
1718
"encoding/json"
@@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode)
52815282
func BenchmarkServer(b *testing.B) {
52825283
b.ReportAllocs()
52835284
// Child process mode;
5284-
if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" {
5285-
n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N"))
5285+
if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" {
5286+
n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N"))
52865287
if err != nil {
52875288
panic(err)
52885289
}
@@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) {
53165317

53175318
cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
53185319
cmd.Env = append([]string{
5319-
fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N),
5320-
fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL),
5320+
fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N),
5321+
fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL),
53215322
}, os.Environ()...)
53225323
out, err := cmd.CombinedOutput()
53235324
if err != nil {
@@ -5338,39 +5339,63 @@ func getNoBody(urlStr string) (*Response, error) {
53385339
// A benchmark for profiling the client without the HTTP server code.
53395340
// The server code runs in a subprocess.
53405341
func BenchmarkClient(b *testing.B) {
5342+
var data = []byte("Hello world.\n")
5343+
5344+
url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
5345+
w.Header().Set("Content-Type", "text/html; charset=utf-8")
5346+
w.Write(data)
5347+
}))
5348+
5349+
// Do b.N requests to the server.
5350+
b.StartTimer()
5351+
for i := 0; i < b.N; i++ {
5352+
res, err := Get(url)
5353+
if err != nil {
5354+
b.Fatalf("Get: %v", err)
5355+
}
5356+
body, err := io.ReadAll(res.Body)
5357+
res.Body.Close()
5358+
if err != nil {
5359+
b.Fatalf("ReadAll: %v", err)
5360+
}
5361+
if !bytes.Equal(body, data) {
5362+
b.Fatalf("Got body: %q", body)
5363+
}
5364+
}
5365+
b.StopTimer()
5366+
}
5367+
5368+
func startClientBenchmarkServer(b *testing.B, handler Handler) string {
53415369
b.ReportAllocs()
53425370
b.StopTimer()
5343-
defer afterTest(b)
53445371

5345-
var data = []byte("Hello world.\n")
5346-
if server := os.Getenv("TEST_BENCH_SERVER"); server != "" {
5372+
if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" {
53475373
// Server process mode.
5348-
port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user
5374+
port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user
53495375
if port == "" {
53505376
port = "0"
53515377
}
53525378
ln, err := net.Listen("tcp", "localhost:"+port)
53535379
if err != nil {
5354-
fmt.Fprintln(os.Stderr, err.Error())
5355-
os.Exit(1)
5380+
log.Fatal(err)
53565381
}
53575382
fmt.Println(ln.Addr().String())
5383+
53585384
HandleFunc("/", func(w ResponseWriter, r *Request) {
53595385
r.ParseForm()
53605386
if r.Form.Get("stop") != "" {
53615387
os.Exit(0)
53625388
}
5363-
w.Header().Set("Content-Type", "text/html; charset=utf-8")
5364-
w.Write(data)
5389+
handler.ServeHTTP(w, r)
53655390
})
53665391
var srv Server
53675392
log.Fatal(srv.Serve(ln))
53685393
}
53695394

53705395
// Start server process.
53715396
ctx, cancel := context.WithCancel(context.Background())
5372-
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$")
5373-
cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes")
5397+
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$")
5398+
cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes")
53745399
cmd.Stderr = os.Stderr
53755400
stdout, err := cmd.StdoutPipe()
53765401
if err != nil {
@@ -5385,10 +5410,6 @@ func BenchmarkClient(b *testing.B) {
53855410
done <- cmd.Wait()
53865411
close(done)
53875412
}()
5388-
defer func() {
5389-
cancel()
5390-
<-done
5391-
}()
53925413

53935414
// Wait for the server in the child process to respond and tell us
53945415
// its listening address, once it's started listening:
@@ -5401,29 +5422,56 @@ func BenchmarkClient(b *testing.B) {
54015422
b.Fatalf("initial probe of child process failed: %v", err)
54025423
}
54035424

5425+
// Instruct server process to stop.
5426+
b.Cleanup(func() {
5427+
getNoBody(url + "?stop=yes")
5428+
if err := <-done; err != nil {
5429+
b.Fatalf("subprocess failed: %v", err)
5430+
}
5431+
5432+
cancel()
5433+
<-done
5434+
5435+
afterTest(b)
5436+
})
5437+
5438+
return url
5439+
}
5440+
5441+
func BenchmarkClientGzip(b *testing.B) {
5442+
const responseSize = 1024 * 1024
5443+
5444+
var buf bytes.Buffer
5445+
gz := gzip.NewWriter(&buf)
5446+
if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
5447+
b.Fatal(err)
5448+
}
5449+
gz.Close()
5450+
5451+
data := buf.Bytes()
5452+
5453+
url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
5454+
w.Header().Set("Content-Encoding", "gzip")
5455+
w.Write(data)
5456+
}))
5457+
54045458
// Do b.N requests to the server.
54055459
b.StartTimer()
54065460
for i := 0; i < b.N; i++ {
54075461
res, err := Get(url)
54085462
if err != nil {
54095463
b.Fatalf("Get: %v", err)
54105464
}
5411-
body, err := io.ReadAll(res.Body)
5465+
n, err := io.Copy(io.Discard, res.Body)
54125466
res.Body.Close()
54135467
if err != nil {
54145468
b.Fatalf("ReadAll: %v", err)
54155469
}
5416-
if !bytes.Equal(body, data) {
5417-
b.Fatalf("Got body: %q", body)
5470+
if n != responseSize {
5471+
b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n)
54185472
}
54195473
}
54205474
b.StopTimer()
5421-
5422-
// Instruct server process to stop.
5423-
getNoBody(url + "?stop=yes")
5424-
if err := <-done; err != nil {
5425-
b.Fatalf("subprocess failed: %v", err)
5426-
}
54275475
}
54285476

54295477
func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {

src/net/http/transport.go

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package http
1111

1212
import (
1313
"bufio"
14+
"compress/flate"
1415
"compress/gzip"
1516
"container/list"
1617
"context"
@@ -2988,6 +2989,7 @@ type bodyEOFSignal struct {
29882989
}
29892990

29902991
var errReadOnClosedResBody = errors.New("http: read on closed response body")
2992+
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
29912993

29922994
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
29932995
es.mu.Lock()
@@ -3037,37 +3039,98 @@ func (es *bodyEOFSignal) condfn(err error) error {
30373039
}
30383040

30393041
// gzipReader wraps a response body so it can lazily
3040-
// call gzip.NewReader on the first call to Read
3042+
// get gzip.Reader from the pool on the first call to Read.
3043+
// After Close is called it puts gzip.Reader to the pool immediately
3044+
// if there is no Read in progress or later when Read completes.
30413045
type gzipReader struct {
30423046
_ incomparable
30433047
body *bodyEOFSignal // underlying HTTP/1 response body framing
3044-
zr *gzip.Reader // lazily-initialized gzip reader
3045-
zerr error // any error from gzip.NewReader; sticky
3048+
mu sync.Mutex // guards zr and zerr
3049+
zr *gzip.Reader
3050+
zerr error
30463051
}
30473052

3048-
func (gz *gzipReader) Read(p []byte) (n int, err error) {
3053+
type eofReader struct{}
3054+
3055+
func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
3056+
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
3057+
3058+
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3059+
3060+
// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
3061+
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3062+
zr := gzipPool.Get().(*gzip.Reader)
3063+
if err := zr.Reset(r); err != nil {
3064+
gzipPoolPut(zr)
3065+
return nil, err
3066+
}
3067+
return zr, nil
3068+
}
3069+
3070+
// gzipPoolPut puts a gzip.Reader back into the pool.
3071+
func gzipPoolPut(zr *gzip.Reader) {
3072+
// Reset will allocate bufio.Reader if we pass it anything
3073+
// other than a flate.Reader, so ensure that it's getting one.
3074+
var r flate.Reader = eofReader{}
3075+
zr.Reset(r)
3076+
gzipPool.Put(zr)
3077+
}
3078+
3079+
// acquire returns a gzip.Reader for reading response body.
3080+
// The reader must be released after use.
3081+
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3082+
gz.mu.Lock()
3083+
defer gz.mu.Unlock()
3084+
if gz.zerr != nil {
3085+
return nil, gz.zerr
3086+
}
30493087
if gz.zr == nil {
3050-
if gz.zerr == nil {
3051-
gz.zr, gz.zerr = gzip.NewReader(gz.body)
3052-
}
3088+
gz.zr, gz.zerr = gzipPoolGet(gz.body)
30533089
if gz.zerr != nil {
3054-
return 0, gz.zerr
3090+
return nil, gz.zerr
30553091
}
30563092
}
3093+
ret := gz.zr
3094+
gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3095+
return ret, nil
3096+
}
3097+
3098+
// release returns the gzip.Reader to the pool if Close was called during Read.
3099+
func (gz *gzipReader) release(zr *gzip.Reader) {
3100+
gz.mu.Lock()
3101+
defer gz.mu.Unlock()
3102+
if gz.zerr == errConcurrentReadOnResBody {
3103+
gz.zr, gz.zerr = zr, nil
3104+
} else { // errReadOnClosedResBody
3105+
gzipPoolPut(zr)
3106+
}
3107+
}
30573108

3058-
gz.body.mu.Lock()
3059-
if gz.body.closed {
3060-
err = errReadOnClosedResBody
3109+
// close returns the gzip.Reader to the pool immediately or
3110+
// signals release to do so after Read completes.
3111+
func (gz *gzipReader) close() {
3112+
gz.mu.Lock()
3113+
defer gz.mu.Unlock()
3114+
if gz.zerr == nil && gz.zr != nil {
3115+
gzipPoolPut(gz.zr)
3116+
gz.zr = nil
30613117
}
3062-
gz.body.mu.Unlock()
3118+
gz.zerr = errReadOnClosedResBody
3119+
}
30633120

3121+
func (gz *gzipReader) Read(p []byte) (n int, err error) {
3122+
zr, err := gz.acquire()
30643123
if err != nil {
30653124
return 0, err
30663125
}
3067-
return gz.zr.Read(p)
3126+
defer gz.release(zr)
3127+
3128+
return zr.Read(p)
30683129
}
30693130

30703131
func (gz *gzipReader) Close() error {
3132+
gz.close()
3133+
30713134
return gz.body.Close()
30723135
}
30733136

0 commit comments

Comments
 (0)