@@ -51,6 +51,9 @@ groups() ->
5151 roundtrip_with_drain_classic_queue ,
5252 roundtrip_with_drain_quorum_queue ,
5353 roundtrip_with_drain_stream ,
54+ drain_many_classic_queue ,
55+ drain_many_quorum_queue ,
56+ drain_many_stream ,
5457 amqp_stream_amqpl ,
5558 amqp_quorum_queue_amqpl ,
5659 message_headers_conversion ,
@@ -106,6 +109,7 @@ groups() ->
106109 dead_letter_headers_exchange ,
107110 dead_letter_reject ,
108111 immutable_bare_message ,
112+ receive_many_made_available_over_time ,
109113 incoming_window_closed_transfer_flow_order ,
110114 incoming_window_closed_close_link
111115 ]},
@@ -171,6 +175,7 @@ end_per_group(_, Config) ->
171175init_per_testcase (T , Config )
172176 when T =:= message_headers_conversion orelse
173177 T =:= roundtrip_with_drain_quorum_queue orelse
178+ T =:= drain_many_quorum_queue orelse
174179 T =:= timed_get_quorum_queue orelse
175180 T =:= available_messages_quorum_queue ->
176181 case rpc (Config , rabbit_feature_flags , is_enabled , [credit_api_v2 ]) of
@@ -646,6 +651,69 @@ roundtrip_with_drain(Config, QueueType, QName)
646651 ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
647652 ok = amqp10_client :close_connection (Connection ).
648653
654+ drain_many_classic_queue (Config ) ->
655+ QName = atom_to_binary (? FUNCTION_NAME ),
656+ drain_many (Config , <<" classic" >>, QName ).
657+
658+ drain_many_quorum_queue (Config ) ->
659+ QName = atom_to_binary (? FUNCTION_NAME ),
660+ drain_many (Config , <<" quorum" >>, QName ).
661+
662+ drain_many_stream (Config ) ->
663+ QName = atom_to_binary (? FUNCTION_NAME ),
664+ drain_many (Config , <<" stream" >>, QName ).
665+
666+ drain_many (Config , QueueType , QName )
667+ when is_binary (QueueType ) ->
668+ Address = <<" /queue/" , QName /binary >>,
669+ {Connection , Session , LinkPair } = init (Config ),
670+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QueueType }}},
671+ {ok , #{type := QueueType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
672+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" test-sender" >>, Address ),
673+ wait_for_credit (Sender ),
674+
675+ Num = 5000 ,
676+ ok = send_messages (Sender , Num , false ),
677+ ok = wait_for_accepts (Num ),
678+
679+ TerminusDurability = none ,
680+ Filter = consume_from_first (QueueType ),
681+ {ok , Receiver } = amqp10_client :attach_receiver_link (
682+ Session , <<" test-receiver" >>, Address , settled ,
683+ TerminusDurability , Filter ),
684+
685+ ok = amqp10_client :flow_link_credit (Receiver , Num - 1 , never , true ),
686+ ? assertEqual (Num - 1 , count_received_messages (Receiver )),
687+ flush (" drained 1" ),
688+
689+ ok = amqp10_client :flow_link_credit (Receiver , Num , never , true ),
690+ ? assertEqual (1 , count_received_messages (Receiver )),
691+ flush (" drained 2" ),
692+
693+ ok = send_messages (Sender , Num , false ),
694+ ok = wait_for_accepts (Num ),
695+ receive Unexpected -> ct :fail ({unexpected , Unexpected })
696+ after 10 -> ok
697+ end ,
698+ % % Let's send 2 FLOW frames in sequence.
699+ ok = amqp10_client :flow_link_credit (Receiver , Num , never , true ),
700+ ok = amqp10_client :flow_link_credit (Receiver , Num , never , true ),
701+ ? assertEqual (Num , count_received_messages (Receiver )),
702+ flush (" drained 3" ),
703+
704+ ok = send_messages (Sender , 1 , false ),
705+ ok = wait_for_accepts (1 ),
706+ % % Our receiver shouldn't have any credit left to consume the last message.
707+ receive {amqp10_msg , _ , _ } -> ct :fail (unexpected_delivery )
708+ after 30 -> ok
709+ end ,
710+
711+ ok = amqp10_client :detach_link (Sender ),
712+ ok = amqp10_client :detach_link (Receiver ),
713+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
714+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
715+ ok = amqp10_client :close_connection (Connection ).
716+
649717amqp_stream_amqpl (Config ) ->
650718 amqp_amqpl (<<" stream" >>, Config ).
651719
@@ -3856,6 +3924,53 @@ footer_checksum(FooterOpt, Config) ->
38563924 ok = end_session_sync (Session ),
38573925 ok = amqp10_client :close_connection (Connection ).
38583926
3927+ % % This test grants many credits to the queue once while
3928+ % % messages are being made available at the source over time.
3929+ receive_many_made_available_over_time (Config ) ->
3930+ QName = atom_to_binary (? FUNCTION_NAME ),
3931+ QType = <<" quorum" >>,
3932+ Address = <<" /queue/" , QName /binary >>,
3933+ {Connection , Session , LinkPair } = init (Config ),
3934+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QType }}},
3935+ {ok , #{type := QType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
3936+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" test-sender" >>, Address ),
3937+ wait_for_credit (Sender ),
3938+
3939+ % % Send first batch of messages.
3940+ ok = send_messages (Sender , 10 , false ),
3941+ ok = wait_for_accepts (10 ),
3942+ {ok , Receiver } = amqp10_client :attach_receiver_link (
3943+ Session , <<" receiver" >>, Address , settled ),
3944+ flush (attached ),
3945+ % % Grant many credits to the queue once.
3946+ ok = amqp10_client :flow_link_credit (Receiver , 5000 , never ),
3947+ % % We expect to receive the first batch of messages.
3948+ ? assertEqual (10 , count_received_messages (Receiver )),
3949+
3950+ % % Make next batch of messages available.
3951+ ok = send_messages (Sender , 2990 , false ),
3952+ ok = wait_for_accepts (2990 ),
3953+ % % We expect to receive this batch of messages.
3954+ ? assertEqual (2990 , count_received_messages (Receiver )),
3955+
3956+ % % Make next batch of messages available.
3957+ ok = send_messages (Sender , 1999 , false ),
3958+ ok = wait_for_accepts (1999 ),
3959+ % % We expect to receive this batch of messages.
3960+ ? assertEqual (1999 , count_received_messages (Receiver )),
3961+
3962+ % % Make next batch of messages available.
3963+ ok = send_messages (Sender , 2 , false ),
3964+ ok = wait_for_accepts (2 ),
3965+ % % At this point, we only have 2 messages in the queue, but only 1 credit left.
3966+ ? assertEqual (1 , count_received_messages (Receiver )),
3967+
3968+ ok = amqp10_client :detach_link (Sender ),
3969+ ok = amqp10_client :detach_link (Receiver ),
3970+ {ok , #{message_count := 1 }} = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
3971+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
3972+ ok = amqp10_client :close_connection (Connection ).
3973+
38593974% % This test ensures that the server sends us TRANSFER and FLOW frames in the correct order
38603975% % even if the server is temporarily not allowed to send us any TRANSFERs due to our session
38613976% % incoming-window being closed.
@@ -4126,7 +4241,7 @@ count_received_messages0(Receiver, Count) ->
41264241 receive
41274242 {amqp10_msg , Receiver , _Msg } ->
41284243 count_received_messages0 (Receiver , Count + 1 )
4129- after 200 ->
4244+ after 1000 ->
41304245 Count
41314246 end .
41324247
0 commit comments