@@ -64,8 +64,7 @@ var defaultAPIOpts = &apiOpts{
64
64
Max : 10 * time .Second ,
65
65
MaxRetries : 10 ,
66
66
},
67
- client : http .DefaultClient ,
68
- // Hardcoded for now.
67
+ client : http .DefaultClient ,
69
68
retryOnRateLimit : true ,
70
69
compression : SnappyBlockCompression ,
71
70
path : "api/v1/write" ,
@@ -96,7 +95,7 @@ func WithAPIPath(path string) APIOption {
96
95
}
97
96
}
98
97
99
- // WithAPIRetryOnRateLimit returns APIOption that disables retrying on rate limit status code.
98
+ // WithAPINoRetryOnRateLimit returns APIOption that disables retrying on rate limit status code.
100
99
func WithAPINoRetryOnRateLimit () APIOption {
101
100
return func (o * apiOpts ) error {
102
101
o .retryOnRateLimit = false
@@ -373,45 +372,46 @@ type writeStorage interface {
373
372
// Other headers might be trimmed, depending on the configured middlewares
374
373
// e.g. a default SnappyMiddleware trims "Content-Encoding" and ensures that
375
374
// encoded body bytes are already decompressed.
376
- Store (ctx context. Context , msgType WriteMessageType , req * http. Request ) (_ * WriteResponse , _ error )
375
+ Store (req * http. Request , msgType WriteMessageType ) (_ * WriteResponse , _ error )
377
376
}
378
377
379
- type handler struct {
378
+ type writeHandler struct {
380
379
store writeStorage
381
380
acceptedMessageTypes MessageTypes
382
- opts handlerOpts
381
+ opts writeHandlerOpts
383
382
}
384
383
385
- type handlerOpts struct {
384
+ type writeHandlerOpts struct {
386
385
logger * slog.Logger
387
386
middlewares []func (http.Handler ) http.Handler
388
387
}
389
388
390
- // HandlerOption represents an option for the handler.
391
- type HandlerOption func (o * handlerOpts )
389
+ // WriteHandlerOption represents an option for the write handler.
390
+ type WriteHandlerOption func (o * writeHandlerOpts )
392
391
393
- // WithHandlerLogger returns HandlerOption that allows providing slog logger.
392
+ // WithWriteHandlerLogger returns WriteHandlerOption that allows providing slog logger.
394
393
// By default, nothing is logged.
395
- func WithHandlerLogger (logger * slog.Logger ) HandlerOption {
396
- return func (o * handlerOpts ) {
394
+ func WithWriteHandlerLogger (logger * slog.Logger ) WriteHandlerOption {
395
+ return func (o * writeHandlerOpts ) {
397
396
o .logger = logger
398
397
}
399
398
}
400
399
401
- // WithHandlerMiddleware returns HandlerOption that allows providing middlewares.
400
+ // WithWriteHandlerMiddlewares returns WriteHandlerOption that allows providing middlewares.
402
401
// Multiple middlewares can be provided and will be applied in the order they are passed.
403
- // When using this option, SnappyDecompressorMiddleware is not applied by default so
404
- // it (or any other decompression middleware) needs to be added explicitly.
405
- func WithHandlerMiddlewares (middlewares ... func (http.Handler ) http.Handler ) HandlerOption {
406
- return func (o * handlerOpts ) {
402
+ // This option replaces the default middlewares (SnappyDecompressorMiddleware), so if
403
+ // you want to have handler that works with the default Remote Write 2.0 protocol,
404
+ // SnappyDecompressorMiddleware (or any other decompression middleware) needs to be added explicitly.
405
+ func WithWriteHandlerMiddlewares (middlewares ... func (http.Handler ) http.Handler ) WriteHandlerOption {
406
+ return func (o * writeHandlerOpts ) {
407
407
o .middlewares = middlewares
408
408
}
409
409
}
410
410
411
- // SnappyDecompressorMiddleware returns a middleware that checks if the request body is snappy-encoded and decompresses it.
411
+ // SnappyDecodeMiddleware returns a middleware that checks if the request body is snappy-encoded and decompresses it.
412
412
// If the request body is not snappy-encoded, it returns an error.
413
- // Used by default in NewRemoteWriteHandler .
414
- func SnappyDecompressorMiddleware (logger * slog.Logger ) func (http.Handler ) http.Handler {
413
+ // Used by default in NewHandler .
414
+ func SnappyDecodeMiddleware (logger * slog.Logger ) func (http.Handler ) http.Handler {
415
415
bufPool := sync.Pool {
416
416
New : func () any {
417
417
return bytes .NewBuffer (nil )
@@ -455,18 +455,18 @@ func SnappyDecompressorMiddleware(logger *slog.Logger) func(http.Handler) http.H
455
455
}
456
456
}
457
457
458
- // NewHandler returns HTTP handler that receives Remote Write 2.0
458
+ // NewWriteHandler returns HTTP handler that receives Remote Write 2.0
459
459
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
460
- func NewHandler (store writeStorage , acceptedMessageTypes MessageTypes , opts ... HandlerOption ) http.Handler {
461
- o := handlerOpts {
460
+ func NewWriteHandler (store writeStorage , acceptedMessageTypes MessageTypes , opts ... WriteHandlerOption ) http.Handler {
461
+ o := writeHandlerOpts {
462
462
logger : slog .New (nopSlogHandler {}),
463
- middlewares : []func (http.Handler ) http.Handler {SnappyDecompressorMiddleware (slog .New (nopSlogHandler {}))},
463
+ middlewares : []func (http.Handler ) http.Handler {SnappyDecodeMiddleware (slog .New (nopSlogHandler {}))},
464
464
}
465
465
for _ , opt := range opts {
466
466
opt (& o )
467
467
}
468
468
469
- h := & handler {
469
+ h := & writeHandler {
470
470
opts : o ,
471
471
store : store ,
472
472
acceptedMessageTypes : acceptedMessageTypes ,
@@ -513,7 +513,7 @@ func ParseProtoMsg(contentType string) (WriteMessageType, error) {
513
513
return WriteV1MessageType , nil
514
514
}
515
515
516
- func (h * handler ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
516
+ func (h * writeHandler ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
517
517
if r .Method != http .MethodPost {
518
518
http .Error (w , "Method not allowed" , http .StatusMethodNotAllowed )
519
519
return
@@ -538,20 +538,25 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
538
538
return
539
539
}
540
540
541
- writeResponse , storeErr := h .store .Store (r .Context (), msgType , r )
541
+ writeResponse , storeErr := h .store .Store (r , msgType )
542
+ if writeResponse == nil {
543
+ // User could forget to return write response; in this case we assume 0 samples
544
+ // were written.
545
+ writeResponse = NewWriteResponse ()
546
+ }
542
547
543
- // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases, alongwith any user-defined headers.
544
- writeResponse .SetHeaders (w )
548
+ // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases, along with any user-defined headers.
549
+ writeResponse .writeHeaders (w )
545
550
546
551
if storeErr != nil {
547
- if writeResponse .StatusCode () == 0 {
552
+ if writeResponse .statusCode == 0 {
548
553
writeResponse .SetStatusCode (http .StatusInternalServerError )
549
554
}
550
- if writeResponse .StatusCode () / 100 == 5 { // 5xx
551
- h .opts .logger .Error ("Error while storing the remote write request" , "err" , storeErr . Error () )
555
+ if writeResponse .statusCode / 100 == 5 { // 5xx
556
+ h .opts .logger .Error ("Error while storing the remote write request" , "err" , storeErr )
552
557
}
553
- http .Error (w , storeErr .Error (), writeResponse .StatusCode () )
558
+ http .Error (w , storeErr .Error (), writeResponse .statusCode )
554
559
return
555
560
}
556
- w .WriteHeader (writeResponse .StatusCode () )
561
+ w .WriteHeader (writeResponse .statusCode )
557
562
}
0 commit comments