2828-import (quorum_queue_SUITE , [publish /2 ,
2929 consume /3 ]).
3030
31+ -define (DEFAULT_WAIT , 1000 ).
32+ -define (DEFAULT_INTERVAL , 200 ).
33+
3134-compile ([nowarn_export_all , export_all ]).
3235
3336all () ->
@@ -804,10 +807,12 @@ many_target_queues(Config) ->
804807 # 'basic.publish' {routing_key = SourceQ },
805808 # amqp_msg {props = # 'P_basic' {expiration = <<" 5" >>},
806809 payload = Msg1 }),
807- eventually (? _assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
808- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ1 }))),
809- eventually (? _assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
810- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ2 }))),
810+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
811+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ1 }),
812+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
813+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
814+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ2 }),
815+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
811816 % % basic.get not supported by stream queues
812817 # 'basic.qos_ok' {} = amqp_channel :call (Ch , # 'basic.qos' {prefetch_count = 2 }),
813818 CTag = <<" ctag" >>,
@@ -830,14 +835,18 @@ many_target_queues(Config) ->
830835 after 2000 ->
831836 exit (deliver_timeout )
832837 end ,
833- eventually (? _assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
834- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ4 }))),
835- eventually (? _assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
836- amqp_channel :call (Ch2 , # 'basic.get' {queue = TargetQ5 }))),
837- eventually (? _assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
838- amqp_channel :call (Ch2 , # 'basic.get' {queue = TargetQ6 }))),
839- eventually (? _assertEqual ([{0 , 0 }],
840- dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 ))),
838+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
839+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ4 }),
840+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
841+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
842+ amqp_channel :call (Ch2 , # 'basic.get' {queue = TargetQ5 }),
843+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
844+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg1 }},
845+ amqp_channel :call (Ch2 , # 'basic.get' {queue = TargetQ6 }),
846+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
847+ ? awaitMatch ([{0 , 0 }],
848+ dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 ),
849+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
841850 ok = rabbit_ct_broker_helpers :stop_node (Config , Server3 ),
842851 ok = rabbit_ct_broker_helpers :stop_node (Config , Server2 ),
843852 Msg2 = <<" m2" >>,
@@ -848,32 +857,39 @@ many_target_queues(Config) ->
848857 % % Nodes 2 and 3 are down.
849858 % % rabbit_fifo_dlx_worker should wait until all queues confirm the message
850859 % % before acking it to the source queue.
851- eventually (? _assertEqual ([{1 , 2 }],
852- dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 ))),
853- consistently (? _assertEqual ([{1 , 2 }],
854- dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 ))),
860+ ? awaitMatch ([{1 , 2 }],
861+ dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 ),
862+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
863+ timer :sleep (1000 ),
864+ ? assertEqual ([{1 , 2 }],
865+ dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 )),
855866 ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
856867 amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ1 })),
857868 ok = rabbit_ct_broker_helpers :start_node (Config , Server2 ),
858869 ok = rabbit_ct_broker_helpers :start_node (Config , Server3 ),
859- eventually (? _assertEqual ([{0 , 0 }],
860- dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 )), 500 , 6 ),
861- ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
862- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ2 })),
870+ ? awaitMatch ([{0 , 0 }],
871+ dirty_query ([Server1 ], RaName , fun rabbit_fifo :query_stat_dlx /1 ),
872+ 3000 , 500 ),
873+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
874+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ2 }),
875+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
863876 receive
864877 {# 'basic.deliver' {consumer_tag = CTag },
865878 # amqp_msg {payload = Msg2 }} ->
866879 ok
867880 after 0 ->
868881 exit (deliver_timeout )
869882 end ,
870- ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
871- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ4 })),
872- eventually (? _assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
873- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ5 }))),
883+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
884+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ4 }),
885+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
886+ ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
887+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ5 }),
888+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
874889 % %TODO why is the 1st message (m1) a duplicate?
875890 ? awaitMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = Msg2 }},
876- amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ6 }), 2 , 200 ),
891+ amqp_channel :call (Ch , # 'basic.get' {queue = TargetQ6 }),
892+ ? DEFAULT_WAIT , ? DEFAULT_INTERVAL ),
877893 ? assertEqual (2 , counted (messages_dead_lettered_expired_total , Config )),
878894 ? assertEqual (2 , counted (messages_dead_lettered_confirmed_total , Config )).
879895
0 commit comments