Skip to content

Commit 03b7e90

Browse files
Merge pull request #4272 from rabbitmq/rabbit_fifo_dlx_integration_SUITE-flake
Adjust assertions in an effort to reduce flakes
2 parents 658d5cf + 8ccf741 commit 03b7e90

File tree

1 file changed

+41
-25
lines changed

1 file changed

+41
-25
lines changed

deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
-import(quorum_queue_SUITE, [publish/2,
2929
consume/3]).
3030

31+
-define(DEFAULT_WAIT, 1000).
32+
-define(DEFAULT_INTERVAL, 200).
33+
3134
-compile([nowarn_export_all, export_all]).
3235

3336
all() ->
@@ -804,10 +807,12 @@ many_target_queues(Config) ->
804807
#'basic.publish'{routing_key = SourceQ},
805808
#amqp_msg{props = #'P_basic'{expiration = <<"5">>},
806809
payload = Msg1}),
807-
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
808-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ1}))),
809-
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
810-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ2}))),
810+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
811+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ1}),
812+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
813+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
814+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ2}),
815+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
811816
%% basic.get not supported by stream queues
812817
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 2}),
813818
CTag = <<"ctag">>,
@@ -830,14 +835,18 @@ many_target_queues(Config) ->
830835
after 2000 ->
831836
exit(deliver_timeout)
832837
end,
833-
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
834-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ4}))),
835-
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
836-
amqp_channel:call(Ch2, #'basic.get'{queue = TargetQ5}))),
837-
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
838-
amqp_channel:call(Ch2, #'basic.get'{queue = TargetQ6}))),
839-
eventually(?_assertEqual([{0, 0}],
840-
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1))),
838+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
839+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ4}),
840+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
841+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
842+
amqp_channel:call(Ch2, #'basic.get'{queue = TargetQ5}),
843+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
844+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
845+
amqp_channel:call(Ch2, #'basic.get'{queue = TargetQ6}),
846+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
847+
?awaitMatch([{0, 0}],
848+
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1),
849+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
841850
ok = rabbit_ct_broker_helpers:stop_node(Config, Server3),
842851
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
843852
Msg2 = <<"m2">>,
@@ -848,32 +857,39 @@ many_target_queues(Config) ->
848857
%% Nodes 2 and 3 are down.
849858
%% rabbit_fifo_dlx_worker should wait until all queues confirm the message
850859
%% before acking it to the source queue.
851-
eventually(?_assertEqual([{1, 2}],
852-
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1))),
853-
consistently(?_assertEqual([{1, 2}],
854-
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1))),
860+
?awaitMatch([{1, 2}],
861+
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1),
862+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
863+
timer:sleep(1000),
864+
?assertEqual([{1, 2}],
865+
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1)),
855866
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
856867
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ1})),
857868
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
858869
ok = rabbit_ct_broker_helpers:start_node(Config, Server3),
859-
eventually(?_assertEqual([{0, 0}],
860-
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1)), 500, 6),
861-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
862-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ2})),
870+
?awaitMatch([{0, 0}],
871+
dirty_query([Server1], RaName, fun rabbit_fifo:query_stat_dlx/1),
872+
3000, 500),
873+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
874+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ2}),
875+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
863876
receive
864877
{#'basic.deliver'{consumer_tag = CTag},
865878
#amqp_msg{payload = Msg2}} ->
866879
ok
867880
after 0 ->
868881
exit(deliver_timeout)
869882
end,
870-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
871-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ4})),
872-
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
873-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ5}))),
883+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
884+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ4}),
885+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
886+
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
887+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ5}),
888+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
874889
%%TODO why is the 1st message (m1) a duplicate?
875890
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg2}},
876-
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ6}), 2, 200),
891+
amqp_channel:call(Ch, #'basic.get'{queue = TargetQ6}),
892+
?DEFAULT_WAIT, ?DEFAULT_INTERVAL),
877893
?assertEqual(2, counted(messages_dead_lettered_expired_total, Config)),
878894
?assertEqual(2, counted(messages_dead_lettered_confirmed_total, Config)).
879895

0 commit comments

Comments
 (0)