diff --git a/MODULE.bazel b/MODULE.bazel
index a5fced655768..4747107940b1 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -253,8 +253,8 @@ erlang_package.hex_package(
name = "ra",
build_file = "@rabbitmq-server//bazel:BUILD.ra",
pkg = "ra",
- sha256 = "7cdf7894f1f542aeaa3d9e6f3209aab6efe9a1cdd1d81de9587c3ea23629b0e3",
- version = "2.16.0",
+ sha256 = "fd32a9b0a4b253b073b90dd996456e524347951d39f0b572d78178188491e6d4",
+ version = "2.16.1",
)
erlang_package.git_package(
diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema
index 756cf03ce676..16e12ece625a 100644
--- a/deps/rabbit/priv/schema/rabbit.schema
+++ b/deps/rabbit/priv/schema/rabbit.schema
@@ -1370,6 +1370,13 @@ fun(Conf) ->
end
end}.
+%% Register node during cluster formation when backend supports registration.
+%%
+
+{mapping, "cluster_formation.registration.enabled", "rabbit.cluster_formation.perform_registration", [
+ {datatype, {enum, [true, false]}}
+]}.
+
%% Cluster formation: lock acquisition retries as passed to https://erlang.org/doc/man/global.html#set_lock-3
%%
%% Currently used in classic, k8s, and aws peer discovery backends.
diff --git a/deps/rabbit/src/rabbit_amqp_filtex.erl b/deps/rabbit/src/rabbit_amqp_filtex.erl
index a1e346d30fa1..327457125822 100644
--- a/deps/rabbit/src/rabbit_amqp_filtex.erl
+++ b/deps/rabbit/src/rabbit_amqp_filtex.erl
@@ -13,18 +13,24 @@
-export([validate/1,
filter/2]).
+%% "Impose a limit on the complexity of each filter expression."
+%% [filtex-v1.0-wd09 7.1]
+-define(MAX_FILTER_FIELDS, 16).
+
-type simple_type() :: number() | binary() | atom().
-type affix() :: {suffix, non_neg_integer(), binary()} |
{prefix, non_neg_integer(), binary()}.
-type filter_expression_value() :: simple_type() | affix().
--type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} |
- {application_properties, [{binary(), filter_expression_value()}]}.
+-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}, ...]} |
+ {application_properties, [{binary(), filter_expression_value()}, ...]}.
-type filter_expressions() :: [filter_expression()].
-export_type([filter_expressions/0]).
-spec validate(tuple()) ->
{ok, filter_expression()} | error.
-validate({described, Descriptor, {map, KVList}}) ->
+validate({described, Descriptor, {map, KVList}})
+ when KVList =/= [] andalso
+ length(KVList) =< ?MAX_FILTER_FIELDS ->
try validate0(Descriptor, KVList)
catch throw:{?MODULE, _, _} ->
error
@@ -108,14 +114,12 @@ match_simple_type(RefVal, Val) ->
RefVal == Val.
validate0(Descriptor, KVList) when
- (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse
- Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso
- KVList =/= [] ->
+ Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse
+ Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER} ->
validate_props(KVList, []);
validate0(Descriptor, KVList) when
- (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
- Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso
- KVList =/= [] ->
+ Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
+ Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER} ->
validate_app_props(KVList, []);
validate0(_, _) ->
error.
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl
index 60050fcccdf3..efb97a6d3532 100644
--- a/deps/rabbit/src/rabbit_khepri.erl
+++ b/deps/rabbit/src/rabbit_khepri.erl
@@ -286,7 +286,7 @@ setup(_) ->
retry_timeout() ->
case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
{ok, T} -> T;
- undefined -> 30000
+ undefined -> 300_000
end.
retry_limit() ->
diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl
index 56400d3569e5..7019a6160a24 100644
--- a/deps/rabbit/src/rabbit_peer_discovery.erl
+++ b/deps/rabbit/src/rabbit_peer_discovery.erl
@@ -46,6 +46,9 @@
%% a new cluster as a virgin node
-define(DEFAULT_NODE_TYPE, disc).
+%% Register node by default (with the backends that support registration)
+-define(PERFORM_REGISTRATION_BY_DEFAULT, true).
+
%% default node prefix to attach to discovered hostnames
-define(DEFAULT_PREFIX, "rabbit").
@@ -82,6 +85,16 @@ node_type() ->
?DEFAULT_NODE_TYPE
end.
+-spec should_perform_registration() -> true | false.
+
+should_perform_registration() ->
+ case application:get_env(rabbit, cluster_formation) of
+ {ok, Proplist} ->
+ proplists:get_value(perform_registration, Proplist, ?PERFORM_REGISTRATION_BY_DEFAULT);
+ undefined ->
+ ?PERFORM_REGISTRATION_BY_DEFAULT
+ end.
+
-spec lock_acquisition_failure_mode() -> ignore | fail.
lock_acquisition_failure_mode() ->
@@ -968,18 +981,26 @@ error_description({invalid_cluster_node_type, BadType}) ->
-spec maybe_register() -> ok.
maybe_register() ->
- Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()),
- case Backend:supports_registration() of
+ case should_perform_registration() of
true ->
- ?LOG_DEBUG(
- "Peer discovery: registering this node",
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- register(Backend),
- _ = Backend:post_registration(),
- ok;
+ Backend = persistent_term:get(?PT_PEER_DISC_BACKEND, backend()),
+ case Backend:supports_registration() of
+ true ->
+ ?LOG_DEBUG(
+ "Peer discovery: registering this node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ register(Backend),
+ _ = Backend:post_registration(),
+ ok;
+ false ->
+ ?LOG_DEBUG(
+ "Peer discovery: registration is not supported, skipping it",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok
+ end;
false ->
?LOG_DEBUG(
- "Peer discovery: registration unsupported, skipping register",
+ "Peer discovery: registration is disabled via configuration, skipping it",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end.
@@ -987,16 +1008,24 @@ maybe_register() ->
-spec maybe_unregister() -> ok.
maybe_unregister() ->
- Backend = persistent_term:get(?PT_PEER_DISC_BACKEND),
- case Backend:supports_registration() of
+ case should_perform_registration() of
true ->
- ?LOG_DEBUG(
- "Peer discovery: unregistering this node",
- #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
- unregister(Backend);
+ Backend = persistent_term:get(?PT_PEER_DISC_BACKEND),
+ case Backend:supports_registration() of
+ true ->
+ ?LOG_DEBUG(
+ "Peer discovery: unregistering this node",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ unregister(Backend);
+ false ->
+ ?LOG_DEBUG(
+ "Peer discovery: registration is not supported, skipping unregistration",
+ #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
+ ok
+ end;
false ->
?LOG_DEBUG(
- "Peer discovery: registration unsupported, skipping unregister",
+ "Peer discovery: registration is disabled via configuration, skipping unregistration",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end.
diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl
index 75f8528da9ca..2d4f34bd1883 100644
--- a/deps/rabbit/test/amqp_filtex_SUITE.erl
+++ b/deps/rabbit/test/amqp_filtex_SUITE.erl
@@ -377,6 +377,39 @@ application_properties_section(Config) ->
?assertEqual([<<"m4">>], amqp10_msg:body(R4M4)),
ok = detach_link_sync(Receiver4),
+ %% Complex filter (too many properties to filter on) should fail validation in the server.
+ %% RabbitMQ should exclude this filter in its reply attach frame because
+ %% "the sending endpoint [RabbitMQ] sets the filter actually in place".
+ %% Hence, no filter expression is actually in place and we should receive all messages.
+ AppPropsFilter5 = [{{utf8, integer_to_binary(N)}, {uint, 1}} ||
+ N <- lists:seq(1, 17)],
+ Filter5 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
+ ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter5}},
+ {ok, Receiver5} = amqp10_client:attach_receiver_link(
+ Session, <<"receiver 5">>, Address,
+ unsettled, configuration, Filter5),
+ receive {amqp10_event,
+ {link, Receiver5,
+ {attached, #'v1_0.attach'{
+ source = #'v1_0.source'{filter = {map, ActualFilter5}}}}}} ->
+ ?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}],
+ ActualFilter5)
+ after 30000 -> ct:fail({missing_event, ?LINE})
+ end,
+ {ok, R5M1} = amqp10_client:get_msg(Receiver5),
+ {ok, R5M2} = amqp10_client:get_msg(Receiver5),
+ {ok, R5M3} = amqp10_client:get_msg(Receiver5),
+ {ok, R5M4} = amqp10_client:get_msg(Receiver5),
+ ok = amqp10_client:accept_msg(Receiver5, R5M1),
+ ok = amqp10_client:accept_msg(Receiver5, R5M2),
+ ok = amqp10_client:accept_msg(Receiver5, R5M3),
+ ok = amqp10_client:accept_msg(Receiver5, R5M4),
+ ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)),
+ ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)),
+ ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)),
+ ?assertEqual([<<"m4">>], amqp10_msg:body(R5M4)),
+ ok = detach_link_sync(Receiver5),
+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
diff --git a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml b/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml
index 1cf102431b64..e40b72b44099 100644
--- a/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml
+++ b/deps/rabbit/test/amqp_system_SUITE_data/java-tests/pom.xml
@@ -9,10 +9,10 @@
https://www.rabbitmq.com
5.10.2
- 2.5.0
+ 2.6.1
1.2.13
2.43.0
- 1.17.0
+ 1.25.2
3.12.1
3.2.5
diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
index ec706686466b..9cf52a41dbf5 100644
--- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
+++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
@@ -310,6 +310,25 @@ cluster_formation.dns.hostname = discovery.eng.example.local",
]}],
[]},
+ %% registration is enabled by default for the backends that support it
+ {cluster_formation_explicitly_enable_of_registration,
+ "cluster_formation.registration.enabled = true",
+ [{rabbit,
+ [{cluster_formation, [
+ {perform_registration, true}
+ ]}]
+ }],
+ []},
+
+ {cluster_formation_opt_out_of_registration,
+ "cluster_formation.registration.enabled = false",
+ [{rabbit,
+ [{cluster_formation, [
+ {perform_registration, false}
+ ]}]
+ }],
+ []},
+
{tcp_listen_options,
"tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index a62f0622fa3d..ed970043cd83 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -50,7 +50,7 @@ dep_khepri = hex 0.16.0
dep_khepri_mnesia_migration = hex 0.7.1
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5
dep_prometheus = hex 4.11.0
-dep_ra = hex 2.16.0
+dep_ra = hex 2.16.1
dep_ranch = hex 2.1.0
dep_recon = hex 2.5.6
dep_redbug = hex 2.0.7