Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,6 @@
{'rabbitmq_4.2.0',
#{desc => "Allows rolling upgrades to 4.2.x",
stability => stable,
depends_on => ['rabbitmq_4.1.0']
depends_on => ['rabbitmq_4.1.0'],
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
}}).
16 changes: 5 additions & 11 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key).
-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source).

%% -------------------------------------------------------------------
%% exists().
Expand Down Expand Up @@ -707,21 +708,14 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
end.

match_routing_key_in_khepri(Src, ['_']) ->
try
MatchHead = #index_route{source_key = {Src, '_'},
destination = '$1',
_ = '_'},
ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}])
catch
error:badarg ->
[]
end;
ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION,
Src, #route_by_source.destination, []);
match_routing_key_in_khepri(Src, RoutingKeys) ->
lists:foldl(
fun(RK, Acc) ->
try
Dst = ets:lookup_element(
?KHEPRI_INDEX_ROUTE_PROJECTION,
?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION,
{Src, RK},
#index_route.destination),
Dst ++ Acc
Expand Down
114 changes: 86 additions & 28 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
%% executed. If the migration runs concurrently, whether it started before or
%% during the execution of the Mnesia-specific anonymous function, {@link
%% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia
%% and will retry the Mnesia functino or run the Khepri function accordingly.
%% and will retry the Mnesia function or run the Khepri function accordingly.
%% The Mnesia function must be idempotent because it can be executed multiple
%% times.
%%
Expand Down Expand Up @@ -170,6 +170,7 @@
%% equivalent cluster
-export([khepri_db_migration_enable/1,
khepri_db_migration_post_enable/1,
enable_feature_flag/1,
is_enabled/0, is_enabled/1,
get_feature_state/0, get_feature_state/1,
handle_fallback/1]).
Expand Down Expand Up @@ -331,10 +332,7 @@ init(IsVirgin) ->
"local Khepri-based " ?RA_FRIENDLY_NAME " member is caught "
"up to the Raft cluster leader", [],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= case IsVirgin of
true -> register_projections();
false -> ok
end,
ok ?= register_projections(),
%% Delete transient queues on init.
%% Note that we also do this in the
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
Expand Down Expand Up @@ -1318,7 +1316,8 @@ register_projections() ->
fun register_rabbit_per_vhost_runtime_parameters_projection/0,
fun register_rabbit_user_permissions_projection/0,
fun register_rabbit_bindings_projection/0,
fun register_rabbit_index_route_projection/0,
fun register_rabbit_route_by_source_key_projection/0,
fun register_rabbit_route_by_source_projection/0,
fun register_rabbit_topic_graph_projection/0],
rabbit_misc:for_each_while_ok(
fun(RegisterFun) ->
Expand Down Expand Up @@ -1414,35 +1413,78 @@ register_rabbit_bindings_projection() ->
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_index_route_projection() ->
MapFun = fun(Path, _) ->
{
VHost,
ExchangeName,
Kind,
DstName,
RoutingKey
} = rabbit_db_binding:khepri_route_path_to_args(Path),
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
Destination = rabbit_misc:r(VHost, Kind, DstName),
SourceKey = {Exchange, RoutingKey},
#index_route{source_key = SourceKey,
destination = Destination}
register_rabbit_route_by_source_key_projection() ->
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#index_route{source_key = {Source, Key},
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #index_route.source_key,
read_concurrency => true},
Projection = khepri_projection:new(
rabbit_khepri_index_route, ProjectionFun, Options),
DirectOrFanout = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
Projection = khepri_projection:new(rabbit_khepri_route_by_source_key,
ProjectionFun,
Options),
Exchange = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic},
{'=/=', '$1', fanout},
{'=/=', '$1', 'x-jms-topic'},
{'=/=', '$1', 'x-random'}
}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = DirectOrFanout,
Exchange,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_route_by_source_projection() ->
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#route_by_source{source = Source,
key = Key,
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #route_by_source.source,
read_concurrency => true},
Projection = khepri_projection:new(rabbit_khepri_route_by_source,
ProjectionFun,
Options),
%% For some exchange types we know that they won't use this projection.
%% So we exclude such bindings for two reasons:
%% 1. Lower overall ETS memory usage
%% 2. "Avoid inserting an extensive amount of objects with the same key.
%% It will hurt insert and lookup performance as well as real time characteristics
%% of the runtime environment (hash bucket linear search do not yield)."
%% Example: same source direct exchange with 100k different binding keys.
%% In future, rather than exchange types exclusion as done here, a nicer approach
%% would be that each exchange requiring routing lookup by only source exchange
%% advertises this access pattern, e.g. as a boolean flag in the #exchange.options field.
Exchange = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic},
{'=/=', '$1', direct},
{'=/=', '$1', 'x-local-random'},
{'=/=', '$1', 'x-jms-topic'}
}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
Exchange,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
Expand Down Expand Up @@ -1761,6 +1803,22 @@ khepri_db_migration_post_enable(
_ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID),
ok.

