Skip to content

Commit 295ff3a

Browse files
committed
Adjust vheap sizes for message handling processes in OTP 27
OTP 27 reset all assumptions on how the vm reacts to processes that buffer and process a lot of large binaries. Substantially increasing the vheap sizes for such process restores most of the same performance by allowing processes to hold more binary data before major garbage collections are triggered. This introduces a new module to capture process flag configurations. The new vheap sizes are only applied when running on OTP 27 or above.
1 parent 7baff37 commit 295ff3a

File tree

7 files changed

+65
-2
lines changed

7 files changed

+65
-2
lines changed

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ def all_beam_files(name = "all_beam_files"):
192192
"src/rabbit_prelaunch_logging.erl",
193193
"src/rabbit_priority_queue.erl",
194194
"src/rabbit_process.erl",
195+
"src/rabbit_process_flag.erl",
195196
"src/rabbit_queue_consumers.erl",
196197
"src/rabbit_queue_decorator.erl",
197198
"src/rabbit_queue_index.erl",
@@ -448,6 +449,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
448449
"src/rabbit_prelaunch_logging.erl",
449450
"src/rabbit_priority_queue.erl",
450451
"src/rabbit_process.erl",
452+
"src/rabbit_process_flag.erl",
451453
"src/rabbit_queue_consumers.erl",
452454
"src/rabbit_queue_decorator.erl",
453455
"src/rabbit_queue_index.erl",
@@ -727,6 +729,7 @@ def all_srcs(name = "all_srcs"):
727729
"src/rabbit_prelaunch_logging.erl",
728730
"src/rabbit_priority_queue.erl",
729731
"src/rabbit_process.erl",
732+
"src/rabbit_process_flag.erl",
730733
"src/rabbit_queue_consumers.erl",
731734
"src/rabbit_queue_decorator.erl",
732735
"src/rabbit_queue_index.erl",

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
384384
outgoing_window = ?UINT(RemoteOutgoingWindow),
385385
handle_max = ClientHandleMax}}) ->
386386
process_flag(trap_exit, true),
387-
process_flag(message_queue_data, off_heap),
387+
rabbit_process_flag:adjust_for_message_handling_proc(),
388388

389389
ok = pg:join(pg_scope(), self(), self()),
390390
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,8 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
485485
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
486486
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
487487
process_flag(trap_exit, true),
488+
rabbit_process_flag:adjust_for_message_handling_proc(),
489+
488490
?LG_PROCESS_TYPE(channel),
489491
?store_proc_name({ConnName, Channel}),
490492
ok = pg_local:join(rabbit_channels, self()),
@@ -2784,3 +2786,16 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
27842786

27852787
is_global_qos_permitted() ->
27862788
rabbit_deprecated_features:is_permitted(global_qos).
2789+
2790+
adjust_vheap() ->
2791+
case code_version:get_otp_version() of
2792+
OtpMaj when OtpMaj >= 27 ->
2793+
%% 46422 is the default min_bin_vheap_size and for OTP 27 and above
2794+
%% we want to substantially increase it for processes that may buffer
2795+
%% messages. 32x has proven workable in testing whilst not being
2796+
%% rediculously large
2797+
process_flag(min_bin_vheap_size, 46422 * 32);
2798+
_ ->
2799+
ok
2800+
end.
2801+
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_process_flag).
9+
10+
11+
-export([adjust_for_message_handling_proc/0
12+
]).
13+
14+
%% @doc Enqueues a message.
15+
%% Adjust process flags for processes that handle RabbitMQ messages.
16+
%% For example any process that uses the `rabbit_queue_type' module
17+
%% may benefit from this tuning.
18+
%% @returns `ok'
19+
-spec adjust_for_message_handling_proc() -> ok.
20+
adjust_for_message_handling_proc() ->
21+
process_flag(message_queue_data, off_heap),
22+
case code_version:get_otp_version() of
23+
OtpMaj when OtpMaj >= 27 ->
24+
%% 46422 is the default min_bin_vheap_size and for OTP 27 and above
25+
%% we want to substantially increase it for processes that may buffer
26+
%% messages. 32x has proven workable in testing whilst not being
27+
%% rediculously large
28+
process_flag(min_bin_vheap_size, 46422 * 32),
29+
ok;
30+
_ ->
31+
ok
32+
end.

deps/rabbit/src/rabbit_ra_systems.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
-define(COORD_WAL_MAX_SIZE_B, 64_000_000).
2525
-define(QUORUM_AER_MAX_RPC_SIZE, 16).
2626
-define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000).
27+
%% the default min bin vheap value in OTP 26
28+
-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
29+
-define(MIN_BIN_VHEAP_SIZE_MULT, 64).
2730

2831
-spec setup() -> ok | no_return().
2932

@@ -107,7 +110,6 @@ ensure_ra_system_started(RaSystem) ->
107110
end.
108111

109112
-spec get_config(ra_system_name()) -> ra_system:config().
110-
111113
get_config(quorum_queues = RaSystem) ->
112114
DefaultConfig = get_default_config(),
113115
Checksums = application:get_env(rabbit, quorum_compute_checksums, true),
@@ -124,7 +126,16 @@ get_config(quorum_queues = RaSystem) ->
124126
AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size,
125127
?QUORUM_AER_MAX_RPC_SIZE),
126128
CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true),
129+
MinBinVheapSize = case code_version:get_otp_version() of
130+
OtpMaj when OtpMaj >= 27 ->
131+
?MIN_BIN_VHEAP_SIZE_DEFAULT * ?MIN_BIN_VHEAP_SIZE_MULT;
132+
_ ->
133+
?MIN_BIN_VHEAP_SIZE_DEFAULT
134+
end,
135+
127136
DefaultConfig#{name => RaSystem,
137+
wal_min_bin_vheap_size => MinBinVheapSize,
138+
server_min_bin_vheap_size => MinBinVheapSize,
128139
default_max_append_entries_rpc_batch_size => AERBatchSize,
129140
wal_compute_checksums => WalChecksums,
130141
wal_max_entries => WalMaxEntries,

deps/rabbit_common/src/code_version.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ get_forms(Code) ->
116116
throw({no_abstract_code, Reason})
117117
end.
118118

119+
-spec get_otp_version() -> non_neg_integer().
119120
get_otp_version() ->
120121
Version = erlang:system_info(otp_release),
121122
case re:run(Version, "^[0-9][0-9]", [{capture, first, list}]) of

moduleindex.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ rabbit:
694694
- rabbit_prelaunch_logging
695695
- rabbit_priority_queue
696696
- rabbit_process
697+
- rabbit_process_flag
697698
- rabbit_queue_consumers
698699
- rabbit_queue_decorator
699700
- rabbit_queue_index

0 commit comments

Comments
 (0)