Skip to content

Commit ccb56fc

Browse files
committed
Fix regression with Khepri binding args
Fix #14533
1 parent 481e765 commit ccb56fc

File tree

2 files changed

+69
-71
lines changed

2 files changed

+69
-71
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
%% executed. If the migration runs concurrently, whether it started before or
6161
%% during the execution of the Mnesia-specific anonymous function, {@link
6262
%% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia
63-
%% and will retry the Mnesia functino or run the Khepri function accordingly.
63+
%% and will retry the Mnesia function or run the Khepri function accordingly.
6464
%% The Mnesia function must be idempotent because it can be executed multiple
6565
%% times.
6666
%%
@@ -1415,34 +1415,28 @@ register_rabbit_bindings_projection() ->
14151415
khepri:register_projection(?STORE_ID, PathPattern, Projection).
14161416

14171417
register_rabbit_index_route_projection() ->
1418-
MapFun = fun(Path, _) ->
1419-
{
1420-
VHost,
1421-
ExchangeName,
1422-
Kind,
1423-
DstName,
1424-
RoutingKey
1425-
} = rabbit_db_binding:khepri_route_path_to_args(Path),
1426-
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
1427-
Destination = rabbit_misc:r(VHost, Kind, DstName),
1428-
SourceKey = {Exchange, RoutingKey},
1429-
#index_route{source_key = SourceKey,
1430-
destination = Destination}
1418+
MapFun = fun(_Path, #binding{source = Source,
1419+
key = Key,
1420+
destination = Destination,
1421+
args = Args}) ->
1422+
#index_route{source_key = {Source, Key},
1423+
destination = Destination,
1424+
args = Args}
14311425
end,
14321426
ProjectionFun = projection_fun_for_sets(MapFun),
14331427
Options = #{type => bag,
14341428
keypos => #index_route.source_key,
14351429
read_concurrency => true},
14361430
Projection = khepri_projection:new(
14371431
rabbit_khepri_index_route, ProjectionFun, Options),
1438-
DirectOrFanout = #if_data_matches{
1439-
pattern = #exchange{type = '$1', _ = '_'},
1440-
conditions = [{'andalso',
1441-
{'=/=', '$1', headers},
1442-
{'=/=', '$1', topic}}]},
1432+
IgnoreHeadersAndTopic = #if_data_matches{
1433+
pattern = #exchange{type = '$1', _ = '_'},
1434+
conditions = [{'andalso',
1435+
{'=/=', '$1', headers},
1436+
{'=/=', '$1', topic}}]},
14431437
PathPattern = rabbit_db_binding:khepri_route_path(
14441438
_VHost = ?KHEPRI_WILDCARD_STAR,
1445-
_Exchange = DirectOrFanout,
1439+
_Exchange = IgnoreHeadersAndTopic,
14461440
_Kind = ?KHEPRI_WILDCARD_STAR,
14471441
_DstName = ?KHEPRI_WILDCARD_STAR,
14481442
_RoutingKey = ?KHEPRI_WILDCARD_STAR),

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1414

1515
-compile([nowarn_export_all, export_all]).
16-
-compile(export_all).
1716

1817
suite() ->
1918
[{timetrap, 5 * 60000}].
@@ -37,7 +36,6 @@ groups() ->
3736
all_tests() ->
3837
[
3938
%% Queue bindings
40-
binding_args,
4139
bind_and_unbind,
4240
bind_and_delete,
4341
bind_and_delete_source_exchange,
@@ -50,6 +48,7 @@ all_tests() ->
5048
list_with_multiple_vhosts,
5149
list_with_multiple_arguments,
5250
bind_to_unknown_queue,
51+
binding_args,
5352
%% Exchange bindings
5453
bind_and_unbind_exchange,
5554
bind_and_delete_exchange_source,
@@ -118,55 +117,6 @@ end_per_testcase(Testcase, Config) ->
118117
%% Testcases.
119118
%% -------------------------------------------------------------------
120119

121-
binding_args(Config) ->
122-
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
123-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
124-
Q = ?config(queue_name, Config),
125-
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
126-
127-
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
128-
amqp_channel:register_confirm_handler(Ch, self()),
129-
130-
%% Create two bindings that differ only in their binding arguments.
131-
Exchange = <<"amq.direct">>,
132-
RoutingKey = <<"some-key">>,
133-
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
134-
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
135-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
136-
routing_key = RoutingKey,
137-
queue = Q,
138-
arguments = BindingArgs1}),
139-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
140-
routing_key = RoutingKey,
141-
queue = Q,
142-
arguments = BindingArgs2}),
143-
ok = amqp_channel:cast(Ch,
144-
#'basic.publish'{exchange = Exchange,
145-
routing_key = RoutingKey},
146-
#amqp_msg{payload = <<"m1">>}),
147-
receive #'basic.ack'{} -> ok
148-
after 9000 -> ct:fail(confirm_timeout)
149-
end,
150-
151-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
152-
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
153-
154-
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
155-
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
156-
routing_key = RoutingKey,
157-
queue = Q,
158-
arguments = BindingArgs1}),
159-
ok = amqp_channel:cast(Ch,
160-
#'basic.publish'{exchange = Exchange,
161-
routing_key = RoutingKey},
162-
#amqp_msg{payload = <<"m2">>}),
163-
receive #'basic.ack'{} -> ok
164-
after 9000 -> ct:fail(confirm_timeout)
165-
end,
166-
167-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
168-
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
169-
170120
bind_and_unbind(Config) ->
171121
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
172122

@@ -748,6 +698,60 @@ bind_to_unknown_queue(Config) ->
748698
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
749699
ok.
750700

701+
binding_args(Config) ->
702+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
703+
{skip, _} = Skip ->
704+
Skip;
705+
ok ->
706+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
707+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
708+
Q = ?config(queue_name, Config),
709+
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
710+
711+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
712+
amqp_channel:register_confirm_handler(Ch, self()),
713+
714+
%% Create two bindings that differ only in their binding arguments.
715+
Exchange = <<"amq.direct">>,
716+
RoutingKey = <<"some-key">>,
717+
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
718+
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
719+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
720+
routing_key = RoutingKey,
721+
queue = Q,
722+
arguments = BindingArgs1}),
723+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
724+
routing_key = RoutingKey,
725+
queue = Q,
726+
arguments = BindingArgs2}),
727+
ok = amqp_channel:cast(Ch,
728+
#'basic.publish'{exchange = Exchange,
729+
routing_key = RoutingKey},
730+
#amqp_msg{payload = <<"m1">>}),
731+
receive #'basic.ack'{} -> ok
732+
after 9000 -> ct:fail(confirm_timeout)
733+
end,
734+
735+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
736+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
737+
738+
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
739+
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
740+
routing_key = RoutingKey,
741+
queue = Q,
742+
arguments = BindingArgs1}),
743+
ok = amqp_channel:cast(Ch,
744+
#'basic.publish'{exchange = Exchange,
745+
routing_key = RoutingKey},
746+
#amqp_msg{payload = <<"m2">>}),
747+
receive #'basic.ack'{} -> ok
748+
after 9000 -> ct:fail(confirm_timeout)
749+
end,
750+
751+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
752+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true}))
753+
end.
754+
751755
bind_and_unbind_exchange(Config) ->
752756
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
753757

0 commit comments

Comments
 (0)