Skip to content

Commit 16f375c

Browse files
authored
Merge pull request #482 from prometheus/beorn7/http
Use gzip from a pool and stream while encoding metrics
2 parents 1cafe34 + fb0f7fe commit 16f375c

File tree

4 files changed

+110
-111
lines changed

4 files changed

+110
-111
lines changed

prometheus/http.go

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ package prometheus
1515

1616
import (
1717
"bufio"
18-
"bytes"
1918
"compress/gzip"
20-
"fmt"
2119
"io"
2220
"net"
2321
"net/http"
@@ -41,19 +39,10 @@ const (
4139
acceptEncodingHeader = "Accept-Encoding"
4240
)
4341

44-
var bufPool sync.Pool
45-
46-
func getBuf() *bytes.Buffer {
47-
buf := bufPool.Get()
48-
if buf == nil {
49-
return &bytes.Buffer{}
50-
}
51-
return buf.(*bytes.Buffer)
52-
}
53-
54-
func giveBuf(buf *bytes.Buffer) {
55-
buf.Reset()
56-
bufPool.Put(buf)
42+
var gzipPool = sync.Pool{
43+
New: func() interface{} {
44+
return gzip.NewWriter(nil)
45+
},
5746
}
5847

5948
// Handler returns an HTTP handler for the DefaultGatherer. It is
@@ -71,58 +60,40 @@ func Handler() http.Handler {
7160
// Deprecated: Use promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{})
7261
// instead. See there for further documentation.
7362
func UninstrumentedHandler() http.Handler {
74-
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
63+
return http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
7564
mfs, err := DefaultGatherer.Gather()
7665
if err != nil {
77-
http.Error(w, "An error has occurred during metrics collection:\n\n"+err.Error(), http.StatusInternalServerError)
66+
httpError(rsp, err)
7867
return
7968
}
8069

8170
contentType := expfmt.Negotiate(req.Header)
82-
buf := getBuf()
83-
defer giveBuf(buf)
84-
writer, encoding := decorateWriter(req, buf)
85-
enc := expfmt.NewEncoder(writer, contentType)
86-
var lastErr error
71+
header := rsp.Header()
72+
header.Set(contentTypeHeader, string(contentType))
73+
74+
w := io.Writer(rsp)
75+
if gzipAccepted(req.Header) {
76+
header.Set(contentEncodingHeader, "gzip")
77+
gz := gzipPool.Get().(*gzip.Writer)
78+
defer gzipPool.Put(gz)
79+
80+
gz.Reset(w)
81+
defer gz.Close()
82+
83+
w = gz
84+
}
85+
86+
enc := expfmt.NewEncoder(w, contentType)
87+
8788
for _, mf := range mfs {
8889
if err := enc.Encode(mf); err != nil {
89-
lastErr = err
90-
http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
90+
httpError(rsp, err)
9191
return
9292
}
9393
}
94-
if closer, ok := writer.(io.Closer); ok {
95-
closer.Close()
96-
}
97-
if lastErr != nil && buf.Len() == 0 {
98-
http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
99-
return
100-
}
101-
header := w.Header()
102-
header.Set(contentTypeHeader, string(contentType))
103-
header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
104-
if encoding != "" {
105-
header.Set(contentEncodingHeader, encoding)
106-
}
107-
w.Write(buf.Bytes())
10894
})
10995
}
11096

111-
// decorateWriter wraps a writer to handle gzip compression if requested. It
112-
// returns the decorated writer and the appropriate "Content-Encoding" header
113-
// (which is empty if no compression is enabled).
114-
func decorateWriter(request *http.Request, writer io.Writer) (io.Writer, string) {
115-
header := request.Header.Get(acceptEncodingHeader)
116-
parts := strings.Split(header, ",")
117-
for _, part := range parts {
118-
part = strings.TrimSpace(part)
119-
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
120-
return gzip.NewWriter(writer), "gzip"
121-
}
122-
}
123-
return writer, ""
124-
}
125-
12697
var instLabels = []string{"method", "code"}
12798

