@@ -32,8 +32,6 @@ const (
3232 rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
3333 rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
3434 rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
35-
36- errMsgNotEnabledPRW2 = "Not enabled prometheus remote write v2 push request"
3735)
3836
3937// Func defines the type of the push. It is similar to http.HandlerFunc.
@@ -52,36 +50,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
5250 }
5351 }
5452
55- // follow Prometheus https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go
56- contentType := r .Header .Get ("Content-Type" )
57- if contentType == "" {
58- contentType = appProtoContentType
59- }
60-
61- msgType , err := parseProtoMsg (contentType )
62- if err != nil {
63- level .Error (logger ).Log ("Error decoding remote write request" , "err" , err )
64- http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
65- return
66- }
67-
68- if msgType != config .RemoteWriteProtoMsgV1 && msgType != config .RemoteWriteProtoMsgV2 {
69- level .Error (logger ).Log ("Not accepted msg type" , "msgType" , msgType , "err" , err )
70- http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
71- return
72- }
73-
74- enc := r .Header .Get ("Content-Encoding" )
75- if enc == "" {
76- } else if enc != string (remote .SnappyBlockCompression ) {
77- err := fmt .Errorf ("%v encoding (compression) is not accepted by this server; only %v is acceptable" , enc , remote .SnappyBlockCompression )
78- level .Error (logger ).Log ("Error decoding remote write request" , "err" , err )
79- http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
80- return
81- }
82-
83- switch msgType {
84- case config .RemoteWriteProtoMsgV1 :
53+ handlePRW1 := func () {
8554 var req cortexpb.PreallocWriteRequest
8655 err := util .ParseProtoReader (ctx , r .Body , int (r .ContentLength ), maxRecvMsgSize , & req , util .RawSnappy )
8756 if err != nil {
@@ -108,55 +77,89 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
10877 }
10978 http .Error (w , string (resp .Body ), int (resp .Code ))
11079 }
111- case config .RemoteWriteProtoMsgV2 :
112- if remoteWrite2Enabled {
113- var req writev2.Request
114- err := util .ParseProtoReader (ctx , r .Body , int (r .ContentLength ), maxRecvMsgSize , & req , util .RawSnappy )
115- if err != nil {
116- level .Error (logger ).Log ("err" , err .Error ())
117- http .Error (w , err .Error (), http .StatusBadRequest )
118- return
119- }
80+ }
12081
121- v1Req , err := convertV2RequestToV1 (& req )
122- if err != nil {
123- level .Error (logger ).Log ("err" , err .Error ())
124- http .Error (w , err .Error (), http .StatusBadRequest )
125- return
126- }
82+ handlePRW2 := func () {
83+ var req writev2.Request
84+ err := util .ParseProtoReader (ctx , r .Body , int (r .ContentLength ), maxRecvMsgSize , & req , util .RawSnappy )
85+ if err != nil {
86+ level .Error (logger ).Log ("err" , err .Error ())
87+ http .Error (w , err .Error (), http .StatusBadRequest )
88+ return
89+ }
12790
128- v1Req .SkipLabelNameValidation = false
129- // Current source is only API
130- if v1Req .Source == 0 {
131- v1Req .Source = cortexpb .API
132- }
91+ v1Req , err := convertV2RequestToV1 (& req )
92+ if err != nil {
93+ level .Error (logger ).Log ("err" , err .Error ())
94+ http .Error (w , err .Error (), http .StatusBadRequest )
95+ return
96+ }
13397
134- if resp , err := push (ctx , & v1Req .WriteRequest ); err != nil {
135- resp , ok := httpgrpc .HTTPResponseFromError (err )
136- setHeader (w , 0 , 0 , 0 )
137- if ! ok {
138- http .Error (w , err .Error (), http .StatusInternalServerError )
139- return
140- }
141- if resp .GetCode ()/ 100 == 5 {
142- level .Error (logger ).Log ("msg" , "push error" , "err" , err )
143- } else if resp .GetCode () != http .StatusAccepted && resp .GetCode () != http .StatusTooManyRequests {
144- level .Warn (logger ).Log ("msg" , "push refused" , "err" , err )
145- }
146- http .Error (w , string (resp .Body ), int (resp .Code ))
147- } else {
148- setHeader (w , resp .Samples , resp .Histograms , resp .Exemplars )
98+ v1Req .SkipLabelNameValidation = false
99+ if v1Req .Source == 0 {
100+ v1Req .Source = cortexpb .API
101+ }
102+
103+ if resp , err := push (ctx , & v1Req .WriteRequest ); err != nil {
104+ resp , ok := httpgrpc .HTTPResponseFromError (err )
105+ setPRW2RespHeader (w , 0 , 0 , 0 )
106+ if ! ok {
107+ http .Error (w , err .Error (), http .StatusInternalServerError )
108+ return
149109 }
110+ if resp .GetCode ()/ 100 == 5 {
111+ level .Error (logger ).Log ("msg" , "push error" , "err" , err )
112+ } else if resp .GetCode () != http .StatusAccepted && resp .GetCode () != http .StatusTooManyRequests {
113+ level .Warn (logger ).Log ("msg" , "push refused" , "err" , err )
114+ }
115+ http .Error (w , string (resp .Body ), int (resp .Code ))
150116 } else {
151- level .Error (logger ).Log (errMsgNotEnabledPRW2 )
152- http .Error (w , errMsgNotEnabledPRW2 , http .StatusUnsupportedMediaType )
117+ setPRW2RespHeader (w , resp .Samples , resp .Histograms , resp .Exemplars )
118+ }
119+ }
120+
121+ if remoteWrite2Enabled {
122+ // follow Prometheus https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go
123+ contentType := r .Header .Get ("Content-Type" )
124+ if contentType == "" {
125+ contentType = appProtoContentType
126+ }
127+
128+ msgType , err := parseProtoMsg (contentType )
129+ if err != nil {
130+ level .Error (logger ).Log ("Error decoding remote write request" , "err" , err )
131+ http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
153132 return
154133 }
134+
135+ if msgType != config .RemoteWriteProtoMsgV1 && msgType != config .RemoteWriteProtoMsgV2 {
136+ level .Error (logger ).Log ("Not accepted msg type" , "msgType" , msgType , "err" , err )
137+ http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
138+ return
139+ }
140+
141+ enc := r .Header .Get ("Content-Encoding" )
142+ if enc == "" {
143+ } else if enc != string (remote .SnappyBlockCompression ) {
144+ err := fmt .Errorf ("%v encoding (compression) is not accepted by this server; only %v is acceptable" , enc , remote .SnappyBlockCompression )
145+ level .Error (logger ).Log ("Error decoding remote write request" , "err" , err )
146+ http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
147+ return
148+ }
149+
150+ switch msgType {
151+ case config .RemoteWriteProtoMsgV1 :
152+ handlePRW1 ()
153+ case config .RemoteWriteProtoMsgV2 :
154+ handlePRW2 ()
155+ }
156+ } else {
157+ handlePRW1 ()
155158 }
156159 })
157160}
158161
159- func setHeader (w http.ResponseWriter , samples , histograms , exemplars int64 ) {
162+ func setPRW2RespHeader (w http.ResponseWriter , samples , histograms , exemplars int64 ) {
160163 w .Header ().Set (rw20WrittenSamplesHeader , strconv .FormatInt (samples , 10 ))
161164 w .Header ().Set (rw20WrittenHistogramsHeader , strconv .FormatInt (histograms , 10 ))
162165 w .Header ().Set (rw20WrittenExemplarsHeader , strconv .FormatInt (exemplars , 10 ))
0 commit comments