Skip to content

Commit 52896ee

Browse files
committed
wip
1 parent afc7f59 commit 52896ee

File tree

5 files changed

+138
-113
lines changed

5 files changed

+138
-113
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,5 +216,6 @@
216216
{'rabbitmq_4.2.0',
217217
#{desc => "Allows rolling upgrades to 4.2.x",
218218
stability => stable,
219-
depends_on => ['rabbitmq_4.1.0']
219+
depends_on => ['rabbitmq_4.1.0'],
220+
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
220221
}}).

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@
5656
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
5757
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
5858
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
59-
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
60-
-define(KHEPRI_SOURCE_ROUTE_PROJECTION, rabbit_khepri_source_route).
59+
-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key).
60+
-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source).
6161

6262
%% -------------------------------------------------------------------
6363
%% exists().
@@ -708,26 +708,14 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
708708
end.
709709

710710
match_routing_key_in_khepri(Src, ['_']) ->
711-
try ets:lookup_element(?KHEPRI_SOURCE_ROUTE_PROJECTION,
712-
Src, #source_route.destination, [])
713-
catch error:badarg ->
714-
%% The new projection doesn't exist.
715-
%% Fall back to old & slow approach.
716-
MatchHead = #index_route{source_key = {Src, '_'},
717-
destination = '$1',
718-
_ = '_'},
719-
try ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION,
720-
[{MatchHead, [], ['$1']}])
721-
catch error:badarg ->
722-
[]
723-
end
724-
end;
711+
ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION,
712+
Src, #route_by_source.destination, []);
725713
match_routing_key_in_khepri(Src, RoutingKeys) ->
726714
lists:foldl(
727715
fun(RK, Acc) ->
728716
try
729717
Dst = ets:lookup_element(
730-
?KHEPRI_INDEX_ROUTE_PROJECTION,
718+
?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION,
731719
{Src, RK},
732720
#index_route.destination),
733721
Dst ++ Acc

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 74 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@
170170
%% equivalent cluster
171171
-export([khepri_db_migration_enable/1,
172172
khepri_db_migration_post_enable/1,
173+
enable_feature_flag/1,
173174
is_enabled/0, is_enabled/1,
174175
get_feature_state/0, get_feature_state/1,
175176
handle_fallback/1]).
@@ -332,8 +333,10 @@ init(IsVirgin) ->
332333
"up to the Raft cluster leader", [],
333334
#{domain => ?RMQLOG_DOMAIN_DB}),
334335
ok ?= case IsVirgin of
335-
true -> register_projections();
336-
false -> ok
336+
true ->
337+
register_projections();
338+
false ->
339+
register_4_2_0_projections()
337340
end,
338341
%% Delete transient queues on init.
339342
%% Note that we also do this in the
@@ -1318,26 +1321,34 @@ register_projections() ->
13181321
fun register_rabbit_per_vhost_runtime_parameters_projection/0,
13191322
fun register_rabbit_user_permissions_projection/0,
13201323
fun register_rabbit_bindings_projection/0,
1321-
fun register_rabbit_index_route_projection/0,
1322-
fun register_rabbit_source_route_projection/0,
1324+
fun register_rabbit_route_by_source_key_projection/0,
1325+
fun register_rabbit_route_by_source_projection/0,
13231326
fun register_rabbit_topic_graph_projection/0],
1324-
rabbit_misc:for_each_while_ok(
1325-
fun(RegisterFun) ->
1326-
case RegisterFun() of
1327-
ok ->
1328-
ok;
1329-
%% Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
1330-
%% would return `{error, exists}` for projections which
1331-
%% already exist.
1332-
{error, exists} ->
1333-
ok;
1334-
%% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
1335-
{error, {khepri, projection_already_exists, _Info}} ->
1336-
ok;
1337-
{error, _} = Error ->
1338-
Error
1339-
end
1340-
end, RegFuns).
1327+
rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns).
1328+
1329+
%% This function registers projections introduced in 4.2.0. In a mixed version
1330+
%% cluster, these new projections will appear but won't be used on older nodes.
1331+
%% This function can be deleted after feature flag rabbitmq_4.2.0 becomes required.
1332+
register_4_2_0_projections() ->
1333+
RegFuns = [fun register_rabbit_route_by_source_key_projection/0,
1334+
fun register_rabbit_route_by_source_projection/0],
1335+
rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns).
1336+
1337+
register_projection(RegisterFun) ->
1338+
case RegisterFun() of
1339+
ok ->
1340+
ok;
1341+
%% Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
1342+
%% would return `{error, exists}` for projections which
1343+
%% already exist.
1344+
{error, exists} ->
1345+
ok;
1346+
%% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
1347+
{error, {khepri, projection_already_exists, _Info}} ->
1348+
ok;
1349+
{error, _} = Error ->
1350+
Error
1351+
end.
13411352