enable_feature_flag(#{command := enable,
feature_name := 'rabbitmq_4.2.0' = FeatureName}) ->
%% We unregister this projection because it's superseded by
%% rabbit_khepri_route_by_source_key introduced in 4.2.0
ProjectionName = rabbit_khepri_index_route,
Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName])
catch _:Reason -> Reason
end,
?LOG_DEBUG(
"enabling feature flag ~s unregisters projection ~s: ~tp",
[FeatureName, ProjectionName, Result],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok;
enable_feature_flag(_) ->
ok.

-spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when
FeatureName :: rabbit_feature_flags:feature_name(),
Ret :: ok | {error, Reason},
Expand Down
111 changes: 99 additions & 12 deletions deps/rabbit/test/bindings_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).
-compile(export_all).

suite() ->
[{timetrap, 5 * 60000}].
Expand Down Expand Up @@ -49,8 +48,12 @@ all_tests() ->
list_with_multiple_vhosts,
list_with_multiple_arguments,
bind_to_unknown_queue,
binding_args_direct_exchange,
binding_args_fanout_exchange,

%% Exchange bindings
bind_and_unbind_exchange,
bind_and_unbind_direct_exchange,
bind_and_unbind_fanout_exchange,
bind_and_delete_exchange_source,
bind_and_delete_exchange_destination,
bind_to_unknown_exchange,
Expand Down Expand Up @@ -116,6 +119,7 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

bind_and_unbind(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Expand Down Expand Up @@ -697,33 +701,116 @@ bind_to_unknown_queue(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
ok.

bind_and_unbind_exchange(Config) ->
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533
binding_args_direct_exchange(Config) ->
binding_args(<<"amq.direct">>, Config).

binding_args_fanout_exchange(Config) ->
binding_args(<<"amq.fanout">>, Config).

binding_args(Exchange, Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),

#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),

%% Create two bindings that differ only in their binding arguments.
RoutingKey = <<"some-key">>,
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs1}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs2}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m1">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,

?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),

%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs1}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m2">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,

?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).

bind_and_unbind_direct_exchange(Config) ->
bind_and_unbind_exchange(<<"direct">>, Config).

bind_and_unbind_fanout_exchange(Config) ->
bind_and_unbind_exchange(<<"fanout">>, Config).

bind_and_unbind_exchange(Type, Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
X = ?config(exchange_name, Config),
Q = ?config(queue_name, Config),
RoutingKey = <<"some key">>,
SourceExchange = <<"amq.", Type/binary>>,

?assertEqual([],
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),

#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
type = Type}),
%% Let's bind to other exchange
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X,
source = <<"amq.direct">>,
routing_key = <<"key">>}),
source = SourceExchange,
routing_key = RoutingKey}),

DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
rabbit_misc:r(<<"/">>, exchange, X),
<<"key">>, []),
Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange),
rabbit_misc:r(<<"/">>, exchange, X),
RoutingKey, []),

?assertEqual([DirectBinding],
?assertEqual([Binding],
lists:sort(
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))),

%% Test that a message gets routed:
%% exchange -> exchange -> queue
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X,
routing_key = RoutingKey,
queue = Q}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = SourceExchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m1">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,
?assertEqual(#'queue.delete_ok'{message_count = 1},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),

#'exchange.unbind_ok'{} = amqp_channel:call(Ch,
#'exchange.unbind'{destination = X,
source = <<"amq.direct">>,
routing_key = <<"key">>}),
source = SourceExchange,
routing_key = RoutingKey}),

?assertEqual([],
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit_common/include/rabbit.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
-record(route, {binding, value = const}).
-record(reverse_route, {reverse_binding, value = const}).
-record(index_route, {source_key, destination, args = []}).
-record(route_by_source, {source, key, destination, args = []}).

-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
Expand Down
Loading