12899
type nower interface {
@@ -503,3 +474,31 @@ func sanitizeCode(s int) string {
503474
return strconv.Itoa(s)
504475
}
505476
}
477+
478+
// gzipAccepted returns whether the client will accept gzip-encoded content.
479+
func gzipAccepted(header http.Header) bool {
480+
a := header.Get(acceptEncodingHeader)
481+
parts := strings.Split(a, ",")
482+
for _, part := range parts {
483+
part = strings.TrimSpace(part)
484+
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
485+
return true
486+
}
487+
}
488+
return false
489+
}
490+
491+
// httpError removes any content-encoding header and then calls http.Error with
492+
// the provided error and http.StatusInternalServerErrer. Error contents is
493+
// supposed to be uncompressed plain text. However, same as with a plain
494+
// http.Error, any header settings will be void if the header has already been
495+
// sent. The error message will still be written to the writer, but it will
496+
// probably be of limited use.
497+
func httpError(rsp http.ResponseWriter, err error) {
498+
rsp.Header().Del(contentEncodingHeader)
499+
http.Error(
500+
rsp,
501+
"An error has occurred while serving metrics:\n\n"+err.Error(),
502+
http.StatusInternalServerError,
503+
)
504+
}

prometheus/promhttp/http.go

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
package promhttp
3333

3434
import (
35-
"bytes"
3635
"compress/gzip"
3736
"fmt"
3837
"io"
@@ -53,19 +52,10 @@ const (
5352
acceptEncodingHeader = "Accept-Encoding"
5453
)
5554

56-
var bufPool sync.Pool
57-
58-
func getBuf() *bytes.Buffer {
59-
buf := bufPool.Get()
60-
if buf == nil {
61-
return &bytes.Buffer{}
62-
}
63-
return buf.(*bytes.Buffer)
64-
}
65-
66-
func giveBuf(buf *bytes.Buffer) {
67-
buf.Reset()
68-
bufPool.Put(buf)
55+
var gzipPool = sync.Pool{
56+
New: func() interface{} {
57+
return gzip.NewWriter(nil)
58+
},
6959
}
7060

7161
// Handler returns an http.Handler for the prometheus.DefaultGatherer, using
@@ -100,19 +90,18 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
10090
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
10191
}
10292

103-
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
93+
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
10494
if inFlightSem != nil {
10595
select {
10696
case inFlightSem <- struct{}{}: // All good, carry on.
10797
defer func() { <-inFlightSem }()
10898
default:
109-
http.Error(w, fmt.Sprintf(
99+
http.Error(rsp, fmt.Sprintf(
110100
"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
111101
), http.StatusServiceUnavailable)
112102
return
113103
}
114104
}
115-
116105
mfs, err := reg.Gather()
117106
if err != nil {
118107
if opts.ErrorLog != nil {
@@ -123,55 +112,56 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
123112
panic(err)
124113
case ContinueOnError:
125114
if len(mfs) == 0 {
126-
http.Error(w, "No metrics gathered, last error:\n\n"+err.Error(), http.StatusInternalServerError)
115+
// Still report the error if no metrics have been gathered.
116+
httpError(rsp, err)
127117
return
128118
}
129119
case HTTPErrorOnError:
130-
http.Error(w, "An error has occurred during metrics gathering:\n\n"+err.Error(), http.StatusInternalServerError)
120+
httpError(rsp, err)
131121
return
132122
}
133123
}
134124

135125
contentType := expfmt.Negotiate(req.Header)
136-
buf := getBuf()
137-
defer giveBuf(buf)
138-
writer, encoding := decorateWriter(req, buf, opts.DisableCompression)
139-
enc := expfmt.NewEncoder(writer, contentType)
126+
header := rsp.Header()
127+
header.Set(contentTypeHeader, string(contentType))
128+
129+
w := io.Writer(rsp)
130+
if !opts.DisableCompression && gzipAccepted(req.Header) {
131+
header.Set(contentEncodingHeader, "gzip")
132+
gz := gzipPool.Get().(*gzip.Writer)
133+
defer gzipPool.Put(gz)
134+
135+
gz.Reset(w)
136+
defer gz.Close()
137+
138+
w = gz
139+
}
140+
141+
enc := expfmt.NewEncoder(w, contentType)
142+
140143
var lastErr error
141144
for _, mf := range mfs {
142145
if err := enc.Encode(mf); err != nil {
143146
lastErr = err
144147
if opts.ErrorLog != nil {
145-
opts.ErrorLog.Println("error encoding metric family:", err)
148+
opts.ErrorLog.Println("error encoding and sending metric family:", err)
146149
}
147150
switch opts.ErrorHandling {
148151
case PanicOnError:
149152
panic(err)
150153
case ContinueOnError:
151154
// Handled later.
152155
case HTTPErrorOnError:
153-
http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
156+
httpError(rsp, err)
154157
return
155158
}
156159
}
157160
}
158-
if closer, ok := writer.(io.Closer); ok {
159-
closer.Close()
160-
}
161-
if lastErr != nil && buf.Len() == 0 {
162-
http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
163-
return
164-
}
165-
header := w.Header()
166-
header.Set(contentTypeHeader, string(contentType))
167-
header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
168-
if encoding != "" {
169-
header.Set(contentEncodingHeader, encoding)
170-
}
171-
if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil {
172-
opts.ErrorLog.Println("error while sending encoded metrics:", err)
161+
162+
if lastErr != nil {
163+
httpError(rsp, lastErr)
173164
}
174-
// TODO(beorn7): Consider streaming serving of metrics.
175165
})
176166

177167
if opts.Timeout <= 0 {
@@ -292,20 +282,30 @@ type HandlerOpts struct {
292282
Timeout time.Duration
293283
}
294284

295-
// decorateWriter wraps a writer to handle gzip compression if requested. It
296-
// returns the decorated writer and the appropriate "Content-Encoding" header
297-
// (which is empty if no compression is enabled).
298-
func decorateWriter(request *http.Request, writer io.Writer, compressionDisabled bool) (io.Writer, string) {
299-
if compressionDisabled {
300-
return writer, ""
301-
}
302-
header := request.Header.Get(acceptEncodingHeader)
303-
parts := strings.Split(header, ",")
285+
// gzipAccepted returns whether the client will accept gzip-encoded content.
286+
func gzipAccepted(header http.Header) bool {
287+
a := header.Get(acceptEncodingHeader)
288+
parts := strings.Split(a, ",")
304289
for _, part := range parts {
305290
part = strings.TrimSpace(part)
306291
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
307-
return gzip.NewWriter(writer), "gzip"
292+
return true
308293
}
309294
}
310-
return writer, ""
295+
return false
296+
}
297+
298+
// httpError removes any content-encoding header and then calls http.Error with
299+
// the provided error and http.StatusInternalServerErrer. Error contents is
300+
// supposed to be uncompressed plain text. However, same as with a plain
301+
// http.Error, any header settings will be void if the header has already been
302+
// sent. The error message will still be written to the writer, but it will
303+
// probably be of limited use.
304+
func httpError(rsp http.ResponseWriter, err error) {
305+
rsp.Header().Del(contentEncodingHeader)
306+
http.Error(
307+
rsp,
308+
"An error has occurred while serving metrics:\n\n"+err.Error(),
309+
http.StatusInternalServerError,
310+
)
311311
}

prometheus/promhttp/http_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestHandlerErrorHandling(t *testing.T) {
103103
})
104104
wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
105105
`
106-
wantErrorBody := `An error has occurred during metrics gathering:
106+
wantErrorBody := `An error has occurred while serving metrics:
107107
108108
error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
109109
`

prometheus/registry_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ metric: <
250250
},
251251
}
252252

