@@ -52,6 +52,7 @@ groups() ->
5252 bad_exchange_property ,
5353 bad_exchange_type ,
5454 get_queue_not_found ,
55+ declare_queues_concurrently ,
5556 declare_queue_default_queue_type ,
5657 declare_queue_empty_name ,
5758 declare_queue_line_feed ,
@@ -436,6 +437,40 @@ get_queue_not_found(Config) ->
436437 amqp10_msg :body (Resp )),
437438 ok = cleanup (Init ).
438439
440+ declare_queues_concurrently (Config ) ->
441+ NumQueues = 5 ,
442+ {Pid1 , Ref1 } = spawn_monitor (? MODULE , declare_queues , [Config , NumQueues ]),
443+ {Pid2 , Ref2 } = spawn_monitor (? MODULE , declare_queues , [Config , NumQueues ]),
444+ receive {'DOWN' , Ref1 , process , Pid1 , Reason1 } ->
445+ ? assertEqual (normal , Reason1 )
446+ end ,
447+ receive {'DOWN' , Ref2 , process , Pid2 , Reason2 } ->
448+ ? assertEqual (normal , Reason2 )
449+ end ,
450+
451+ ? assertEqual (NumQueues , count_queues (Config )),
452+
453+ Init = {_ , LinkPair } = init (Config ),
454+ lists :foreach (fun (N ) ->
455+ Bin = integer_to_binary (N ),
456+ QName = <<" queue-" , Bin /binary >>,
457+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName )
458+ end , lists :seq (1 , NumQueues )),
459+ ok = cleanup (Init ).
460+
461+ declare_queues (Config , Num ) ->
462+ Init = {_ , LinkPair } = init (Config ),
463+ ok = declare_queues0 (LinkPair , Num ),
464+ ok = cleanup (Init ).
465+
466+ declare_queues0 (_LinkPair , 0 ) ->
467+ ok ;
468+ declare_queues0 (LinkPair , Left ) ->
469+ Bin = integer_to_binary (Left ),
470+ QName = <<" queue-" , Bin /binary >>,
471+ ? assertMatch ({ok , _ }, rabbitmq_amqp_client :declare_queue (LinkPair , QName , #{})),
472+ declare_queues0 (LinkPair , Left - 1 ).
473+
439474declare_queue_default_queue_type (Config ) ->
440475 Node = get_node_config (Config , 0 , nodename ),
441476 Vhost = QName = atom_to_binary (? FUNCTION_NAME ),
@@ -864,11 +899,11 @@ pipeline(Config) ->
864899 % % because RabbitMQ grants us 8 link credits initially.
865900 Num = 8 ,
866901 pipeline0 (Num , LinkPair , <<" PUT" >>, {map , []}),
867- eventually (? _assertEqual (Num , rpc (Config , rabbit_amqqueue , count , [] )), 200 , 20 ),
902+ eventually (? _assertEqual (Num , count_queues (Config )), 200 , 20 ),
868903 flush (queues_created ),
869904
870905 pipeline0 (Num , LinkPair , <<" DELETE" >>, null ),
871- eventually (? _assertEqual (0 , rpc (Config , rabbit_amqqueue , count , [] )), 200 , 20 ),
906+ eventually (? _assertEqual (0 , count_queues (Config )), 200 , 20 ),
872907 flush (queues_deleted ),
873908
874909 ok = cleanup (Init ).
@@ -1120,3 +1155,6 @@ gen_server_state(Pid) ->
11201155 L1 = lists :last (L0 ),
11211156 {data , L2 } = lists :last (L1 ),
11221157 proplists :get_value (" State" , L2 ).
1158+
1159+ count_queues (Config ) ->
1160+ rpc (Config , rabbit_amqqueue , count , []).
0 commit comments