
Commit 62361fc

Author: beorn7
Parent: 752f50d

Use streaming encoding of metrics

Signed-off-by: beorn7 <[email protected]>

File tree: 3 files changed (+46, -66 lines)


prometheus/promhttp/http.go

Lines changed: 40 additions & 60 deletions
@@ -32,7 +32,6 @@
 package promhttp

 import (
-	"bytes"
 	"compress/gzip"
 	"fmt"
 	"io"
@@ -59,12 +58,6 @@ var gzipPool = sync.Pool{
 	},
 }

-var bufPool = sync.Pool{
-	New: func() interface{} {
-		return &bytes.Buffer{}
-	},
-}
-
 // Handler returns an http.Handler for the prometheus.DefaultGatherer, using
 // default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
 // no error logging, and it applies compression if requested by the client.
@@ -97,13 +90,13 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
 		inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
 	}

-	h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+	h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
 		if inFlightSem != nil {
 			select {
 			case inFlightSem <- struct{}{}: // All good, carry on.
 				defer func() { <-inFlightSem }()
 			default:
-				http.Error(w, fmt.Sprintf(
+				http.Error(rsp, fmt.Sprintf(
 					"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
 				), http.StatusServiceUnavailable)
 				return
@@ -119,63 +112,56 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
 				panic(err)
 			case ContinueOnError:
 				if len(mfs) == 0 {
-					http.Error(w, "No metrics gathered, last error:\n\n"+err.Error(), http.StatusInternalServerError)
+					// Still report the error if no metrics have been gathered.
+					httpError(rsp, err)
 					return
 				}
 			case HTTPErrorOnError:
-				http.Error(w, "An error has occurred during metrics gathering:\n\n"+err.Error(), http.StatusInternalServerError)
+				httpError(rsp, err)
 				return
 			}
 		}

 		contentType := expfmt.Negotiate(req.Header)
-		buf := bufPool.Get().(*bytes.Buffer)
-		buf.Reset()
-		enc := expfmt.NewEncoder(buf, contentType)
+		header := rsp.Header()
+		header.Set(contentTypeHeader, string(contentType))

-		defer bufPool.Put(buf)
+		w := io.Writer(rsp)
+		if !opts.DisableCompression && gzipAccepted(req.Header) {
+			header.Set(contentEncodingHeader, "gzip")
+			gz := gzipPool.Get().(*gzip.Writer)
+			defer gzipPool.Put(gz)
+
+			gz.Reset(w)
+			defer gz.Close()
+
+			w = gz
+		}
+
+		enc := expfmt.NewEncoder(w, contentType)

 		var lastErr error
 		for _, mf := range mfs {
 			if err := enc.Encode(mf); err != nil {
 				lastErr = err
 				if opts.ErrorLog != nil {
-					opts.ErrorLog.Println("error encoding metric family:", err)
+					opts.ErrorLog.Println("error encoding and sending metric family:", err)
 				}
 				switch opts.ErrorHandling {
 				case PanicOnError:
 					panic(err)
 				case ContinueOnError:
 					// Handled later.
 				case HTTPErrorOnError:
-					http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
+					httpError(rsp, err)
 					return
 				}
 			}
 		}

-		if lastErr != nil && buf.Len() == 0 {
-			http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
-			return
+		if lastErr != nil {
+			httpError(rsp, lastErr)
 		}
-		header := w.Header()
-		header.Set(contentTypeHeader, string(contentType))
-		header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
-
-		if !opts.DisableCompression && gzipAccepted(req.Header) {
-			header.Set(contentEncodingHeader, "gzip")
-			gz := gzipPool.Get().(*gzip.Writer)
-			defer gzipPool.Put(gz)
-
-			gz.Reset(w)
-			defer gz.Close()
-
-			zipWriter := gzipResponseWriter{gz, w}
-			writeResult(zipWriter, buf, opts)
-			return
-		}
-		writeResult(w, buf, opts)
-		// TODO(beorn7): Consider streaming serving of metrics.
 	})

 	if opts.Timeout <= 0 {
@@ -296,29 +282,8 @@ type HandlerOpts struct {
 	Timeout time.Duration
 }

-// gzipResponseWriter in charge of wrapping io.Writer and http.ReponseWriter
-// together, allowing to get a single struct which implements both interface.
-type gzipResponseWriter struct {
-	io.Writer
-	http.ResponseWriter
-}
-
-func (w gzipResponseWriter) Write(b []byte) (int, error) {
-	return w.Writer.Write(b)
-}
-
-// writeResult to buf using http.ResponseWriter.
-// If ErrorLog is enabled, err is logged in.
-func writeResult(w http.ResponseWriter, buf *bytes.Buffer, opts HandlerOpts) {
-	if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil {
-		opts.ErrorLog.Println("error while sending encoded metrics:", err)
-	}
-}
-
-// gzipHandler return a http.HandlerFunc in charge of compressing the content
-// of the given http.HandlerFunc
+// gzipAccepted returns whether the client will accept gzip-encoded content.
 func gzipAccepted(header http.Header) bool {
-
 	a := header.Get(acceptEncodingHeader)
 	parts := strings.Split(a, ",")
 	for _, part := range parts {
@@ -329,3 +294,18 @@ func gzipAccepted(header http.Header) bool {
 	}
 	return false
 }
+
+// httpError removes any content-encoding header and then calls http.Error with
+// the provided error and http.StatusInternalServerError. Error contents are
+// supposed to be uncompressed plain text. However, same as with a plain
+// http.Error, any header settings will be void if the header has already been
+// sent. The error message will still be written to the writer, but it will
+// probably be of limited use.
+func httpError(rsp http.ResponseWriter, err error) {
+	rsp.Header().Del(contentEncodingHeader)
+	http.Error(
+		rsp,
+		"An error has occurred while serving metrics:\n\n"+err.Error(),
+		http.StatusInternalServerError,
+	)
+}
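For context: the commit replaces the old buffer-then-write flow (a pooled bytes.Buffer, an explicit Content-Length header, and the gzipResponseWriter wrapper) with encoding each gathered metric family directly to the response writer, optionally wrapped in a pooled gzip.Writer. The following is a minimal, self-contained sketch of that streaming pattern, not the promhttp code itself; the example counter, the naive Accept-Encoding check, and the listen address are illustrative assumptions.

package main

import (
	"compress/gzip"
	"io"
	"log"
	"net/http"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
)

func main() {
	// Assumed example metric and registry, just to have something to serve.
	scrapes := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_scrapes_total",
		Help: "Total number of scrapes served by this sketch.",
	})
	reg := prometheus.NewRegistry()
	reg.MustRegister(scrapes)

	http.HandleFunc("/metrics", func(rsp http.ResponseWriter, req *http.Request) {
		scrapes.Inc()

		mfs, err := reg.Gather()
		if err != nil {
			http.Error(rsp, "error gathering metrics: "+err.Error(), http.StatusInternalServerError)
			return
		}

		contentType := expfmt.Negotiate(req.Header)
		rsp.Header().Set("Content-Type", string(contentType))

		// Stream straight to the client; wrap in gzip if the client accepts it.
		// Because nothing is buffered, no Content-Length header is set.
		var w io.Writer = rsp
		if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") { // naive check for the sketch
			rsp.Header().Set("Content-Encoding", "gzip")
			gz := gzip.NewWriter(rsp)
			defer gz.Close()
			w = gz
		}

		enc := expfmt.NewEncoder(w, contentType)
		for _, mf := range mfs {
			if err := enc.Encode(mf); err != nil {
				// The response has already started, so a clean HTTP error status
				// can no longer be sent; log and stop instead.
				log.Println("error encoding metric family:", err)
				return
			}
		}
	})

	log.Fatal(http.ListenAndServe(":8080", nil)) // assumed listen address
}

A consequence of streaming, visible in the diff above, is that the handler no longer sets Content-Length, and an encoding error discovered mid-stream can only be appended to a response whose headers (and possibly part of the body) have already been sent; this is why the new httpError helper strips the Content-Encoding header and notes that its output may be of limited use once writing has begun.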

prometheus/promhttp/http_test.go

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ func TestHandlerErrorHandling(t *testing.T) {
 	})
 	wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
 `
-	wantErrorBody := `An error has occurred during metrics gathering:
+	wantErrorBody := `An error has occurred while serving metrics:

 error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
 `

prometheus/registry_test.go

Lines changed: 5 additions & 5 deletions
@@ -250,7 +250,7 @@ metric: <
 		},
 	}

-	expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred during metrics gathering:
+	expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred while serving metrics:

 collected metric "name" { label:<name:"constname" value:"\377" > label:<name:"labelname" value:"different_val" > counter:<value:42 > } has a label named "constname" whose value is not utf8: "\xff"
 `)
@@ -299,15 +299,15 @@ complex_bucket 1
 		},
 		},
 	}
-	bucketCollisionMsg := []byte(`An error has occurred during metrics gathering:
+	bucketCollisionMsg := []byte(`An error has occurred while serving metrics:

 collected metric named "complex_bucket" collides with previously collected histogram named "complex"
 `)
-	summaryCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
+	summaryCountCollisionMsg := []byte(`An error has occurred while serving metrics:

 collected metric named "complex_count" collides with previously collected summary named "complex"
 `)
-	histogramCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
+	histogramCountCollisionMsg := []byte(`An error has occurred while serving metrics:

 collected metric named "complex_count" collides with previously collected histogram named "complex"
 `)
@@ -333,7 +333,7 @@ collected metric named "complex_count" collides with previously collected histog
 		},
 		},
 	}
-	duplicateLabelMsg := []byte(`An error has occurred during metrics gathering:
+	duplicateLabelMsg := []byte(`An error has occurred while serving metrics:

 collected metric "broken_metric" { label:<name:"foo" value:"bar" > label:<name:"foo" value:"baz" > counter:<value:2.7 > } has two or more labels with the same name: foo
 `)
