Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
%% executed. If the migration runs concurrently, whether it started before or
%% during the execution of the Mnesia-specific anonymous function, {@link
%% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia
%% and will retry the Mnesia functino or run the Khepri function accordingly.
%% and will retry the Mnesia function or run the Khepri function accordingly.
%% The Mnesia function must be idempotent because it can be executed multiple
%% times.
%%
Expand Down Expand Up @@ -1415,34 +1415,28 @@ register_rabbit_bindings_projection() ->
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_index_route_projection() ->
MapFun = fun(Path, _) ->
{
VHost,
ExchangeName,
Kind,
DstName,
RoutingKey
} = rabbit_db_binding:khepri_route_path_to_args(Path),
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
Destination = rabbit_misc:r(VHost, Kind, DstName),
SourceKey = {Exchange, RoutingKey},
#index_route{source_key = SourceKey,
destination = Destination}
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#index_route{source_key = {Source, Key},
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #index_route.source_key,
read_concurrency => true},
Projection = khepri_projection:new(
rabbit_khepri_index_route, ProjectionFun, Options),
DirectOrFanout = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
IgnoreHeadersAndTopic = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = DirectOrFanout,
_Exchange = IgnoreHeadersAndTopic,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
Expand Down
57 changes: 56 additions & 1 deletion deps/rabbit/test/bindings_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).
-compile(export_all).

suite() ->
[{timetrap, 5 * 60000}].
Expand Down Expand Up @@ -49,6 +48,7 @@ all_tests() ->
list_with_multiple_vhosts,
list_with_multiple_arguments,
bind_to_unknown_queue,
binding_args,
%% Exchange bindings
bind_and_unbind_exchange,
bind_and_delete_exchange_source,
Expand Down Expand Up @@ -116,6 +116,7 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

bind_and_unbind(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Expand Down Expand Up @@ -697,6 +698,60 @@ bind_to_unknown_queue(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
ok.

binding_args(Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
{skip, _} = Skip ->
Skip;
ok ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),

#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),

%% Create two bindings that differ only in their binding arguments.
Exchange = <<"amq.direct">>,
RoutingKey = <<"some-key">>,
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs1}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs2}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m1">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,

?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),

%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs1}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m2">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,

?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true}))
end.

bind_and_unbind_exchange(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Expand Down
Loading