diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 7dda47984c5a..4ce3f580e8d9 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -60,7 +60,7 @@ %% executed. If the migration runs concurrently, whether it started before or %% during the execution of the Mnesia-specific anonymous function, {@link %% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia -%% and will retry the Mnesia functino or run the Khepri function accordingly. +%% and will retry the Mnesia function or run the Khepri function accordingly. %% The Mnesia function must be idempotent because it can be executed multiple %% times. %% @@ -1415,19 +1415,13 @@ register_rabbit_bindings_projection() -> khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_index_route_projection() -> - MapFun = fun(Path, _) -> - { - VHost, - ExchangeName, - Kind, - DstName, - RoutingKey - } = rabbit_db_binding:khepri_route_path_to_args(Path), - Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), - Destination = rabbit_misc:r(VHost, Kind, DstName), - SourceKey = {Exchange, RoutingKey}, - #index_route{source_key = SourceKey, - destination = Destination} + MapFun = fun(_Path, #binding{source = Source, + key = Key, + destination = Destination, + args = Args}) -> + #index_route{source_key = {Source, Key}, + destination = Destination, + args = Args} end, ProjectionFun = projection_fun_for_sets(MapFun), Options = #{type => bag, @@ -1435,14 +1429,14 @@ register_rabbit_index_route_projection() -> read_concurrency => true}, Projection = khepri_projection:new( rabbit_khepri_index_route, ProjectionFun, Options), - DirectOrFanout = #if_data_matches{ - pattern = #exchange{type = '$1', _ = '_'}, - conditions = [{'andalso', - {'=/=', '$1', headers}, - {'=/=', '$1', topic}}]}, + IgnoreHeadersAndTopic = #if_data_matches{ + pattern = #exchange{type = '$1', _ = '_'}, + conditions = [{'andalso', + {'=/=', '$1', headers}, + {'=/=', '$1', topic}}]}, PathPattern = rabbit_db_binding:khepri_route_path( _VHost = ?KHEPRI_WILDCARD_STAR, - _Exchange = DirectOrFanout, + _Exchange = IgnoreHeadersAndTopic, _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 382251fb7b59..10765ba41263 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -13,7 +13,6 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([nowarn_export_all, export_all]). --compile(export_all). suite() -> [{timetrap, 5 * 60000}]. @@ -49,6 +48,7 @@ all_tests() -> list_with_multiple_vhosts, list_with_multiple_arguments, bind_to_unknown_queue, + binding_args, %% Exchange bindings bind_and_unbind_exchange, bind_and_delete_exchange_source, @@ -116,6 +116,7 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- + bind_and_unbind(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -697,6 +698,60 @@ bind_to_unknown_queue(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), ok. +binding_args(Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of + {skip, _} = Skip -> + Skip; + ok -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + %% Create two bindings that differ only in their binding arguments. + Exchange = <<"amq.direct">>, + RoutingKey = <<"some-key">>, + BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], + BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs2}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), + + %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, + routing_key = RoutingKey, + queue = Q, + arguments = BindingArgs1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = RoutingKey}, + #amqp_msg{payload = <<"m2">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})) + end. + bind_and_unbind_exchange(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),