Skip to content

Commit 988acf3

Browse files
committed
rabbit_prometheus_handler: Stream identity-encoded HTTP replies
`prometheus_text_format:format/1` produces a binary of the format for the entire registry. For clusters with many resources, this can lead to large replies from `/metrics/[:registry]` especially for large registries like `per-object`. Instead of formatting the response and then sending it, we can stream the response by taking advantage of the new `format_into/3` callback (which needs to be added upstream to the `prometheus` dep). This uses `cowboy_req:stream_body/3` to stream the iodata as `prometheus` works through the registry. This should hopefully be a nice memory improvement. The other benefit is that results are sent eagerly. For a stress-testing example, 1. `make run-broker` 2. `rabbitmqctl import_definitions path/to/100k-classic-queues.json` 3. `curl -s localhost:15692/metrics/per-object` Before this change `curl` would wait for around 8 seconds and then the entire response would arrive. With this change the results start streaming in immediately.
1 parent fd04424 commit 988acf3

File tree

2 files changed

+40
-43
lines changed

2 files changed

+40
-43
lines changed

deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -89,47 +89,58 @@ gen_response(_, Request) ->
8989
Request.
9090

9191
gen_metrics_response(Registry, Request) ->
92-
{Code, RespHeaders, Body} = reply(Registry, Request),
93-
94-
Headers = to_cowboy_headers(RespHeaders),
95-
cowboy_req:reply(Code, maps:from_list(Headers), Body, Request).
96-
97-
to_cowboy_headers(RespHeaders) ->
98-
lists:map(fun to_cowboy_headers_/1, RespHeaders).
99-
100-
to_cowboy_headers_({Name, Value}) ->
101-
{to_cowboy_name(Name), Value}.
102-
103-
to_cowboy_name(Name) ->
104-
binary:replace(atom_to_binary(Name, utf8), <<"_">>, <<"-">>).
92+
reply(Registry, Request).
10593

10694
reply(Registry, Request) ->
10795
case validate_registry(Registry, registry()) of
10896
{true, RealRegistry} ->
10997
format_metrics(Request, RealRegistry);
11098
{registry_conflict, _ReqR, _ConfR} ->
111-
{409, [], <<>>}
99+
cowboy_req:reply(409, #{}, <<>>, Request)
112100
end.
113101

114102
format_metrics(Request, Registry) ->
115103
AcceptEncoding = cowboy_req:header(<<"accept-encoding">>, Request, undefined),
116104
ContentType = prometheus_text_format:content_type(),
117-
Scrape = render_format(ContentType, Registry),
118105
Encoding = accept_encoding_header:negotiate(AcceptEncoding, [<<"identity">>,
119106
<<"gzip">>]),
120-
encode_format(ContentType, binary_to_list(Encoding), Scrape, Registry).
121-
122-
render_format(ContentType, Registry) ->
123-
Scrape = prometheus_summary:observe_duration(
124-
Registry,
125-
?SCRAPE_DURATION,
126-
[Registry, ContentType],
127-
fun () -> prometheus_text_format:format(Registry) end),
128-
prometheus_summary:observe(Registry,
129-
?SCRAPE_SIZE,
130-
[Registry, ContentType],
131-
iolist_size(Scrape)),
132-
Scrape.
107+
Headers = #{<<"content-type">> => ContentType,
108+
<<"content-encoding">> => Encoding},
109+
case Encoding of
110+
<<"gzip">> ->
111+
Scrape = prometheus_summary:observe_duration(
112+
Registry,
113+
?SCRAPE_DURATION,
114+
[Registry, ContentType],
115+
fun () -> prometheus_text_format:format(Registry) end),
116+
prometheus_summary:observe(Registry,
117+
?SCRAPE_SIZE,
118+
[Registry, ContentType],
119+
iolist_size(Scrape)),
120+
Encoded = zlib:gzip(Scrape),
121+
prometheus_summary:observe(telemetry_registry(),
122+
?SCRAPE_ENCODED_SIZE,
123+
[Registry, ContentType, Encoding],
124+
iolist_size(Encoded)),
125+
cowboy_req:reply(200, Headers, Encoded, Request);
126+
<<"identity">> ->
127+
Req = cowboy_req:stream_reply(200, Headers, Request),
128+
Fmt = fun(Data, Size) ->
129+
cowboy_req:stream_body(Data, nofin, Req),
130+
Size + iolist_size(Data)
131+
end,
132+
Size = prometheus_summary:observe_duration(
133+
Registry,
134+
?SCRAPE_DURATION,
135+
[Registry, ContentType],
136+
fun () -> prometheus_text_format:format_into(Registry, 0, Fmt) end),
137+
cowboy_req:stream_body(<<>>, fin, Req),
138+
prometheus_summary:observe(Registry,
139+
?SCRAPE_SIZE,
140+
[Registry, ContentType],
141+
Size),
142+
Req
143+
end.
133144

134145
validate_registry(undefined, auto) ->
135146
{true, default};
@@ -146,20 +157,6 @@ telemetry_registry() ->
146157
registry() ->
147158
application:get_env(rabbitmq_prometheus, registry, auto).
148159

149-
encode_format(ContentType, Encoding, Scrape, Registry) ->
150-
Encoded = encode_format_(Encoding, Scrape),
151-
prometheus_summary:observe(telemetry_registry(),
152-
?SCRAPE_ENCODED_SIZE,
153-
[Registry, ContentType, Encoding],
154-
iolist_size(Encoded)),
155-
{200, [{content_type, binary_to_list(ContentType)},
156-
{content_encoding, Encoding}], Encoded}.
157-
158-
encode_format_("gzip", Scrape) ->
159-
zlib:gzip(Scrape);
160-
encode_format_("identity", Scrape) ->
161-
Scrape.
162-
163160
%% It's not easy to pass this information in a pure way (it'll require changing prometheus.erl)
164161
put_filtering_options_into_process_dictionary(Request) ->
165162
#{vhost := VHosts, family := Families} = cowboy_req:match_qs([{vhost, [], undefined}, {family, [], undefined}], Request),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
5252
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1
53-
dep_prometheus = hex 5.1.1
53+
dep_prometheus = git https://github.com/the-mikedavis/prometheus.erl ceb4c9fe14ede2876f87d4997c9825cdfbf07759
5454
dep_ra = hex 2.17.1
5555
dep_ranch = hex 2.2.0
5656
dep_recon = hex 2.5.6

0 commit comments

Comments
 (0)