Skip to content

Commit 7de28f3

Browse files
committed
rabbit_prometheus_handler: Stream identity-encoded HTTP replies
`prometheus_text_format:format/1` produces a binary of the format for the entire registry. For clusters with many resources, this can lead to large replies from `/metrics/[:registry]` especially for large registries like `per-object`. Instead of formatting the response and then sending it, we can stream the response by taking advantage of the new `format_into/3` callback (which needs to be added upstream to the `prometheus` dep). This uses `cowboy_req:stream_body/3` to stream the iodata as `prometheus` works through the registry. This should hopefully be a nice memory improvement. The other benefit is that results are sent eagerly. For a stress-testing example, 1. `make run-broker` 2. `rabbitmqctl import_definitions path/to/100k-classic-queues.json` 3. `curl -s localhost:15692/metrics/per-object` Before this change `curl` would wait for around 8 seconds and then the entire response would arrive. With this change the results start streaming in immediately.
1 parent fd04424 commit 7de28f3

File tree

2 files changed

+43
-43
lines changed

2 files changed

+43
-43
lines changed

deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -89,47 +89,61 @@ gen_response(_, Request) ->
8989
Request.
9090

9191
gen_metrics_response(Registry, Request) ->
92-
{Code, RespHeaders, Body} = reply(Registry, Request),
93-
94-
Headers = to_cowboy_headers(RespHeaders),
95-
cowboy_req:reply(Code, maps:from_list(Headers), Body, Request).
96-
97-
to_cowboy_headers(RespHeaders) ->
98-
lists:map(fun to_cowboy_headers_/1, RespHeaders).
99-
100-
to_cowboy_headers_({Name, Value}) ->
101-
{to_cowboy_name(Name), Value}.
102-
103-
to_cowboy_name(Name) ->
104-
binary:replace(atom_to_binary(Name, utf8), <<"_">>, <<"-">>).
92+
reply(Registry, Request).
10593

10694
reply(Registry, Request) ->
10795
case validate_registry(Registry, registry()) of
10896
{true, RealRegistry} ->
10997
format_metrics(Request, RealRegistry);
11098
{registry_conflict, _ReqR, _ConfR} ->
111-
{409, [], <<>>}
99+
cowboy_req:reply(409, #{}, <<>>, Request)
112100
end.
113101

114102
format_metrics(Request, Registry) ->
103+
%% Formatting registries produces large binaries. Fullsweep eagerly to
104+
%% evict the large binaries faster and make GC cheaper.
105+
process_flag(fullsweep_after, 0),
115106
AcceptEncoding = cowboy_req:header(<<"accept-encoding">>, Request, undefined),
116107
ContentType = prometheus_text_format:content_type(),
117-
Scrape = render_format(ContentType, Registry),
118108
Encoding = accept_encoding_header:negotiate(AcceptEncoding, [<<"identity">>,
119109
<<"gzip">>]),
120-
encode_format(ContentType, binary_to_list(Encoding), Scrape, Registry).
121-
122-
render_format(ContentType, Registry) ->
123-
Scrape = prometheus_summary:observe_duration(
124-
Registry,
125-
?SCRAPE_DURATION,
126-
[Registry, ContentType],
127-
fun () -> prometheus_text_format:format(Registry) end),
128-
prometheus_summary:observe(Registry,
129-
?SCRAPE_SIZE,
130-
[Registry, ContentType],
131-
iolist_size(Scrape)),
132-
Scrape.
110+
Headers = #{<<"content-type">> => ContentType,
111+
<<"content-encoding">> => Encoding},
112+
case Encoding of
113+
<<"gzip">> ->
114+
Scrape = prometheus_summary:observe_duration(
115+
Registry,
116+
?SCRAPE_DURATION,
117+
[Registry, ContentType],
118+
fun () -> prometheus_text_format:format(Registry) end),
119+
prometheus_summary:observe(Registry,
120+
?SCRAPE_SIZE,
121+
[Registry, ContentType],
122+
iolist_size(Scrape)),
123+
Encoded = zlib:gzip(Scrape),
124+
prometheus_summary:observe(telemetry_registry(),
125+
?SCRAPE_ENCODED_SIZE,
126+
[Registry, ContentType, Encoding],
127+
iolist_size(Encoded)),
128+
cowboy_req:reply(200, Headers, Encoded, Request);
129+
<<"identity">> ->
130+
Req = cowboy_req:stream_reply(200, Headers, Request),
131+
Fmt = fun(Data, Size) ->
132+
cowboy_req:stream_body(Data, nofin, Req),
133+
Size + iolist_size(Data)
134+
end,
135+
Size = prometheus_summary:observe_duration(
136+
Registry,
137+
?SCRAPE_DURATION,
138+
[Registry, ContentType],
139+
fun () -> prometheus_text_format:format_into(Registry, 0, Fmt) end),
140+
cowboy_req:stream_body(<<>>, fin, Req),
141+
prometheus_summary:observe(Registry,
142+
?SCRAPE_SIZE,
143+
[Registry, ContentType],
144+
Size),
145+
Req
146+
end.
133147

134148
validate_registry(undefined, auto) ->
135149
{true, default};
@@ -146,20 +160,6 @@ telemetry_registry() ->
146160
registry() ->
147161
application:get_env(rabbitmq_prometheus, registry, auto).
148162

149-
encode_format(ContentType, Encoding, Scrape, Registry) ->
150-
Encoded = encode_format_(Encoding, Scrape),
151-
prometheus_summary:observe(telemetry_registry(),
152-
?SCRAPE_ENCODED_SIZE,
153-
[Registry, ContentType, Encoding],
154-
iolist_size(Encoded)),
155-
{200, [{content_type, binary_to_list(ContentType)},
156-
{content_encoding, Encoding}], Encoded}.
157-
158-
encode_format_("gzip", Scrape) ->
159-
zlib:gzip(Scrape);
160-
encode_format_("identity", Scrape) ->
161-
Scrape.
162-
163163
%% It's not easy to pass this information in a pure way (it'll require changing prometheus.erl)
164164
put_filtering_options_into_process_dictionary(Request) ->
165165
#{vhost := VHosts, family := Families} = cowboy_req:match_qs([{vhost, [], undefined}, {family, [], undefined}], Request),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
5252
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1
53-
dep_prometheus = hex 5.1.1
53+
dep_prometheus = git https://github.com/the-mikedavis/prometheus.erl ceb4c9fe14ede2876f87d4997c9825cdfbf07759
5454
dep_ra = hex 2.17.1
5555
dep_ranch = hex 2.2.0
5656
dep_recon = hex 2.5.6

0 commit comments

Comments
 (0)