Skip to content

Commit 4876315

Browse files
committed
Add exchange binding tests
Test that exchange bindings work correctly with the new projection tables `rabbit_khepri_route_by_source` and `rabbit_khepri_route_by_source_key`.
1 parent 54c8537 commit 4876315

File tree

1 file changed

+41
-11
lines changed

1 file changed

+41
-11
lines changed

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ all_tests() ->
5050
bind_to_unknown_queue,
5151
binding_args_direct_exchange,
5252
binding_args_fanout_exchange,
53+
5354
%% Exchange bindings
54-
bind_and_unbind_exchange,
55+
bind_and_unbind_direct_exchange,
56+
bind_and_unbind_fanout_exchange,
5557
bind_and_delete_exchange_source,
5658
bind_and_delete_exchange_destination,
5759
bind_to_unknown_exchange,
@@ -754,33 +756,61 @@ binding_args(Exchange, Config) ->
754756
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
755757
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
756758

757-
bind_and_unbind_exchange(Config) ->
759+
bind_and_unbind_direct_exchange(Config) ->
760+
bind_and_unbind_exchange(<<"direct">>, Config).
761+
762+
bind_and_unbind_fanout_exchange(Config) ->
763+
bind_and_unbind_exchange(<<"fanout">>, Config).
764+
765+
bind_and_unbind_exchange(Type, Config) ->
758766
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
759767

760768
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
761769
X = ?config(exchange_name, Config),
770+
Q = ?config(queue_name, Config),
771+
RoutingKey = <<"some key">>,
772+
SourceExchange = <<"amq.", Type/binary>>,
762773

763774
?assertEqual([],
764775
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
765776

766-
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}),
777+
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
778+
type = Type}),
767779
%% Let's bind to other exchange
768780
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X,
769-
source = <<"amq.direct">>,
770-
routing_key = <<"key">>}),
781+
source = SourceExchange,
782+
routing_key = RoutingKey}),
771783

772-
DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
773-
rabbit_misc:r(<<"/">>, exchange, X),
774-
<<"key">>, []),
784+
Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange),
785+
rabbit_misc:r(<<"/">>, exchange, X),
786+
RoutingKey, []),
775787

776-
?assertEqual([DirectBinding],
788+
?assertEqual([Binding],
777789
lists:sort(
778790
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))),
779791

792+
%% Test that a message gets routed:
793+
%% exchange -> exchange -> queue
794+
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
795+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X,
796+
routing_key = RoutingKey,
797+
queue = Q}),
798+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
799+
amqp_channel:register_confirm_handler(Ch, self()),
800+
ok = amqp_channel:cast(Ch,
801+
#'basic.publish'{exchange = SourceExchange,
802+
routing_key = RoutingKey},
803+
#amqp_msg{payload = <<"m1">>}),
804+
receive #'basic.ack'{} -> ok
805+
after 9000 -> ct:fail(confirm_timeout)
806+
end,
807+
?assertEqual(#'queue.delete_ok'{message_count = 1},
808+
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
809+
780810
#'exchange.unbind_ok'{} = amqp_channel:call(Ch,
781811
#'exchange.unbind'{destination = X,
782-
source = <<"amq.direct">>,
783-
routing_key = <<"key">>}),
812+
source = SourceExchange,
813+
routing_key = RoutingKey}),
784814

785815
?assertEqual([],
786816
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),

0 commit comments

Comments
 (0)