From 47797d701b0a9c8a6bd4d00cfb18918b05bcfa23 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 4 Sep 2024 12:29:22 +0100 Subject: [PATCH 1/2] Adjust vheap sizes for message handling processes in OTP 27 OTP 27 reset all assumptions on how the vm reacts to processes that buffer and process a lot of large binaries. Substantially increasing the vheap sizes for such process restores most of the same performance by allowing processes to hold more binary data before major garbage collections are triggered. This introduces a new module to capture process flag configurations. The new vheap sizes are only applied when running on OTP 27 or above. (cherry picked from commit 465b19e8e80362e7462222d6a20a65dbe0f4d386) --- deps/rabbit/app.bzl | 3 +++ deps/rabbit/src/rabbit_amqp_session.erl | 2 +- deps/rabbit/src/rabbit_channel.erl | 15 ++++++++++++ deps/rabbit/src/rabbit_process_flag.erl | 32 +++++++++++++++++++++++++ deps/rabbit/src/rabbit_ra_systems.erl | 13 +++++++++- deps/rabbit_common/src/code_version.erl | 1 + moduleindex.yaml | 1 + 7 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 deps/rabbit/src/rabbit_process_flag.erl diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 2f282740260d..6010c6d2b7d0 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -192,6 +192,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_prelaunch_logging.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", + "src/rabbit_process_flag.erl", "src/rabbit_queue_consumers.erl", "src/rabbit_queue_decorator.erl", "src/rabbit_queue_index.erl", @@ -448,6 +449,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_prelaunch_logging.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", + "src/rabbit_process_flag.erl", "src/rabbit_queue_consumers.erl", "src/rabbit_queue_decorator.erl", "src/rabbit_queue_index.erl", @@ -727,6 +729,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_prelaunch_logging.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", + "src/rabbit_process_flag.erl", "src/rabbit_queue_consumers.erl", "src/rabbit_queue_decorator.erl", "src/rabbit_queue_index.erl", diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 0c459b08a76b..0b228040f5f3 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -385,7 +385,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, outgoing_window = ?UINT(RemoteOutgoingWindow), handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), - process_flag(message_queue_data, off_heap), + rabbit_process_flag:adjust_for_message_handling_proc(), ok = pg:join(pg_scope(), self(), self()), Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 5272991f5d2c..d1b1811aff1b 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -484,6 +484,8 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), + rabbit_process_flag:adjust_for_message_handling_proc(), + ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), @@ -2785,3 +2787,16 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) -> is_global_qos_permitted() -> rabbit_deprecated_features:is_permitted(global_qos). + +adjust_vheap() -> + case code_version:get_otp_version() of + OtpMaj when OtpMaj >= 27 -> + %% 46422 is the default min_bin_vheap_size and for OTP 27 and above + %% we want to substantially increase it for processes that may buffer + %% messages. 32x has proven workable in testing whilst not being + %% rediculously large + process_flag(min_bin_vheap_size, 46422 * 32); + _ -> + ok + end. + diff --git a/deps/rabbit/src/rabbit_process_flag.erl b/deps/rabbit/src/rabbit_process_flag.erl new file mode 100644 index 000000000000..32c8f1562579 --- /dev/null +++ b/deps/rabbit/src/rabbit_process_flag.erl @@ -0,0 +1,32 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_process_flag). + + +-export([adjust_for_message_handling_proc/0 + ]). + +%% @doc Enqueues a message. +%% Adjust process flags for processes that handle RabbitMQ messages. +%% For example any process that uses the `rabbit_queue_type' module +%% may benefit from this tuning. +%% @returns `ok' +-spec adjust_for_message_handling_proc() -> ok. +adjust_for_message_handling_proc() -> + process_flag(message_queue_data, off_heap), + case code_version:get_otp_version() of + OtpMaj when OtpMaj >= 27 -> + %% 46422 is the default min_bin_vheap_size and for OTP 27 and above + %% we want to substantially increase it for processes that may buffer + %% messages. 32x has proven workable in testing whilst not being + %% rediculously large + process_flag(min_bin_vheap_size, 46422 * 32), + ok; + _ -> + ok + end. diff --git a/deps/rabbit/src/rabbit_ra_systems.erl b/deps/rabbit/src/rabbit_ra_systems.erl index 08e15ecb53ba..033c76132522 100644 --- a/deps/rabbit/src/rabbit_ra_systems.erl +++ b/deps/rabbit/src/rabbit_ra_systems.erl @@ -24,6 +24,9 @@ -define(COORD_WAL_MAX_SIZE_B, 64_000_000). -define(QUORUM_AER_MAX_RPC_SIZE, 16). -define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000). +%% the default min bin vheap value in OTP 26 +-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422). +-define(MIN_BIN_VHEAP_SIZE_MULT, 64). -spec setup() -> ok | no_return(). @@ -107,7 +110,6 @@ ensure_ra_system_started(RaSystem) -> end. -spec get_config(ra_system_name()) -> ra_system:config(). - get_config(quorum_queues = RaSystem) -> DefaultConfig = get_default_config(), Checksums = application:get_env(rabbit, quorum_compute_checksums, true), @@ -124,7 +126,16 @@ get_config(quorum_queues = RaSystem) -> AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size, ?QUORUM_AER_MAX_RPC_SIZE), CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true), + MinBinVheapSize = case code_version:get_otp_version() of + OtpMaj when OtpMaj >= 27 -> + ?MIN_BIN_VHEAP_SIZE_DEFAULT * ?MIN_BIN_VHEAP_SIZE_MULT; + _ -> + ?MIN_BIN_VHEAP_SIZE_DEFAULT + end, + DefaultConfig#{name => RaSystem, + wal_min_bin_vheap_size => MinBinVheapSize, + server_min_bin_vheap_size => MinBinVheapSize, default_max_append_entries_rpc_batch_size => AERBatchSize, wal_compute_checksums => WalChecksums, wal_max_entries => WalMaxEntries, diff --git a/deps/rabbit_common/src/code_version.erl b/deps/rabbit_common/src/code_version.erl index 568a6e7c439a..af90f73d941f 100644 --- a/deps/rabbit_common/src/code_version.erl +++ b/deps/rabbit_common/src/code_version.erl @@ -116,6 +116,7 @@ get_forms(Code) -> throw({no_abstract_code, Reason}) end. +-spec get_otp_version() -> non_neg_integer(). get_otp_version() -> Version = erlang:system_info(otp_release), case re:run(Version, "^[0-9][0-9]", [{capture, first, list}]) of diff --git a/moduleindex.yaml b/moduleindex.yaml index 02f800fcd252..13d2c98adcc2 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -694,6 +694,7 @@ rabbit: - rabbit_prelaunch_logging - rabbit_priority_queue - rabbit_process +- rabbit_process_flag - rabbit_queue_consumers - rabbit_queue_decorator - rabbit_queue_index From b20f7b85f24b5bbdd4e10fe079205ece9cb077ef Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Tue, 10 Sep 2024 11:09:31 +0200 Subject: [PATCH 2/2] Remove redundant copy of adjust_for_message_handling_proc/0 (cherry picked from commit 6a7f8d0d1e9280c1ea9c210b50423785d86a0ba2) --- deps/rabbit/src/rabbit_channel.erl | 13 ------------- deps/rabbit/src/rabbit_process_flag.erl | 3 +-- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index d1b1811aff1b..287a4d8838b0 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2787,16 +2787,3 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) -> is_global_qos_permitted() -> rabbit_deprecated_features:is_permitted(global_qos). - -adjust_vheap() -> - case code_version:get_otp_version() of - OtpMaj when OtpMaj >= 27 -> - %% 46422 is the default min_bin_vheap_size and for OTP 27 and above - %% we want to substantially increase it for processes that may buffer - %% messages. 32x has proven workable in testing whilst not being - %% rediculously large - process_flag(min_bin_vheap_size, 46422 * 32); - _ -> - ok - end. - diff --git a/deps/rabbit/src/rabbit_process_flag.erl b/deps/rabbit/src/rabbit_process_flag.erl index 32c8f1562579..fc74c25f554e 100644 --- a/deps/rabbit/src/rabbit_process_flag.erl +++ b/deps/rabbit/src/rabbit_process_flag.erl @@ -11,7 +11,6 @@ -export([adjust_for_message_handling_proc/0 ]). -%% @doc Enqueues a message. %% Adjust process flags for processes that handle RabbitMQ messages. %% For example any process that uses the `rabbit_queue_type' module %% may benefit from this tuning. @@ -24,7 +23,7 @@ adjust_for_message_handling_proc() -> %% 46422 is the default min_bin_vheap_size and for OTP 27 and above %% we want to substantially increase it for processes that may buffer %% messages. 32x has proven workable in testing whilst not being - %% rediculously large + %% ridiculously large process_flag(min_bin_vheap_size, 46422 * 32), ok; _ ->