253-
expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred during metrics gathering:
253+
expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred while serving metrics:
254254
255255
collected metric "name" { label:<name:"constname" value:"\377" > label:<name:"labelname" value:"different_val" > counter:<value:42 > } has a label named "constname" whose value is not utf8: "\xff"
256256
`)
@@ -299,15 +299,15 @@ complex_bucket 1
299299
},
300300
},
301301
}
302-
bucketCollisionMsg := []byte(`An error has occurred during metrics gathering:
302+
bucketCollisionMsg := []byte(`An error has occurred while serving metrics:
303303
304304
collected metric named "complex_bucket" collides with previously collected histogram named "complex"
305305
`)
306-
summaryCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
306+
summaryCountCollisionMsg := []byte(`An error has occurred while serving metrics:
307307
308308
collected metric named "complex_count" collides with previously collected summary named "complex"
309309
`)
310-
histogramCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
310+
histogramCountCollisionMsg := []byte(`An error has occurred while serving metrics:
311311
312312
collected metric named "complex_count" collides with previously collected histogram named "complex"
313313
`)
@@ -333,7 +333,7 @@ collected metric named "complex_count" collides with previously collected histog
333333
},
334334
},
335335
}
336-
duplicateLabelMsg := []byte(`An error has occurred during metrics gathering:
336+
duplicateLabelMsg := []byte(`An error has occurred while serving metrics:
337337
338338
collected metric "broken_metric" { label:<name:"foo" value:"bar" > label:<name:"foo" value:"baz" > counter:<value:2.7 > } has two or more labels with the same name: foo
339339
`)

0 commit comments

Comments
 (0)