From a2e4cababa01a9236a51312edb71eb84cddbd1ab Mon Sep 17 00:00:00 2001 From: David Ansari Date: Sat, 13 Sep 2025 14:12:18 +0200 Subject: [PATCH 1/6] Add test case for binding args Khepri regression This commit adds a test case for a regression/bug that occurs in Khepri. ``` make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=mnesia ``` succeeds, but ``` make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=khepri ``` fails. The problem is that ETS table `rabbit_khepri_index_route` cannot differentiate between two bindings with different binding arguments, and therefore deletes entries too early, leading to wrong routing decisions. The solution to this bug is to include the binding arguments in the `rabbit_khepri_index_route` projection, similar to how the binding args are also included in the `rabbit_index_route` Mnesia table. This bug/regression is an edge case and exists if the source exchange type is `direct` or `fanout` and if different bindings arguments are used by client apps. Note that such binding arguments are entirely ignored when RabbitMQ performs routing decisions for the `direct` or `fanout` exchange. However, there might be client apps that use binding arguments to add some metadata to the binding, for example `app-id` or `user` or `purpose` and might use this metadata as a form of reference counting in deciding when to delete `auto-delete` exchanges or just for informational/operational purposes. --- deps/rabbit/test/bindings_SUITE.erl | 51 +++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 382251fb7b59..4cb69d49aa0a 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -37,6 +37,7 @@ groups() -> all_tests() -> [ %% Queue bindings + binding_args, bind_and_unbind, bind_and_delete, bind_and_delete_source_exchange, @@ -116,6 +117,56 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- + +binding_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + %% Create two bindings that differ only in their binding arguments. + Exchange = <<"amq.direct">>, + RoutingKey = <<"some-key">>, + BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], + BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs2}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), + + %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m2">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). + bind_and_unbind(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), From 17244de390d2629d6b78a3c3bc7f9b4b1b369ff9 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 15 Sep 2025 09:46:41 +0200 Subject: [PATCH 2/6] Fix regression with Khepri binding args Fix #14533 --- deps/rabbit/src/rabbit_khepri.erl | 34 ++++----- deps/rabbit/test/bindings_SUITE.erl | 106 +++++++++++++++------------- 2 files changed, 69 insertions(+), 71 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 7dda47984c5a..4ce3f580e8d9 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -60,7 +60,7 @@ %% executed. If the migration runs concurrently, whether it started before or %% during the execution of the Mnesia-specific anonymous function, {@link %% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia -%% and will retry the Mnesia functino or run the Khepri function accordingly. +%% and will retry the Mnesia function or run the Khepri function accordingly. %% The Mnesia function must be idempotent because it can be executed multiple %% times. %% @@ -1415,19 +1415,13 @@ register_rabbit_bindings_projection() -> khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_index_route_projection() -> - MapFun = fun(Path, _) -> - { - VHost, - ExchangeName, - Kind, - DstName, - RoutingKey - } = rabbit_db_binding:khepri_route_path_to_args(Path), - Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), - Destination = rabbit_misc:r(VHost, Kind, DstName), - SourceKey = {Exchange, RoutingKey}, - #index_route{source_key = SourceKey, - destination = Destination} + MapFun = fun(_Path, #binding{source = Source, + key = Key, + destination = Destination, + args = Args}) -> + #index_route{source_key = {Source, Key}, + destination = Destination, + args = Args} end, ProjectionFun = projection_fun_for_sets(MapFun), Options = #{type => bag, @@ -1435,14 +1429,14 @@ register_rabbit_index_route_projection() -> read_concurrency => true}, Projection = khepri_projection:new( rabbit_khepri_index_route, ProjectionFun, Options), - DirectOrFanout = #if_data_matches{ - pattern = #exchange{type = '$1', _ = '_'}, - conditions = [{'andalso', - {'=/=', '$1', headers}, - {'=/=', '$1', topic}}]}, + IgnoreHeadersAndTopic = #if_data_matches{ + pattern = #exchange{type = '$1', _ = '_'}, + conditions = [{'andalso', + {'=/=', '$1', headers}, + {'=/=', '$1', topic}}]}, PathPattern = rabbit_db_binding:khepri_route_path( _VHost = ?KHEPRI_WILDCARD_STAR, - _Exchange = DirectOrFanout, + _Exchange = IgnoreHeadersAndTopic, _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 4cb69d49aa0a..10765ba41263 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -13,7 +13,6 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([nowarn_export_all, export_all]). --compile(export_all). suite() -> [{timetrap, 5 * 60000}]. @@ -37,7 +36,6 @@ groups() -> all_tests() -> [ %% Queue bindings - binding_args, bind_and_unbind, bind_and_delete, bind_and_delete_source_exchange, @@ -50,6 +48,7 @@ all_tests() -> list_with_multiple_vhosts, list_with_multiple_arguments, bind_to_unknown_queue, + binding_args, %% Exchange bindings bind_and_unbind_exchange, bind_and_delete_exchange_source, @@ -118,55 +117,6 @@ end_per_testcase(Testcase, Config) -> %% Testcases. %% ------------------------------------------------------------------- -binding_args(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - Q = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), - - #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:register_confirm_handler(Ch, self()), - - %% Create two bindings that differ only in their binding arguments. - Exchange = <<"amq.direct">>, - RoutingKey = <<"some-key">>, - BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], - BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, - routing_key = RoutingKey, - queue = Q, - arguments = BindingArgs1}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, - routing_key = RoutingKey, - queue = Q, - arguments = BindingArgs2}), - ok = amqp_channel:cast(Ch, - #'basic.publish'{exchange = Exchange, - routing_key = RoutingKey}, - #amqp_msg{payload = <<"m1">>}), - receive #'basic.ack'{} -> ok - after 9000 -> ct:fail(confirm_timeout) - end, - - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, - amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), - - %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. - #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, - routing_key = RoutingKey, - queue = Q, - arguments = BindingArgs1}), - ok = amqp_channel:cast(Ch, - #'basic.publish'{exchange = Exchange, - routing_key = RoutingKey}, - #amqp_msg{payload = <<"m2">>}), - receive #'basic.ack'{} -> ok - after 9000 -> ct:fail(confirm_timeout) - end, - - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, - amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). - bind_and_unbind(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -748,6 +698,60 @@ bind_to_unknown_queue(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), ok. +binding_args(Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of + {skip, _} = Skip -> + Skip; + ok -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + %% Create two bindings that differ only in their binding arguments. + Exchange = <<"amq.direct">>, + RoutingKey = <<"some-key">>, + BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], + BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs2}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), + + %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m2">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})) + end. + bind_and_unbind_exchange(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), From 54c8537bdb89e013bf79f58ead9f85fef35f3504 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 15 Sep 2025 13:02:30 +0000 Subject: [PATCH 3/6] Speed up fanout exchange Resolves #14531 ## What? Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below). In addition to the fanout exchange, a similar speed up is achieved for the following exchange types: * modulus hash * random * recent history This applies only if Khepri is enabled. ## How? Use an additional routing table (projection) whose table key is the source exchange. Looking up the destinations happens then by an ETS table key. Prior to this commit, CPUs were busy compiling the same match spec for every incoming message. ## Benchmark 1. Start RabbitMQ: ``` make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management" ``` where `advanced.config` contains: ``` [ {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` 2. Create a queue and binding: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1 ``` 3. Create the load ``` java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60 ``` Before this commit: ``` sending rate avg: 97394 msg/s receiving rate avg: 97394 msg/s ``` After this commit: ``` sending rate avg: 138677 msg/s receiving rate avg: 138677 msg/s ``` The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts: * 13.5% before this commit * 3.4% after this commit ## Downsides Additional ETS memory usage for the new projection table. However, the new table does not store any binding entries for the following source exchange types: * direct * headers * topic * x-local-random --- deps/rabbit/src/rabbit_core_ff.erl | 3 +- deps/rabbit/src/rabbit_db_binding.erl | 16 +-- deps/rabbit/src/rabbit_khepri.erl | 135 ++++++++++++++++++++------ deps/rabbit/test/bindings_SUITE.erl | 110 ++++++++++----------- deps/rabbit_common/include/rabbit.hrl | 1 + 5 files changed, 170 insertions(+), 95 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 98eea8aba260..7f27f9f5544e 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -216,5 +216,6 @@ {'rabbitmq_4.2.0', #{desc => "Allows rolling upgrades to 4.2.x", stability => stable, - depends_on => ['rabbitmq_4.1.0'] + depends_on => ['rabbitmq_4.1.0'], + callbacks => #{enable => {rabbit_khepri, enable_feature_flag}} }}). diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 0588a0cffd5a..a7852befa783 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -56,7 +56,8 @@ -define(MNESIA_REVERSE_TABLE, rabbit_reverse_route). -define(MNESIA_INDEX_TABLE, rabbit_index_route). -define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding). --define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route). +-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key). +-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source). %% ------------------------------------------------------------------- %% exists(). @@ -707,21 +708,14 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) -> end. match_routing_key_in_khepri(Src, ['_']) -> - try - MatchHead = #index_route{source_key = {Src, '_'}, - destination = '$1', - _ = '_'}, - ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}]) - catch - error:badarg -> - [] - end; + ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION, + Src, #route_by_source.destination, []); match_routing_key_in_khepri(Src, RoutingKeys) -> lists:foldl( fun(RK, Acc) -> try Dst = ets:lookup_element( - ?KHEPRI_INDEX_ROUTE_PROJECTION, + ?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, {Src, RK}, #index_route.destination), Dst ++ Acc diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 4ce3f580e8d9..f244f55635e1 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -170,6 +170,7 @@ %% equivalent cluster -export([khepri_db_migration_enable/1, khepri_db_migration_post_enable/1, + enable_feature_flag/1, is_enabled/0, is_enabled/1, get_feature_state/0, get_feature_state/1, handle_fallback/1]). @@ -332,8 +333,10 @@ init(IsVirgin) -> "up to the Raft cluster leader", [], #{domain => ?RMQLOG_DOMAIN_DB}), ok ?= case IsVirgin of - true -> register_projections(); - false -> ok + true -> + register_projections(); + false -> + register_4_2_0_projections() end, %% Delete transient queues on init. %% Note that we also do this in the @@ -1318,25 +1321,34 @@ register_projections() -> fun register_rabbit_per_vhost_runtime_parameters_projection/0, fun register_rabbit_user_permissions_projection/0, fun register_rabbit_bindings_projection/0, - fun register_rabbit_index_route_projection/0, + fun register_rabbit_route_by_source_key_projection/0, + fun register_rabbit_route_by_source_projection/0, fun register_rabbit_topic_graph_projection/0], - rabbit_misc:for_each_while_ok( - fun(RegisterFun) -> - case RegisterFun() of - ok -> - ok; - %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` - %% would return `{error, exists}` for projections which - %% already exist. - {error, exists} -> - ok; - %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. - {error, {khepri, projection_already_exists, _Info}} -> - ok; - {error, _} = Error -> - Error - end - end, RegFuns). + rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns). + +%% This function registers projections introduced in 4.2.0. In a mixed version +%% cluster, these new projections will appear but won't be used on older nodes. +%% This function can be deleted after feature flag rabbitmq_4.2.0 becomes required. +register_4_2_0_projections() -> + RegFuns = [fun register_rabbit_route_by_source_key_projection/0, + fun register_rabbit_route_by_source_projection/0], + rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns). + +register_projection(RegisterFun) -> + case RegisterFun() of + ok -> + ok; + %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` + %% would return `{error, exists}` for projections which + %% already exist. + {error, exists} -> + ok; + %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error + end. register_rabbit_exchange_projection() -> Name = rabbit_khepri_exchange, @@ -1414,7 +1426,7 @@ register_rabbit_bindings_projection() -> _RoutingKey = ?KHEPRI_WILDCARD_STAR), khepri:register_projection(?STORE_ID, PathPattern, Projection). -register_rabbit_index_route_projection() -> +register_rabbit_route_by_source_key_projection() -> MapFun = fun(_Path, #binding{source = Source, key = Key, destination = Destination, @@ -1427,16 +1439,65 @@ register_rabbit_index_route_projection() -> Options = #{type => bag, keypos => #index_route.source_key, read_concurrency => true}, - Projection = khepri_projection:new( - rabbit_khepri_index_route, ProjectionFun, Options), - IgnoreHeadersAndTopic = #if_data_matches{ - pattern = #exchange{type = '$1', _ = '_'}, - conditions = [{'andalso', - {'=/=', '$1', headers}, - {'=/=', '$1', topic}}]}, + Projection = khepri_projection:new(rabbit_khepri_route_by_source_key, + ProjectionFun, + Options), + Exchange = #if_data_matches{ + pattern = #exchange{type = '$1', _ = '_'}, + conditions = [{'andalso', + {'=/=', '$1', headers}, + {'=/=', '$1', topic}, + {'=/=', '$1', fanout}, + {'=/=', '$1', 'x-jms-topic'}, + {'=/=', '$1', 'x-random'} + }]}, PathPattern = rabbit_db_binding:khepri_route_path( _VHost = ?KHEPRI_WILDCARD_STAR, - _Exchange = IgnoreHeadersAndTopic, + Exchange, + _Kind = ?KHEPRI_WILDCARD_STAR, + _DstName = ?KHEPRI_WILDCARD_STAR, + _RoutingKey = ?KHEPRI_WILDCARD_STAR), + khepri:register_projection(?STORE_ID, PathPattern, Projection). + +register_rabbit_route_by_source_projection() -> + MapFun = fun(_Path, #binding{source = Source, + key = Key, + destination = Destination, + args = Args}) -> + #route_by_source{source = Source, + key = Key, + destination = Destination, + args = Args} + end, + ProjectionFun = projection_fun_for_sets(MapFun), + Options = #{type => bag, + keypos => #route_by_source.source, + read_concurrency => true}, + Projection = khepri_projection:new(rabbit_khepri_route_by_source, + ProjectionFun, + Options), + %% For some exchange types we know that they won't use this projection. + %% So we exclude such bindings for two reasons: + %% 1. Lower overall ETS memory usage + %% 2. "Avoid inserting an extensive amount of objects with the same key. + %% It will hurt insert and lookup performance as well as real time characteristics + %% of the runtime environment (hash bucket linear search do not yield)." + %% Example: same source direct exchange with 100k different binding keys. + %% In future, rather than exchange types exclusion as done here, a nicer approach + %% would be that each exchange requiring routing lookup by only source exchange + %% advertises this access pattern, e.g. as a boolean flag in the #exchange.options field. + Exchange = #if_data_matches{ + pattern = #exchange{type = '$1', _ = '_'}, + conditions = [{'andalso', + {'=/=', '$1', headers}, + {'=/=', '$1', topic}, + {'=/=', '$1', direct}, + {'=/=', '$1', 'x-local-random'}, + {'=/=', '$1', 'x-jms-topic'} + }]}, + PathPattern = rabbit_db_binding:khepri_route_path( + _VHost = ?KHEPRI_WILDCARD_STAR, + Exchange, _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), @@ -1755,6 +1816,22 @@ khepri_db_migration_post_enable( _ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID), ok. +enable_feature_flag(#{command := enable, + feature_name := 'rabbitmq_4.2.0' = FeatureName}) -> + %% We unregister this projection because it's superseded by + %% rabbit_khepri_route_by_source_key introduced in 4.2.0 + ProjectionName = rabbit_khepri_index_route, + Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName]) + catch _:Reason -> Reason + end, + ?LOG_DEBUG( + "enabling feature flag ~s unregisters projection ~s: ~tp", + [FeatureName, ProjectionName, Result], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok; +enable_feature_flag(_) -> + ok. + -spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when FeatureName :: rabbit_feature_flags:feature_name(), Ret :: ok | {error, Reason}, diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 10765ba41263..986cdcd44988 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -48,7 +48,8 @@ all_tests() -> list_with_multiple_vhosts, list_with_multiple_arguments, bind_to_unknown_queue, - binding_args, + binding_args_direct_exchange, + binding_args_fanout_exchange, %% Exchange bindings bind_and_unbind_exchange, bind_and_delete_exchange_source, @@ -698,59 +699,60 @@ bind_to_unknown_queue(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), ok. -binding_args(Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of - {skip, _} = Skip -> - Skip; - ok -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - Q = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), - - #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:register_confirm_handler(Ch, self()), - - %% Create two bindings that differ only in their binding arguments. - Exchange = <<"amq.direct">>, - RoutingKey = <<"some-key">>, - BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], - BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, - routing_key = RoutingKey, - queue = Q, - arguments = BindingArgs1}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, - routing_key = RoutingKey, - queue = Q, - arguments = BindingArgs2}), - ok = amqp_channel:cast(Ch, - #'basic.publish'{exchange = Exchange, - routing_key = RoutingKey}, - #amqp_msg{payload = <<"m1">>}), - receive #'basic.ack'{} -> ok - after 9000 -> ct:fail(confirm_timeout) - end, - - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, - amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), - - %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. - #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, - routing_key = RoutingKey, - queue = Q, - arguments = BindingArgs1}), - ok = amqp_channel:cast(Ch, - #'basic.publish'{exchange = Exchange, - routing_key = RoutingKey}, - #amqp_msg{payload = <<"m2">>}), - receive #'basic.ack'{} -> ok - after 9000 -> ct:fail(confirm_timeout) - end, - - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, - amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})) - end. +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533 +binding_args_direct_exchange(Config) -> + binding_args(<<"amq.direct">>, Config). + +binding_args_fanout_exchange(Config) -> + binding_args(<<"amq.fanout">>, Config). + +binding_args(Exchange, Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + %% Create two bindings that differ only in their binding arguments. + RoutingKey = <<"some-key">>, + BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], + BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs2}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), + + %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m2">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). bind_and_unbind_exchange(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/deps/rabbit_common/include/rabbit.hrl b/deps/rabbit_common/include/rabbit.hrl index cdd4772d3bbe..8c55bb600159 100644 --- a/deps/rabbit_common/include/rabbit.hrl +++ b/deps/rabbit_common/include/rabbit.hrl @@ -94,6 +94,7 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). -record(index_route, {source_key, destination, args = []}). +-record(route_by_source, {source, key, destination, args = []}). -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). From 4876315f9274126feea6c28a9a4bdad169fab892 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 16 Sep 2025 15:35:50 +0200 Subject: [PATCH 4/6] Add exchange binding tests Test that exchange bindings work correctly with the new projection tables `rabbit_khepri_route_by_source` and `rabbit_khepri_route_by_source_key`. --- deps/rabbit/test/bindings_SUITE.erl | 52 +++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 986cdcd44988..539515fd93f8 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -50,8 +50,10 @@ all_tests() -> bind_to_unknown_queue, binding_args_direct_exchange, binding_args_fanout_exchange, + %% Exchange bindings - bind_and_unbind_exchange, + bind_and_unbind_direct_exchange, + bind_and_unbind_fanout_exchange, bind_and_delete_exchange_source, bind_and_delete_exchange_destination, bind_to_unknown_exchange, @@ -754,33 +756,61 @@ binding_args(Exchange, Config) -> ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). -bind_and_unbind_exchange(Config) -> +bind_and_unbind_direct_exchange(Config) -> + bind_and_unbind_exchange(<<"direct">>, Config). + +bind_and_unbind_fanout_exchange(Config) -> + bind_and_unbind_exchange(<<"fanout">>, Config). + +bind_and_unbind_exchange(Type, Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), X = ?config(exchange_name, Config), + Q = ?config(queue_name, Config), + RoutingKey = <<"some key">>, + SourceExchange = <<"amq.", Type/binary>>, ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}), + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X, + type = Type}), %% Let's bind to other exchange #'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X, - source = <<"amq.direct">>, - routing_key = <<"key">>}), + source = SourceExchange, + routing_key = RoutingKey}), - DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>), - rabbit_misc:r(<<"/">>, exchange, X), - <<"key">>, []), + Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange), + rabbit_misc:r(<<"/">>, exchange, X), + RoutingKey, []), - ?assertEqual([DirectBinding], + ?assertEqual([Binding], lists:sort( rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))), + %% Test that a message gets routed: + %% exchange -> exchange -> queue + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X, + routing_key = RoutingKey, + queue = Q}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = SourceExchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + ?assertEqual(#'queue.delete_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + #'exchange.unbind_ok'{} = amqp_channel:call(Ch, #'exchange.unbind'{destination = X, - source = <<"amq.direct">>, - routing_key = <<"key">>}), + source = SourceExchange, + routing_key = RoutingKey}), ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), From 63313e1b48d897af0a998d8d49f7b647b38fea6a Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 16 Sep 2025 17:40:45 +0200 Subject: [PATCH 5/6] Always register all projections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Khepri won’t modify a projection that is already registered (based on its name). --- deps/rabbit/src/rabbit_khepri.erl | 49 ++++++++++++------------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index f244f55635e1..9357f2318018 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -332,12 +332,7 @@ init(IsVirgin) -> "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught " "up to the Raft cluster leader", [], #{domain => ?RMQLOG_DOMAIN_DB}), - ok ?= case IsVirgin of - true -> - register_projections(); - false -> - register_4_2_0_projections() - end, + ok ?= register_projections(), %% Delete transient queues on init. %% Note that we also do this in the %% `rabbit_amqqueue:on_node_down/1' callback. We must try this @@ -1324,31 +1319,23 @@ register_projections() -> fun register_rabbit_route_by_source_key_projection/0, fun register_rabbit_route_by_source_projection/0, fun register_rabbit_topic_graph_projection/0], - rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns). - -%% This function registers projections introduced in 4.2.0. In a mixed version -%% cluster, these new projections will appear but won't be used on older nodes. -%% This function can be deleted after feature flag rabbitmq_4.2.0 becomes required. -register_4_2_0_projections() -> - RegFuns = [fun register_rabbit_route_by_source_key_projection/0, - fun register_rabbit_route_by_source_projection/0], - rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns). - -register_projection(RegisterFun) -> - case RegisterFun() of - ok -> - ok; - %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` - %% would return `{error, exists}` for projections which - %% already exist. - {error, exists} -> - ok; - %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. - {error, {khepri, projection_already_exists, _Info}} -> - ok; - {error, _} = Error -> - Error - end. + rabbit_misc:for_each_while_ok( + fun(RegisterFun) -> + case RegisterFun() of + ok -> + ok; + %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` + %% would return `{error, exists}` for projections which + %% already exist. + {error, exists} -> + ok; + %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error + end + end, RegFuns). register_rabbit_exchange_projection() -> Name = rabbit_khepri_exchange, From af61e93aa004be8df46176e62e0dee8626332858 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 16 Sep 2025 18:34:39 +0200 Subject: [PATCH 6/6] Protect ets:lookup_element/4 in try catch See https://github.com/rabbitmq/rabbitmq-server/pull/11667#issue-2401399413 for rationale. --- deps/rabbit/src/rabbit_db_binding.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index a7852befa783..2584a725e79c 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -708,8 +708,15 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) -> end. match_routing_key_in_khepri(Src, ['_']) -> - ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION, - Src, #route_by_source.destination, []); + try + ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION, + Src, + #route_by_source.destination, + []) + catch + error:badarg -> + [] + end; match_routing_key_in_khepri(Src, RoutingKeys) -> lists:foldl( fun(RK, Acc) ->