From 8355bc691e1b67100e9015d1b16a54574ee547cc Mon Sep 17 00:00:00 2001 From: Frederik Bosch Date: Sat, 1 Feb 2025 13:36:22 +0100 Subject: [PATCH 1/7] add option to disable registration of node during cluster formation --- deps/rabbit/priv/schema/rabbit.schema | 7 +++ deps/rabbit/src/rabbit_peer_discovery.erl | 61 +++++++++++++++++------ 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 756cf03ce676..2c121c0646b6 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1353,6 +1353,13 @@ end}. {datatype, {enum, [disc, disk, ram]}} ]}. +%% Register node during cluster formation. +%% + +{mapping, "cluster_formation.registration", "rabbit.cluster_formation.register", [ + {datatype, {enum, [true, false]}} +]}. + {translation, "rabbit.cluster_formation.node_type", fun(Conf) -> %% if peer discovery backend isn't configured, don't generate diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index 56400d3569e5..db97b142f587 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -46,6 +46,9 @@ %% a new cluster as a virgin node -define(DEFAULT_NODE_TYPE, disc). +%% register node by default +-define(DEFAULT_REGISTRATION, true). + %% default node prefix to attach to discovered hostnames -define(DEFAULT_PREFIX, "rabbit"). @@ -82,6 +85,16 @@ node_type() -> ?DEFAULT_NODE_TYPE end. +-spec registration() -> true | false. + +registration() -> + case application:get_env(rabbit, cluster_formation) of + {ok, Proplist} -> + proplists:get_value(registration, Proplist, ?DEFAULT_REGISTRATION); + undefined -> + ?DEFAULT_REGISTRATION + end. + -spec lock_acquisition_failure_mode() -> ignore | fail. lock_acquisition_failure_mode() -> @@ -968,18 +981,26 @@ error_description({invalid_cluster_node_type, BadType}) -> -spec maybe_register() -> ok. maybe_register() -> - Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()), - case Backend:supports_registration() of + case registration() of true -> - ?LOG_DEBUG( - "Peer discovery: registering this node", - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - register(Backend), - _ = Backend:post_registration(), - ok; + Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()), + case Backend:supports_registration() of + true -> + ?LOG_DEBUG( + "Peer discovery: registering this node", + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + register(Backend), + _ = Backend:post_registration(), + ok; + false -> + ?LOG_DEBUG( + "Peer discovery: registration unsupported, skipping register", + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + ok + end. false -> ?LOG_DEBUG( - "Peer discovery: registration unsupported, skipping register", + "Peer discovery: registration disabled, skipping register", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok end. @@ -987,16 +1008,24 @@ maybe_register() -> -spec maybe_unregister() -> ok. maybe_unregister() -> - Backend = persistent_term:get(?PT_PEER_DISC_BACKEND), - case Backend:supports_registration() of + case registration() of true -> - ?LOG_DEBUG( - "Peer discovery: unregistering this node", - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - unregister(Backend); + Backend = persistent_term:get(?PT_PEER_DISC_BACKEND), + case Backend:supports_registration() of + true -> + ?LOG_DEBUG( + "Peer discovery: unregistering this node", + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + unregister(Backend); + false -> + ?LOG_DEBUG( + "Peer discovery: registration unsupported, skipping unregister", + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + ok + end. false -> ?LOG_DEBUG( - "Peer discovery: registration unsupported, skipping unregister", + "Peer discovery: registration disabled, skipping unregister", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok end. From 481ffb2d6cd3b1bff1dcd65c3960bc6af19462f5 Mon Sep 17 00:00:00 2001 From: Frederik Bosch Date: Sat, 1 Feb 2025 14:09:03 +0100 Subject: [PATCH 2/7] add option to disable registration of node during cluster formation --- deps/rabbit/priv/schema/rabbit.schema | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 2c121c0646b6..0f1888abfba3 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1353,13 +1353,6 @@ end}. {datatype, {enum, [disc, disk, ram]}} ]}. -%% Register node during cluster formation. -%% - -{mapping, "cluster_formation.registration", "rabbit.cluster_formation.register", [ - {datatype, {enum, [true, false]}} -]}. - {translation, "rabbit.cluster_formation.node_type", fun(Conf) -> %% if peer discovery backend isn't configured, don't generate @@ -1377,6 +1370,13 @@ fun(Conf) -> end end}. +%% Register node during cluster formation when backend supports registration. +%% + +{mapping, "cluster_formation.registration", "rabbit.cluster_formation.register", [ + {datatype, {enum, [true, false]}} +]}. + %% Cluster formation: lock acquisition retries as passed to https://erlang.org/doc/man/global.html#set_lock-3 %% %% Currently used in classic, k8s, and aws peer discovery backends. From df424e5edfa1c57db9cec3896100e8ba16e51ea9 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 30 Jan 2025 21:19:22 -0500 Subject: [PATCH 3/7] Bump Khepri cluster formation timeout to match that used with Mnesia. In the case of Mnesia, there are 10 retries with a 30 second delay each. For Khepri, a single timeout is used, so it must be ten times as long. --- deps/rabbit/src/rabbit_khepri.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 60050fcccdf3..efb97a6d3532 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -286,7 +286,7 @@ setup(_) -> retry_timeout() -> case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of {ok, T} -> T; - undefined -> 30000 + undefined -> 300_000 end. retry_limit() -> From 4bdddc7a987e117cab31ef5b6da016ea535a228d Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 3 Feb 2025 10:38:22 +0100 Subject: [PATCH 4/7] Bump Qpid JMS AMQP 1.0 client --- deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml b/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml index 1cf102431b64..e40b72b44099 100644 --- a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml +++ b/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml @@ -9,10 +9,10 @@ https://www.rabbitmq.com 5.10.2 - 2.5.0 + 2.6.1 1.2.13 2.43.0 - 1.17.0 + 1.25.2 3.12.1 3.2.5 From 0b1cfc6f04af1c89765ea54a590f758dec1b6aef Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 3 Feb 2025 09:45:04 +0100 Subject: [PATCH 5/7] Impose limit on AMQP filter complexity As described in section 7.1 of filtex-v1.0-wd09: > Impose a limit on the complexity of each filter expression. Here, we hard code the maximum properties within a filter expression to 16. There should never be a use case requiring to filter on more than 16 different properties. --- deps/rabbit/src/rabbit_amqp_filtex.erl | 22 ++++++++++------- deps/rabbit/test/amqp_filtex_SUITE.erl | 33 ++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_filtex.erl b/deps/rabbit/src/rabbit_amqp_filtex.erl index a1e346d30fa1..327457125822 100644 --- a/deps/rabbit/src/rabbit_amqp_filtex.erl +++ b/deps/rabbit/src/rabbit_amqp_filtex.erl @@ -13,18 +13,24 @@ -export([validate/1, filter/2]). +%% "Impose a limit on the complexity of each filter expression." +%% [filtex-v1.0-wd09 7.1] +-define(MAX_FILTER_FIELDS, 16). + -type simple_type() :: number() | binary() | atom(). -type affix() :: {suffix, non_neg_integer(), binary()} | {prefix, non_neg_integer(), binary()}. -type filter_expression_value() :: simple_type() | affix(). --type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} | - {application_properties, [{binary(), filter_expression_value()}]}. +-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}, ...]} | + {application_properties, [{binary(), filter_expression_value()}, ...]}. -type filter_expressions() :: [filter_expression()]. -export_type([filter_expressions/0]). -spec validate(tuple()) -> {ok, filter_expression()} | error. -validate({described, Descriptor, {map, KVList}}) -> +validate({described, Descriptor, {map, KVList}}) + when KVList =/= [] andalso + length(KVList) =< ?MAX_FILTER_FIELDS -> try validate0(Descriptor, KVList) catch throw:{?MODULE, _, _} -> error @@ -108,14 +114,12 @@ match_simple_type(RefVal, Val) -> RefVal == Val. validate0(Descriptor, KVList) when - (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse - Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso - KVList =/= [] -> + Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER} -> validate_props(KVList, []); validate0(Descriptor, KVList) when - (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse - Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso - KVList =/= [] -> + Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER} -> validate_app_props(KVList, []); validate0(_, _) -> error. diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl index 75f8528da9ca..2d4f34bd1883 100644 --- a/deps/rabbit/test/amqp_filtex_SUITE.erl +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -377,6 +377,39 @@ application_properties_section(Config) -> ?assertEqual([<<"m4">>], amqp10_msg:body(R4M4)), ok = detach_link_sync(Receiver4), + %% Complex filter (too many properties to filter on) should fail validation in the server. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + AppPropsFilter5 = [{{utf8, integer_to_binary(N)}, {uint, 1}} || + N <- lists:seq(1, 17)], + Filter5 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter5}}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + unsettled, configuration, Filter5), + receive {amqp10_event, + {link, Receiver5, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter5}}}}}} -> + ?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}], + ActualFilter5) + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, R5M1} = amqp10_client:get_msg(Receiver5), + {ok, R5M2} = amqp10_client:get_msg(Receiver5), + {ok, R5M3} = amqp10_client:get_msg(Receiver5), + {ok, R5M4} = amqp10_client:get_msg(Receiver5), + ok = amqp10_client:accept_msg(Receiver5, R5M1), + ok = amqp10_client:accept_msg(Receiver5, R5M2), + ok = amqp10_client:accept_msg(Receiver5, R5M3), + ok = amqp10_client:accept_msg(Receiver5, R5M4), + ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R5M4)), + ok = detach_link_sync(Receiver5), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), From 600bb22939f98f0449def383b9256cde3be257dd Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 31 Jan 2025 14:41:29 +0000 Subject: [PATCH 6/7] Ra 2.16.1 Contains bug fix which would crash at-most-once dead lettering during node restarts. Less excessive debug logging around ra log. Fix issue that could make leader transfers take 5s+ to complete. --- MODULE.bazel | 4 ++-- rabbitmq-components.mk | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index a5fced655768..4747107940b1 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -253,8 +253,8 @@ erlang_package.hex_package( name = "ra", build_file = "@rabbitmq-server//bazel:BUILD.ra", pkg = "ra", - sha256 = "7cdf7894f1f542aeaa3d9e6f3209aab6efe9a1cdd1d81de9587c3ea23629b0e3", - version = "2.16.0", + sha256 = "fd32a9b0a4b253b073b90dd996456e524347951d39f0b572d78178188491e6d4", + version = "2.16.1", ) erlang_package.git_package( diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index a62f0622fa3d..ed970043cd83 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -50,7 +50,7 @@ dep_khepri = hex 0.16.0 dep_khepri_mnesia_migration = hex 0.7.1 dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5 dep_prometheus = hex 4.11.0 -dep_ra = hex 2.16.0 +dep_ra = hex 2.16.1 dep_ranch = hex 2.1.0 dep_recon = hex 2.5.6 dep_redbug = hex 2.0.7 From 269685dd6e27b88c000bc62d239918c27e22ebeb Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 3 Feb 2025 13:58:17 -0500 Subject: [PATCH 7/7] Make it possible to opt out of peer discovery registration for the backends that support it in the first place. When forming a cluster, registration of the node joining the cluster might be left to (container) orchestration tools like Nomad or Kubernetes. This PR add a new configuration option, 'cluster_formation.registration.enable', which defaults to true. When set to false node registration will be skipped. There is at least one important advantage using a tool such as Nomad (plus Consul) over the application (RabbitMQ) doing the registration. When the application is not stopped gracefully for any reason, e.g. its OOM killed, it cannot deregister the service/node. This leaves behind an unlinked service entry in the registry. This problem is fundamentally avoided by allowing Nomad (or similar tools) to register the node'service. See #11233 #11045 for prior discussions. Co-authored-by: Frederik Bosch --- deps/rabbit/priv/schema/rabbit.schema | 2 +- deps/rabbit/src/rabbit_peer_discovery.erl | 28 +++++++++---------- .../config_schema_SUITE_data/rabbit.snippets | 19 +++++++++++++ 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 0f1888abfba3..16e12ece625a 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1373,7 +1373,7 @@ end}. %% Register node during cluster formation when backend supports registration. %% -{mapping, "cluster_formation.registration", "rabbit.cluster_formation.register", [ +{mapping, "cluster_formation.registration.enabled", "rabbit.cluster_formation.perform_registration", [ {datatype, {enum, [true, false]}} ]}. diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index db97b142f587..7019a6160a24 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -46,8 +46,8 @@ %% a new cluster as a virgin node -define(DEFAULT_NODE_TYPE, disc). -%% register node by default --define(DEFAULT_REGISTRATION, true). +%% Register node by default (with the backends that support registration) +-define(PERFORM_REGISTRATION_BY_DEFAULT, true). %% default node prefix to attach to discovered hostnames -define(DEFAULT_PREFIX, "rabbit"). @@ -85,14 +85,14 @@ node_type() -> ?DEFAULT_NODE_TYPE end. --spec registration() -> true | false. +-spec should_perform_registration() -> true | false. -registration() -> +should_perform_registration() -> case application:get_env(rabbit, cluster_formation) of {ok, Proplist} -> - proplists:get_value(registration, Proplist, ?DEFAULT_REGISTRATION); + proplists:get_value(perform_registration, Proplist, ?PERFORM_REGISTRATION_BY_DEFAULT); undefined -> - ?DEFAULT_REGISTRATION + ?PERFORM_REGISTRATION_BY_DEFAULT end. -spec lock_acquisition_failure_mode() -> ignore | fail. @@ -981,7 +981,7 @@ error_description({invalid_cluster_node_type, BadType}) -> -spec maybe_register() -> ok. maybe_register() -> - case registration() of + case should_perform_registration() of true -> Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()), case Backend:supports_registration() of @@ -994,13 +994,13 @@ maybe_register() -> ok; false -> ?LOG_DEBUG( - "Peer discovery: registration unsupported, skipping register", + "Peer discovery: registration is not supported, skipping it", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok - end. + end; false -> ?LOG_DEBUG( - "Peer discovery: registration disabled, skipping register", + "Peer discovery: registration is disabled via configuration, skipping it", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok end. @@ -1008,7 +1008,7 @@ maybe_register() -> -spec maybe_unregister() -> ok. maybe_unregister() -> - case registration() of + case should_perform_registration() of true -> Backend = persistent_term:get(?PT_PEER_DISC_BACKEND), case Backend:supports_registration() of @@ -1019,13 +1019,13 @@ maybe_unregister() -> unregister(Backend); false -> ?LOG_DEBUG( - "Peer discovery: registration unsupported, skipping unregister", + "Peer discovery: registration is not supported, skipping unregistration", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok - end. + end; false -> ?LOG_DEBUG( - "Peer discovery: registration disabled, skipping unregister", + "Peer discovery: registration is disabled via configuration, skipping unregistration", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok end. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index ec706686466b..9cf52a41dbf5 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -310,6 +310,25 @@ cluster_formation.dns.hostname = discovery.eng.example.local", ]}], []}, + %% registration is enabled by default for the backends that support it + {cluster_formation_explicitly_enable_of_registration, + "cluster_formation.registration.enabled = true", + [{rabbit, + [{cluster_formation, [ + {perform_registration, true} + ]}] + }], + []}, + + {cluster_formation_opt_out_of_registration, + "cluster_formation.registration.enabled = false", + [{rabbit, + [{cluster_formation, [ + {perform_registration, false} + ]}] + }], + []}, + {tcp_listen_options, "tcp_listen_options.backlog = 128 tcp_listen_options.nodelay = true