13421353
register_rabbit_exchange_projection() ->
13431354
Name = rabbit_khepri_exchange,
@@ -1415,7 +1426,7 @@ register_rabbit_bindings_projection() ->
14151426
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
14161427
khepri:register_projection(?STORE_ID, PathPattern, Projection).
14171428

1418-
register_rabbit_index_route_projection() ->
1429+
register_rabbit_route_by_source_key_projection() ->
14191430
MapFun = fun(_Path, #binding{source = Source,
14201431
key = Key,
14211432
destination = Destination,
@@ -1428,44 +1439,51 @@ register_rabbit_index_route_projection() ->
14281439
Options = #{type => bag,
14291440
keypos => #index_route.source_key,
14301441
read_concurrency => true},
1431-
Projection = khepri_projection:new(
1432-
rabbit_khepri_index_route, ProjectionFun, Options),
1433-
IgnoreHeadersAndTopic = #if_data_matches{
1434-
pattern = #exchange{type = '$1', _ = '_'},
1435-
conditions = [{'andalso',
1436-
{'=/=', '$1', headers},
1437-
{'=/=', '$1', topic}}]},
1442+
Projection = khepri_projection:new(rabbit_khepri_route_by_source_key,
1443+
ProjectionFun,
1444+
Options),
1445+
Exchange = #if_data_matches{
1446+
pattern = #exchange{type = '$1', _ = '_'},
1447+
conditions = [{'andalso',
1448+
{'=/=', '$1', headers},
1449+
{'=/=', '$1', topic},
1450+
{'=/=', '$1', fanout},
1451+
{'=/=', '$1', 'x-jms-topic'},
1452+
{'=/=', '$1', 'x-random'}
1453+
}]},
14381454
PathPattern = rabbit_db_binding:khepri_route_path(
14391455
_VHost = ?KHEPRI_WILDCARD_STAR,
1440-
_Exchange = IgnoreHeadersAndTopic,
1456+
Exchange,
14411457
_Kind = ?KHEPRI_WILDCARD_STAR,
14421458
_DstName = ?KHEPRI_WILDCARD_STAR,
14431459
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
14441460
khepri:register_projection(?STORE_ID, PathPattern, Projection).
14451461

1446-
register_rabbit_source_route_projection() ->
1462+
register_rabbit_route_by_source_projection() ->
14471463
MapFun = fun(_Path, #binding{source = Source,
14481464
key = Key,
14491465
destination = Destination,
14501466
args = Args}) ->
1451-
#source_route{source = Source,
1452-
key = Key,
1453-
destination = Destination,
1454-
args = Args}
1467+
#route_by_source{source = Source,
1468+
key = Key,
1469+
destination = Destination,
1470+
args = Args}
14551471
end,
14561472
ProjectionFun = projection_fun_for_sets(MapFun),
14571473
Options = #{type => bag,
1458-
keypos => #source_route.source,
1474+
keypos => #route_by_source.source,
14591475
read_concurrency => true},
1460-
Projection = khepri_projection:new(
1461-
rabbit_khepri_source_route, ProjectionFun, Options),
1476+
Projection = khepri_projection:new(rabbit_khepri_route_by_source,
1477+
ProjectionFun,
1478+
Options),
14621479
Exchange = #if_data_matches{
14631480
pattern = #exchange{type = '$1', _ = '_'},
14641481
conditions = [{'andalso',
1465-
{'=/=', '$1', direct},
14661482
{'=/=', '$1', headers},
14671483
{'=/=', '$1', topic},
1468-
{'=/=', '$1', 'x-local-random'}
1484+
{'=/=', '$1', direct},
1485+
{'=/=', '$1', 'x-local-random'},
1486+
{'=/=', '$1', 'x-jms-topic'}
14691487
}]},
14701488
PathPattern = rabbit_db_binding:khepri_route_path(
14711489
_VHost = ?KHEPRI_WILDCARD_STAR,
@@ -1788,6 +1806,22 @@ khepri_db_migration_post_enable(
17881806
_ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID),
17891807
ok.
17901808

1809+
enable_feature_flag(#{command := enable,
1810+
feature_name := 'rabbitmq_4.2.0' = FeatureName}) ->
1811+
%% We unregister this projection because it's superseded by
1812+
%% rabbit_khepri_route_by_source_key introduced in 4.2.0
1813+
ProjectionName = rabbit_khepri_index_route,
1814+
Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName])
1815+
catch _:Reason -> Reason
1816+
end,
1817+
?LOG_DEBUG(
1818+
"enabling feature flag ~s unregisters projection ~s: ~tp",
1819+
[FeatureName, ProjectionName, Result],
1820+
#{domain => ?RMQLOG_DOMAIN_DB}),
1821+
ok;
1822+
enable_feature_flag(_) ->
1823+
ok.
1824+
17911825
-spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when
17921826
FeatureName :: rabbit_feature_flags:feature_name(),
17931827
Ret :: ok | {error, Reason},

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ all_tests() ->
4848
list_with_multiple_vhosts,
4949
list_with_multiple_arguments,
5050
bind_to_unknown_queue,
51-
binding_args,
51+
binding_args_direct_exchange,
52+
binding_args_fanout_exchange,
5253
%% Exchange bindings
5354
bind_and_unbind_exchange,
5455
bind_and_delete_exchange_source,
@@ -698,59 +699,60 @@ bind_to_unknown_queue(Config) ->
698699
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
699700
ok.
700701

701-
binding_args(Config) ->
702-
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
703-
{skip, _} = Skip ->
704-
Skip;
705-
ok ->
706-
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
707-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
708-
Q = ?config(queue_name, Config),
709-
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
710-
711-
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
712-
amqp_channel:register_confirm_handler(Ch, self()),
713-
714-
%% Create two bindings that differ only in their binding arguments.
715-
Exchange = <<"amq.direct">>,
716-
RoutingKey = <<"some-key">>,
717-
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
718-
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
719-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
720-
routing_key = RoutingKey,
721-
queue = Q,
722-
arguments = BindingArgs1}),
723-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
724-
routing_key = RoutingKey,
725-
queue = Q,
726-
arguments = BindingArgs2}),
727-
ok = amqp_channel:cast(Ch,
728-
#'basic.publish'{exchange = Exchange,
729-
routing_key = RoutingKey},
730-
#amqp_msg{payload = <<"m1">>}),
731-
receive #'basic.ack'{} -> ok
732-
after 9000 -> ct:fail(confirm_timeout)
733-
end,
734-
735-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
736-
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
737-
738-
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
739-
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
740-
routing_key = RoutingKey,
741-
queue = Q,
742-
arguments = BindingArgs1}),
743-
ok = amqp_channel:cast(Ch,
744-
#'basic.publish'{exchange = Exchange,
745-
routing_key = RoutingKey},
746-
#amqp_msg{payload = <<"m2">>}),
747-
receive #'basic.ack'{} -> ok
748-
after 9000 -> ct:fail(confirm_timeout)
749-
end,
750-
751-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
752-
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true}))
753-
end.
702+
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533
703+
binding_args_direct_exchange(Config) ->
704+
binding_args(<<"amq.direct">>, Config).
705+
706+
binding_args_fanout_exchange(Config) ->
707+
binding_args(<<"amq.fanout">>, Config).
708+
709+
binding_args(Exchange, Config) ->
710+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
711+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
712+
Q = ?config(queue_name, Config),
713+
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
714+
715+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
716+
amqp_channel:register_confirm_handler(Ch, self()),
717+
718+
%% Create two bindings that differ only in their binding arguments.
719+
RoutingKey = <<"some-key">>,
720+
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
721+
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
722+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
723+
routing_key = RoutingKey,
724+
queue = Q,
725+
arguments = BindingArgs1}),
726+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
727+
routing_key = RoutingKey,
728+
queue = Q,
729+
arguments = BindingArgs2}),
730+
ok = amqp_channel:cast(Ch,
731+
#'basic.publish'{exchange = Exchange,
732+
routing_key = RoutingKey},
733+
#amqp_msg{payload = <<"m1">>}),
734+
receive #'basic.ack'{} -> ok
735+
after 9000 -> ct:fail(confirm_timeout)
736+
end,
737+
738+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
739+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
740+
741+
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
742+
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
743+
routing_key = RoutingKey,
744+
queue = Q,
745+
arguments = BindingArgs1}),
746+
ok = amqp_channel:cast(Ch,
747+
#'basic.publish'{exchange = Exchange,
748+
routing_key = RoutingKey},
749+
#amqp_msg{payload = <<"m2">>}),
750+
receive #'basic.ack'{} -> ok
751+
after 9000 -> ct:fail(confirm_timeout)
752+
end,
753+
754+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
755+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
754756

755757
bind_and_unbind_exchange(Config) ->
756758
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

deps/rabbit_common/include/rabbit.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
-record(route, {binding, value = const}).
9595
-record(reverse_route, {reverse_binding, value = const}).
9696
-record(index_route, {source_key, destination, args = []}).
97-
-record(source_route, {source, key, destination, args = []}).
97+
-record(route_by_source, {source, key, destination, args = []}).
9898

9999
-record(binding, {source, key, destination, args = []}).
100100
-record(reverse_binding, {destination, key, source, args = []}).

0 commit comments

Comments
 (0)