@@ -52,6 +52,7 @@ groups() ->
5252 bad_exchange_property ,
5353 bad_exchange_type ,
5454 get_queue_not_found ,
55+ declare_queues_concurrently ,
5556 declare_queue_default_queue_type ,
5657 declare_queue_empty_name ,
5758 declare_queue_line_feed ,
@@ -432,6 +433,40 @@ get_queue_not_found(Config) ->
432433 amqp10_msg :body (Resp )),
433434 ok = cleanup (Init ).
434435
436+ declare_queues_concurrently (Config ) ->
437+ NumQueues = 5 ,
438+ {Pid1 , Ref1 } = spawn_monitor (? MODULE , declare_queues , [Config , NumQueues ]),
439+ {Pid2 , Ref2 } = spawn_monitor (? MODULE , declare_queues , [Config , NumQueues ]),
440+ receive {'DOWN' , Ref1 , process , Pid1 , Reason1 } ->
441+ ? assertEqual (normal , Reason1 )
442+ end ,
443+ receive {'DOWN' , Ref2 , process , Pid2 , Reason2 } ->
444+ ? assertEqual (normal , Reason2 )
445+ end ,
446+
447+ ? assertEqual (NumQueues , count_queues (Config )),
448+
449+ Init = {_ , LinkPair } = init (Config ),
450+ lists :foreach (fun (N ) ->
451+ Bin = integer_to_binary (N ),
452+ QName = <<" queue-" , Bin /binary >>,
453+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName )
454+ end , lists :seq (1 , NumQueues )),
455+ ok = cleanup (Init ).
456+
457+ declare_queues (Config , Num ) ->
458+ Init = {_ , LinkPair } = init (Config ),
459+ ok = declare_queues0 (LinkPair , Num ),
460+ ok = cleanup (Init ).
461+
462+ declare_queues0 (_LinkPair , 0 ) ->
463+ ok ;
464+ declare_queues0 (LinkPair , Left ) ->
465+ Bin = integer_to_binary (Left ),
466+ QName = <<" queue-" , Bin /binary >>,
467+ ? assertMatch ({ok , _ }, rabbitmq_amqp_client :declare_queue (LinkPair , QName , #{})),
468+ declare_queues0 (LinkPair , Left - 1 ).
469+
435470declare_queue_default_queue_type (Config ) ->
436471 Node = get_node_config (Config , 0 , nodename ),
437472 Vhost = QName = atom_to_binary (? FUNCTION_NAME ),
@@ -859,11 +894,11 @@ pipeline(Config) ->
859894 % % because RabbitMQ grants us 8 link credits initially.
860895 Num = 8 ,
861896 pipeline0 (Num , LinkPair , <<" PUT" >>, {map , []}),
862- eventually (? _assertEqual (Num , rpc (Config , rabbit_amqqueue , count , [] )), 200 , 20 ),
897+ eventually (? _assertEqual (Num , count_queues (Config )), 200 , 20 ),
863898 flush (queues_created ),
864899
865900 pipeline0 (Num , LinkPair , <<" DELETE" >>, null ),
866- eventually (? _assertEqual (0 , rpc (Config , rabbit_amqqueue , count , [] )), 200 , 20 ),
901+ eventually (? _assertEqual (0 , count_queues (Config )), 200 , 20 ),
867902 flush (queues_deleted ),
868903
869904 ok = cleanup (Init ).
@@ -1115,3 +1150,6 @@ gen_server_state(Pid) ->
11151150 L1 = lists :last (L0 ),
11161151 {data , L2 } = lists :last (L1 ),
11171152 proplists :get_value (" State" , L2 ).
1153+
1154+ count_queues (Config ) ->
1155+ rpc (Config , rabbit_amqqueue , count , []).
0 commit comments