diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 98eea8aba260..7f27f9f5544e 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -216,5 +216,6 @@ {'rabbitmq_4.2.0', #{desc => "Allows rolling upgrades to 4.2.x", stability => stable, - depends_on => ['rabbitmq_4.1.0'] + depends_on => ['rabbitmq_4.1.0'], + callbacks => #{enable => {rabbit_khepri, enable_feature_flag}} }}). diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 0588a0cffd5a..2584a725e79c 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -56,7 +56,8 @@ -define(MNESIA_REVERSE_TABLE, rabbit_reverse_route). -define(MNESIA_INDEX_TABLE, rabbit_index_route). -define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding). --define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route). +-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key). +-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source). %% ------------------------------------------------------------------- %% exists(). @@ -708,10 +709,10 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) -> match_routing_key_in_khepri(Src, ['_']) -> try - MatchHead = #index_route{source_key = {Src, '_'}, - destination = '$1', - _ = '_'}, - ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}]) + ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION, + Src, + #route_by_source.destination, + []) catch error:badarg -> [] @@ -721,7 +722,7 @@ match_routing_key_in_khepri(Src, RoutingKeys) -> fun(RK, Acc) -> try Dst = ets:lookup_element( - ?KHEPRI_INDEX_ROUTE_PROJECTION, + ?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, {Src, RK}, #index_route.destination), Dst ++ Acc diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 7dda47984c5a..9357f2318018 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -60,7 +60,7 @@ %% executed. If the migration runs concurrently, whether it started before or %% during the execution of the Mnesia-specific anonymous function, {@link %% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia -%% and will retry the Mnesia functino or run the Khepri function accordingly. +%% and will retry the Mnesia function or run the Khepri function accordingly. %% The Mnesia function must be idempotent because it can be executed multiple %% times. %% @@ -170,6 +170,7 @@ %% equivalent cluster -export([khepri_db_migration_enable/1, khepri_db_migration_post_enable/1, + enable_feature_flag/1, is_enabled/0, is_enabled/1, get_feature_state/0, get_feature_state/1, handle_fallback/1]). @@ -331,10 +332,7 @@ init(IsVirgin) -> "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught " "up to the Raft cluster leader", [], #{domain => ?RMQLOG_DOMAIN_DB}), - ok ?= case IsVirgin of - true -> register_projections(); - false -> ok - end, + ok ?= register_projections(), %% Delete transient queues on init. %% Note that we also do this in the %% `rabbit_amqqueue:on_node_down/1' callback. We must try this @@ -1318,7 +1316,8 @@ register_projections() -> fun register_rabbit_per_vhost_runtime_parameters_projection/0, fun register_rabbit_user_permissions_projection/0, fun register_rabbit_bindings_projection/0, - fun register_rabbit_index_route_projection/0, + fun register_rabbit_route_by_source_key_projection/0, + fun register_rabbit_route_by_source_projection/0, fun register_rabbit_topic_graph_projection/0], rabbit_misc:for_each_while_ok( fun(RegisterFun) -> @@ -1414,35 +1413,78 @@ register_rabbit_bindings_projection() -> _RoutingKey = ?KHEPRI_WILDCARD_STAR), khepri:register_projection(?STORE_ID, PathPattern, Projection). -register_rabbit_index_route_projection() -> - MapFun = fun(Path, _) -> - { - VHost, - ExchangeName, - Kind, - DstName, - RoutingKey - } = rabbit_db_binding:khepri_route_path_to_args(Path), - Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), - Destination = rabbit_misc:r(VHost, Kind, DstName), - SourceKey = {Exchange, RoutingKey}, - #index_route{source_key = SourceKey, - destination = Destination} +register_rabbit_route_by_source_key_projection() -> + MapFun = fun(_Path, #binding{source = Source, + key = Key, + destination = Destination, + args = Args}) -> + #index_route{source_key = {Source, Key}, + destination = Destination, + args = Args} end, ProjectionFun = projection_fun_for_sets(MapFun), Options = #{type => bag, keypos => #index_route.source_key, read_concurrency => true}, - Projection = khepri_projection:new( - rabbit_khepri_index_route, ProjectionFun, Options), - DirectOrFanout = #if_data_matches{ - pattern = #exchange{type = '$1', _ = '_'}, - conditions = [{'andalso', - {'=/=', '$1', headers}, - {'=/=', '$1', topic}}]}, + Projection = khepri_projection:new(rabbit_khepri_route_by_source_key, + ProjectionFun, + Options), + Exchange = #if_data_matches{ + pattern = #exchange{type = '$1', _ = '_'}, + conditions = [{'andalso', + {'=/=', '$1', headers}, + {'=/=', '$1', topic}, + {'=/=', '$1', fanout}, + {'=/=', '$1', 'x-jms-topic'}, + {'=/=', '$1', 'x-random'} + }]}, PathPattern = rabbit_db_binding:khepri_route_path( _VHost = ?KHEPRI_WILDCARD_STAR, - _Exchange = DirectOrFanout, + Exchange, + _Kind = ?KHEPRI_WILDCARD_STAR, + _DstName = ?KHEPRI_WILDCARD_STAR, + _RoutingKey = ?KHEPRI_WILDCARD_STAR), + khepri:register_projection(?STORE_ID, PathPattern, Projection). + +register_rabbit_route_by_source_projection() -> + MapFun = fun(_Path, #binding{source = Source, + key = Key, + destination = Destination, + args = Args}) -> + #route_by_source{source = Source, + key = Key, + destination = Destination, + args = Args} + end, + ProjectionFun = projection_fun_for_sets(MapFun), + Options = #{type => bag, + keypos => #route_by_source.source, + read_concurrency => true}, + Projection = khepri_projection:new(rabbit_khepri_route_by_source, + ProjectionFun, + Options), + %% For some exchange types we know that they won't use this projection. + %% So we exclude such bindings for two reasons: + %% 1. Lower overall ETS memory usage + %% 2. "Avoid inserting an extensive amount of objects with the same key. + %% It will hurt insert and lookup performance as well as real time characteristics + %% of the runtime environment (hash bucket linear search do not yield)." + %% Example: same source direct exchange with 100k different binding keys. + %% In future, rather than exchange types exclusion as done here, a nicer approach + %% would be that each exchange requiring routing lookup by only source exchange + %% advertises this access pattern, e.g. as a boolean flag in the #exchange.options field. + Exchange = #if_data_matches{ + pattern = #exchange{type = '$1', _ = '_'}, + conditions = [{'andalso', + {'=/=', '$1', headers}, + {'=/=', '$1', topic}, + {'=/=', '$1', direct}, + {'=/=', '$1', 'x-local-random'}, + {'=/=', '$1', 'x-jms-topic'} + }]}, + PathPattern = rabbit_db_binding:khepri_route_path( + _VHost = ?KHEPRI_WILDCARD_STAR, + Exchange, _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), @@ -1761,6 +1803,22 @@ khepri_db_migration_post_enable( _ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID), ok. +enable_feature_flag(#{command := enable, + feature_name := 'rabbitmq_4.2.0' = FeatureName}) -> + %% We unregister this projection because it's superseded by + %% rabbit_khepri_route_by_source_key introduced in 4.2.0 + ProjectionName = rabbit_khepri_index_route, + Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName]) + catch _:Reason -> Reason + end, + ?LOG_DEBUG( + "enabling feature flag ~s unregisters projection ~s: ~tp", + [FeatureName, ProjectionName, Result], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok; +enable_feature_flag(_) -> + ok. + -spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when FeatureName :: rabbit_feature_flags:feature_name(), Ret :: ok | {error, Reason}, diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 382251fb7b59..539515fd93f8 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -13,7 +13,6 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([nowarn_export_all, export_all]). --compile(export_all). suite() -> [{timetrap, 5 * 60000}]. @@ -49,8 +48,12 @@ all_tests() -> list_with_multiple_vhosts, list_with_multiple_arguments, bind_to_unknown_queue, + binding_args_direct_exchange, + binding_args_fanout_exchange, + %% Exchange bindings - bind_and_unbind_exchange, + bind_and_unbind_direct_exchange, + bind_and_unbind_fanout_exchange, bind_and_delete_exchange_source, bind_and_delete_exchange_destination, bind_to_unknown_exchange, @@ -116,6 +119,7 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- + bind_and_unbind(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -697,33 +701,116 @@ bind_to_unknown_queue(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), ok. -bind_and_unbind_exchange(Config) -> +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533 +binding_args_direct_exchange(Config) -> + binding_args(<<"amq.direct">>, Config). + +binding_args_fanout_exchange(Config) -> + binding_args(<<"amq.fanout">>, Config). + +binding_args(Exchange, Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + %% Create two bindings that differ only in their binding arguments. + RoutingKey = <<"some-key">>, + BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], + BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs2}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), + + %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m2">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). + +bind_and_unbind_direct_exchange(Config) -> + bind_and_unbind_exchange(<<"direct">>, Config). + +bind_and_unbind_fanout_exchange(Config) -> + bind_and_unbind_exchange(<<"fanout">>, Config). + +bind_and_unbind_exchange(Type, Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), X = ?config(exchange_name, Config), + Q = ?config(queue_name, Config), + RoutingKey = <<"some key">>, + SourceExchange = <<"amq.", Type/binary>>, ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}), + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X, + type = Type}), %% Let's bind to other exchange #'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X, - source = <<"amq.direct">>, - routing_key = <<"key">>}), + source = SourceExchange, + routing_key = RoutingKey}), - DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>), - rabbit_misc:r(<<"/">>, exchange, X), - <<"key">>, []), + Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange), + rabbit_misc:r(<<"/">>, exchange, X), + RoutingKey, []), - ?assertEqual([DirectBinding], + ?assertEqual([Binding], lists:sort( rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))), + %% Test that a message gets routed: + %% exchange -> exchange -> queue + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X, + routing_key = RoutingKey, + queue = Q}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = SourceExchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + ?assertEqual(#'queue.delete_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + #'exchange.unbind_ok'{} = amqp_channel:call(Ch, #'exchange.unbind'{destination = X, - source = <<"amq.direct">>, - routing_key = <<"key">>}), + source = SourceExchange, + routing_key = RoutingKey}), ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), diff --git a/deps/rabbit_common/include/rabbit.hrl b/deps/rabbit_common/include/rabbit.hrl index cdd4772d3bbe..8c55bb600159 100644 --- a/deps/rabbit_common/include/rabbit.hrl +++ b/deps/rabbit_common/include/rabbit.hrl @@ -94,6 +94,7 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). -record(index_route, {source_key, destination, args = []}). +-record(route_by_source, {source, key, destination, args = []}). -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}).