3636-export ([init /1 , terminate /2 , code_change /3 , handle_call /3 , handle_cast /2 ,
3737 handle_info /2 ]).
3838-export ([start_link /1 , shutdown /1 ]).
39- -export ([limit /2 , can_send /2 , ack /2 , register /2 , unregister /2 ]).
39+ -export ([limit /2 , can_send /3 , ack /2 , register /2 , unregister /2 ]).
4040
4141% %----------------------------------------------------------------------------
4242
4747-spec (start_link / 1 :: (pid ()) -> pid ()).
4848-spec (shutdown / 1 :: (maybe_pid ()) -> 'ok' ).
4949-spec (limit / 2 :: (maybe_pid (), non_neg_integer ()) -> 'ok' ).
50- -spec (can_send / 2 :: (maybe_pid (), pid ()) -> bool ()).
50+ -spec (can_send / 3 :: (maybe_pid (), pid (), bool ()) -> bool ()).
5151-spec (ack / 2 :: (maybe_pid (), non_neg_integer ()) -> 'ok' ).
5252-spec (register / 2 :: (maybe_pid (), pid ()) -> 'ok' ).
5353-spec (unregister / 2 :: (maybe_pid (), pid ()) -> 'ok' ).
@@ -85,12 +85,12 @@ limit(LimiterPid, PrefetchCount) ->
8585
8686% % Ask the limiter whether the queue can deliver a message without
8787% % breaching a limit
88- can_send (undefined , _QPid ) ->
88+ can_send (undefined , _QPid , _AckRequired ) ->
8989 true ;
90- can_send (LimiterPid , QPid ) ->
90+ can_send (LimiterPid , QPid , AckRequired ) ->
9191 rabbit_misc :with_exit_handler (
9292 fun () -> true end ,
93- fun () -> gen_server2 :call (LimiterPid , {can_send , QPid }, infinity ) end ).
93+ fun () -> gen_server2 :call (LimiterPid , {can_send , QPid , AckRequired }, infinity ) end ).
9494
9595% % Let the limiter know that the channel has received some acks from a
9696% % consumer
@@ -110,10 +110,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
110110init ([ChPid ]) ->
111111 {ok , # lim {ch_pid = ChPid } }.
112112
113- handle_call ({can_send , QPid }, _From , State = # lim {volume = Volume }) ->
113+ handle_call ({can_send , QPid , AckRequired }, _From , State = # lim {volume = Volume }) ->
114+ Volume1 = if AckRequired -> Volume + 1 ;
115+ true -> Volume
116+ end ,
114117 case limit_reached (State ) of
115118 true -> {reply , false , limit_queue (QPid , State )};
116- false -> {reply , true , State # lim {volume = Volume + 1 }}
119+ false -> {reply , true , State # lim {volume = Volume1 }}
117120 end .
118121
119122handle_cast (shutdown , State ) ->
0 commit comments