3636-export ([init /1 , terminate /2 , code_change /3 , handle_call /3 , handle_cast /2 ,
3737 handle_info /2 ]).
3838-export ([start_link /1 , shutdown /1 ]).
39- -export ([limit /2 , can_send /2 , ack /2 , register /2 , unregister /2 ]).
39+ -export ([limit /2 , can_send /3 , ack /2 , register /2 , unregister /2 ]).
4040
4141% %----------------------------------------------------------------------------
4242
4747-spec (start_link / 1 :: (pid ()) -> pid ()).
4848-spec (shutdown / 1 :: (maybe_pid ()) -> 'ok' ).
4949-spec (limit / 2 :: (maybe_pid (), non_neg_integer ()) -> 'ok' ).
50- -spec (can_send / 2 :: (maybe_pid (), pid ()) -> bool ()).
50+ -spec (can_send / 3 :: (maybe_pid (), pid (), bool ()) -> bool ()).
5151-spec (ack / 2 :: (maybe_pid (), non_neg_integer ()) -> 'ok' ).
5252-spec (register / 2 :: (maybe_pid (), pid ()) -> 'ok' ).
5353-spec (unregister / 2 :: (maybe_pid (), pid ()) -> 'ok' ).
@@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) ->
8585
8686% % Ask the limiter whether the queue can deliver a message without
8787% % breaching a limit
88- can_send (undefined , _QPid ) ->
88+ can_send (undefined , _QPid , _AckRequired ) ->
8989 true ;
90- can_send (LimiterPid , QPid ) ->
90+ can_send (LimiterPid , QPid , AckRequired ) ->
9191 rabbit_misc :with_exit_handler (
9292 fun () -> true end ,
93- fun () -> gen_server2 :call (LimiterPid , {can_send , QPid }, infinity ) end ).
93+ fun () -> gen_server2 :call (LimiterPid , {can_send , QPid , AckRequired },
94+ infinity ) end ).
9495
9596% % Let the limiter know that the channel has received some acks from a
9697% % consumer
@@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
110111init ([ChPid ]) ->
111112 {ok , # lim {ch_pid = ChPid } }.
112113
113- handle_call ({can_send , QPid }, _From , State = # lim {volume = Volume }) ->
114+ handle_call ({can_send , QPid , AckRequired }, _From ,
115+ State = # lim {volume = Volume }) ->
114116 case limit_reached (State ) of
115117 true -> {reply , false , limit_queue (QPid , State )};
116- false -> {reply , true , State # lim {volume = Volume + 1 }}
118+ false -> {reply , true , State # lim {volume = if AckRequired -> Volume + 1 ;
119+ true -> Volume
120+ end }}
117121 end .
118122
119123handle_cast (shutdown , State ) ->
0 commit comments