@@ -139,6 +139,8 @@ groups() ->
139139 dynamic_target_long_link_name ,
140140 dynamic_source_rpc ,
141141 dynamic_terminus_delete ,
142+ delivery_count_classic_queue ,
143+ delivery_count_quorum_queue ,
142144 modified_classic_queue ,
143145 modified_quorum_queue ,
144146 modified_dead_letter_headers_exchange ,
@@ -344,6 +346,82 @@ reliable_send_receive(QType, Outcome, Config) ->
344346 ok = end_session_sync (Session2 ),
345347 ok = close_connection_sync (Connection2 ).
346348
349+ delivery_count_classic_queue (Config ) ->
350+ delivery_count (<<" classic" >>, Config ).
351+
352+ delivery_count_quorum_queue (Config ) ->
353+ delivery_count (<<" quorum" >>, Config ).
354+
355+ delivery_count (QType , Config ) ->
356+ QName = <<" source queue" >>,
357+ QAddress = rabbitmq_amqp_address :queue (QName ),
358+ DLQName = <<" dead letter queue" >>,
359+ DLQAddress = rabbitmq_amqp_address :queue (DLQName ),
360+
361+ {_ , Session , LinkPair } = Init = init (Config ),
362+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QType },
363+ <<" x-dead-letter-exchange" >> => {utf8 , <<>>},
364+ <<" x-dead-letter-routing-key" >> => {utf8 , DLQName }}},
365+ {ok , #{type := QType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
366+
367+ DLQProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QType },
368+ <<" x-dead-letter-exchange" >> => {utf8 , <<>>},
369+ <<" x-dead-letter-routing-key" >> => {utf8 , QName }}},
370+ {ok , #{type := QType }} = rabbitmq_amqp_client :declare_queue (LinkPair , DLQName , DLQProps ),
371+
372+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender source queue" >>, QAddress ),
373+ ok = wait_for_credit (Sender ),
374+ M1 = amqp10_msg :new (<<" tag" >>, <<" msg" >>, true ),
375+ ok = amqp10_client :send_msg (Sender , M1 ),
376+ ok = detach_link_sync (Sender ),
377+
378+ {ok , QReceiver } = amqp10_client :attach_receiver_link (
379+ Session , <<" receiver source queue" >>, QAddress , unsettled ),
380+ {ok , DLQReceiver } = amqp10_client :attach_receiver_link (
381+ Session , <<" receiver dead letter queue" >>, DLQAddress , unsettled ),
382+
383+ {ok , M2 } = amqp10_client :get_msg (QReceiver ),
384+ ? assertMatch (#{delivery_count := 0 ,
385+ first_acquirer := true },
386+ amqp10_msg :headers (M2 )),
387+
388+ % % released coutcome must not increment the delivery-count
389+ ok = amqp10_client :settle_msg (QReceiver , M2 , released ),
390+ {ok , M3 } = amqp10_client :get_msg (QReceiver ),
391+ ? assertMatch (#{delivery_count := 0 ,
392+ first_acquirer := false },
393+ amqp10_msg :headers (M3 )),
394+
395+ % % delivery-failed=true in modified coutcome must increment the delivery-count
396+ ok = amqp10_client :settle_msg (QReceiver , M3 , {modified , true , false , #{}}),
397+ {ok , M4 } = amqp10_client :get_msg (QReceiver ),
398+ ? assertMatch (#{delivery_count := 1 ,
399+ first_acquirer := false },
400+ amqp10_msg :headers (M4 )),
401+
402+ % % delivery-failed=false in modified coutcome must not increment the delivery-count
403+ ok = amqp10_client :settle_msg (QReceiver , M4 , {modified , false , true , #{}}),
404+ {ok , M5 } = amqp10_client :get_msg (DLQReceiver ),
405+ ? assertMatch (#{delivery_count := 1 ,
406+ first_acquirer := false },
407+ amqp10_msg :headers (M5 )),
408+
409+ % % rejected coutcome must increment the delivery-count
410+ ok = amqp10_client :settle_msg (DLQReceiver , M5 , rejected ),
411+ {ok , M6 } = amqp10_client :get_msg (QReceiver ),
412+ ? assertMatch (#{delivery_count := 2 ,
413+ first_acquirer := false },
414+ amqp10_msg :headers (M6 )),
415+
416+ ok = amqp10_client :settle_msg (QReceiver , M6 , accepted ),
417+ ok = detach_link_sync (QReceiver ),
418+ ok = detach_link_sync (DLQReceiver ),
419+ ? assertMatch ({ok , #{message_count := 0 }},
420+ rabbitmq_amqp_client :delete_queue (LinkPair , QName )),
421+ ? assertMatch ({ok , #{message_count := 0 }},
422+ rabbitmq_amqp_client :delete_queue (LinkPair , DLQName )),
423+ ok = close (Init ).
424+
347425% % We test the modified outcome with classic queues.
348426% % We expect that classic queues implement field
349427% % * delivery-failed correctly
0 commit comments