2626 set_ram_duration_target /2 , ram_duration /1 , needs_timeout /1 , timeout /1 ,
2727 handle_pre_hibernate /1 , resume /1 , msg_rates /1 ,
2828 info /2 , invoke /3 , is_duplicate /2 , set_queue_mode /2 ,
29- zip_msgs_and_acks /4 , multiple_routing_keys /0 ]).
29+ zip_msgs_and_acks /4 , multiple_routing_keys /0 , handle_info / 2 ]).
3030
3131-export ([start /2 , stop /1 ]).
3232
325325 memory_reduction_run_count ,
326326 % % Queue data is grouped by VHost. We need to store it
327327 % % to work with queue index.
328- virtual_host
328+ virtual_host ,
329+ waiting_bump = false
329330 }).
330331
331332-record (rates , { in , out , ack_in , ack_out , timestamp }).
@@ -911,6 +912,9 @@ timeout(State = #vqstate { index_state = IndexState }) ->
911912handle_pre_hibernate (State = # vqstate { index_state = IndexState }) ->
912913 State # vqstate { index_state = rabbit_queue_index :flush (IndexState ) }.
913914
915+ handle_info (bump_reduce_memory_use , State = # vqstate { waiting_bump = true }) ->
916+ State # vqstate { waiting_bump = false }.
917+
914918resume (State ) -> a (reduce_memory_use (State )).
915919
916920msg_rates (# vqstate { rates = # rates { in = AvgIngressRate ,
@@ -2466,21 +2470,16 @@ reduce_memory_use(State = #vqstate {
24662470 Blocked = credit_flow :blocked (),
24672471 case {Blocked , NeedResumeA2B orelse NeedResumeB2D } of
24682472 % % Credit bump will continue paging
2469- {true , _ } -> ok ;
2473+ {true , _ } -> State3 ;
24702474 % % Finished with paging
2471- {false , false } -> ok ;
2475+ {false , false } -> State3 ;
24722476 % % Planning next batch
24732477 {false , true } ->
24742478 % % We don't want to use self-credit-flow, because it's harder to
24752479 % % reason about. So the process sends a (prioritised) message to
24762480 % % itself and sets a waiting_bump value to keep the message box clean
2477- case get (waiting_bump ) of
2478- true -> ok ;
2479- _ -> self () ! bump_reduce_memory_use ,
2480- put (waiting_bump , true )
2481- end
2482- end ,
2483- State3 ;
2481+ maybe_bump_reduce_memory_use (State3 )
2482+ end ;
24842483% % When using lazy queues, there are no alphas, so we don't need to
24852484% % call push_alphas_to_betas/2.
24862485reduce_memory_use (State = # vqstate {
@@ -2506,6 +2505,12 @@ reduce_memory_use(State = #vqstate {
25062505 garbage_collect (),
25072506 State3 .
25082507
2508+ maybe_bump_reduce_memory_use (State = # vqstate { waiting_bump = true }) ->
2509+ State ;
2510+ maybe_bump_reduce_memory_use (State ) ->
2511+ self () ! bump_reduce_memory_use ,
2512+ State # vqstate { waiting_bump = true }.
2513+
25092514limit_ram_acks (0 , State ) ->
25102515 {0 , ui (State )};
25112516limit_ram_acks (Quota , State = # vqstate { ram_pending_ack = RPA ,
0 commit comments