Skip to content

Commit 00aa894

Browse files
committed
(3.13!) HTTP API: Gradually encode and send /queues response as JSON
NOTE this is currently branched off of `3.13.x` and should eventually target `main` instead. This change adds a helper to `rabbit_mgmt_util` which streams the reply instead of relying on the standard `cowboy_rest` response tuple which sets the response body in one go. For a sizable broker with a very large number of queues, querying the `/api/queues` endpoint for all queues causes a significant memory spike on invocation. Spiky workloads are generally hard to handle for any system, garbage collectors included. The hope of this patch is to reduce memory usage by gradually converting queue metrics to JSON and sending them in small chunks rather than converting the entire response to a large term (albeit `iodata()`) at once. TODO: can we also run the augmentation gradually?
1 parent ed23a7d commit 00aa894

File tree

2 files changed

+82
-2
lines changed

2 files changed

+82
-2
lines changed

deps/rabbitmq_management/src/rabbit_mgmt_util.erl

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
-export([with_decode/4, not_found/3]).
2525
-export([with_channel/4, with_channel/5]).
2626
-export([props_to_method/2, props_to_method/4]).
27-
-export([all_or_one_vhost/2, reply/3, responder_map/1,
27+
-export([all_or_one_vhost/2, reply/3, stream_reply/5, responder_map/1,
2828
filter_vhost/3]).
2929
-export([filter_conn_ch_list/3, filter_user/2, list_login_vhosts/2,
3030
list_login_vhosts_names/2]).
@@ -307,6 +307,81 @@ is_pagination_requested(undefined) ->
307307
is_pagination_requested(#pagination{}) ->
308308
true.
309309

310+
-spec stream_reply(StatusCode, ChunkSize, Facts, ReqData, Context) -> Reply
    when
      StatusCode :: cowboy:http_status(),
      ChunkSize :: pos_integer(),
      Facts :: term(),
      ReqData :: cowboy_req:req(),
      Context :: term(),
      %% Either the `{stop, Req, Context}' tuple produced by the streaming
      %% path or whatever `reply/3' returns for the regular `cowboy_rest'
      %% flow. (The previous spec claimed a bare `cowboy_req:req()', which
      %% neither branch actually returns.)
      Reply :: {term(), cowboy_req:req(), term()}.
%% @doc Uses the Cowboy response streaming API to gradually encode the `Facts'
%% into JSON and send on the connection.
%%
%% This helper can be used instead of `rabbit_mgmt_util:reply/3' when `Facts'
%% could be large: a list with more than `ChunkSize' elements.
%%
%% If `Facts' is not a large list, this function delegates to `reply/3'.

stream_reply(StatusCode, ChunkSize, Facts, ReqData, Context)
  when is_integer(ChunkSize) ->
    case maps:get(media_type, ReqData, undefined) of
        %% Only application/json can be streamed currently.
        {<<"application">>, <<"json">>, _} ->
            %% `longer_than/2' inspects at most `ChunkSize' + 1 cells instead
            %% of computing the full length of a potentially huge list, which
            %% is exactly the case this helper exists for.
            case is_list(Facts) andalso longer_than(Facts, ChunkSize) of
                true ->
                    stream_json_reply(StatusCode, ChunkSize, Facts,
                                      ReqData, Context);
                false ->
                    reply(Facts, ReqData, Context)
            end;
        _ ->
            reply(Facts, ReqData, Context)
    end.

%% `length(List) > N' for proper lists, traversing at most N + 1 cells.
longer_than([], _N) ->
    false;
longer_than([_ | _], 0) ->
    true;
longer_than([_ | Rest], N) ->
    longer_than(Rest, N - 1);
%% Improper tail: the original `length/1' guard would simply fail and fall
%% through to `reply/3', so treat it as "not longer" rather than crashing.
longer_than(_ImproperTail, _N) ->
    false.
336+
337+
%% Streams `Facts' as a JSON array: opens the response, sends `"['", then
%% hands off to `stream_json_reply/3' for chunked element encoding.
stream_json_reply(StatusCode, ChunkSize, Facts, ReqData, Context)
  when is_list(Facts) ->
    %% Apply the same `null' formatting that `reply/3' performs before
    %% encoding anything.
    Formatted = rabbit_mgmt_format:format_nulls(Facts),
    ReqData1 = rabbit_mgmt_util:set_resp_header(
                 <<"cache-control">>, "no-cache", ReqData),
    try
        %% NOTE(review): once `cowboy_req:stream_reply/2' has executed, the
        %% status line and headers are already on the wire, so the
        %% `internal_server_error' fallback below can no longer change the
        %% response status for chunks encoded later — confirm this trade-off
        %% is acceptable.
        ReqData2 = cowboy_req:stream_reply(StatusCode, ReqData1),
        ok = cowboy_req:stream_body("[", nofin, ReqData2),
        ok = stream_json_reply(Formatted, ChunkSize, ReqData2),
        {stop, ReqData2, Context}
    catch exit:{json_encode, EncodeErr} ->
        Error = iolist_to_binary(
                  io_lib:format("JSON encode error: ~tp", [EncodeErr])),
        Reason = iolist_to_binary(
                   io_lib:format("While encoding: ~n~tp", [Facts])),
        internal_server_error(Error, Reason, ReqData1, Context)
    end.
353+
354+
%% Encodes and sends `Items' in chunks of at most `ChunkSize' elements,
%% terminating the JSON array opened by `stream_json_reply/5'. Always
%% returns `ok' (the caller asserts `ok = stream_json_reply(...)').
stream_json_reply([], _ChunkSize, ReqData) ->
    %% Previously this clause returned `ReqData', which would have crashed
    %% the caller's `ok =' match and left the opening `"["' unterminated.
    %% Unreachable via `stream_reply/5' (which requires more than
    %% `ChunkSize' elements), but close the array correctly regardless.
    ok = cowboy_req:stream_body("]", fin, ReqData);
stream_json_reply(Items, ChunkSize, ReqData) ->
    %% Plain bounded split instead of `try lists:split/2 catch error:badarg'
    %% — no exceptions as control flow for the final short chunk.
    {Chunk, Rest} = split_at_most(ChunkSize, Items),
    EncodedChunk = json_encode_separated(Chunk),
    case Rest of
        [] ->
            %% Last chunk: append the closing bracket and finish the stream.
            ok = cowboy_req:stream_body([EncodedChunk | "]"], fin, ReqData);
        _ ->
            %% More to come: separate this chunk from the next with a comma.
            ok = cowboy_req:stream_body([EncodedChunk | ","], nofin, ReqData),
            stream_json_reply(Rest, ChunkSize, ReqData)
    end.

%% Like `lists:split/2' but returns `{List, []}' instead of raising
%% `badarg' when `List' has fewer than `N' elements.
split_at_most(N, List) ->
    split_at_most(N, List, []).

split_at_most(0, Rest, Acc) ->
    {lists:reverse(Acc), Rest};
split_at_most(_N, [], Acc) ->
    {lists:reverse(Acc), []};
split_at_most(N, [Head | Tail], Acc) ->
    split_at_most(N - 1, Tail, [Head | Acc]).
373+
374+
%% Encodes each item to JSON and interleaves `","' separators, producing
%% iodata of the form `[E1, ",", E2, ",", E3]' ready for `stream_body'.
%% An empty input yields `[]'; a single item yields `[E1]'.
json_encode_separated(Items) ->
    lists:join(",", [rabbit_json:encode(Item) || Item <- Items]).
384+
310385

311386
with_valid_pagination(ReqData, Context, Fun) ->
312387
try

deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929

3030
-define(DEFAULT_SORT, ["vhost", "name"]).
3131

32+
%% The number of queues to encode as JSON and send using Cowboy's response
33+
%% streaming API. TODO: tune this number according to performance tests.
34+
-define(RESPONSE_STREAM_CHUNK_SIZE, 1024).
35+
3236
%%--------------------------------------------------------------------
3337

3438
init(Req, State) ->
@@ -57,7 +61,8 @@ to_json(ReqData, {Mode, Context}) ->
5761
Data = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT,
5862
?BASIC_COLUMNS, ReqData,
5963
Context, augment(Mode)),
60-
rabbit_mgmt_util:reply(Data, ReqData, {Mode, Context})
64+
rabbit_mgmt_util:stream_reply(
65+
200, ?RESPONSE_STREAM_CHUNK_SIZE, Data, ReqData, {Mode, Context})
6166
catch
6267
{error, invalid_range_parameters, Reason} ->
6368
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,

0 commit comments

Comments
 (0)