@@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate {
23632363 out = AvgEgress ,
23642364 ack_in = AvgAckIngress ,
23652365 ack_out = AvgAckEgress } }) ->
2366- State1 = # vqstate { q2 = Q2 , q3 = Q3 } =
2366+ {CreditDiscBound , _ } = rabbit_misc :get_env (rabbit ,
2367+ msg_store_credit_disc_bound ,
2368+ ? CREDIT_DISC_BOUND ),
2369+ {NeedResumeA2B , State1 } = {_ , # vqstate { q2 = Q2 , q3 = Q3 }} =
23672370 case chunk_size (RamMsgCount + gb_trees :size (RPA ), TargetRamCount ) of
2368- 0 -> State ;
2371+ 0 -> { false , State } ;
23692372 % % Reduce memory of pending acks and alphas. The order is
23702373 % % determined based on which is growing faster. Whichever
23712374 % % comes second may very well get a quota of 0 if the
23722375 % % first manages to push out the max number of messages.
2373- S1 -> Funs = case ((AvgAckIngress - AvgAckEgress ) >
2376+ A2BChunk ->
2377+ % % In case there are few messages to be sent to a message store
2378+ % % and many messages to be embedded to the queue index,
2379+ % % we should limit the number of messages to be flushed
2380+ % % to avoid blocking the process.
2381+ A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
2382+ true -> CreditDiscBound * 2 ;
2383+ false -> A2BChunk
2384+ end ,
2385+ Funs = case ((AvgAckIngress - AvgAckEgress ) >
23742386 (AvgIngress - AvgEgress )) of
23752387 true -> [fun limit_ram_acks /2 ,
23762388 fun push_alphas_to_betas /2 ];
23772389 false -> [fun push_alphas_to_betas /2 ,
23782390 fun limit_ram_acks /2 ]
23792391 end ,
2380- { _ , State2 } = lists :foldl (fun (ReduceFun , {QuotaN , StateN }) ->
2392+ { Quota , State2 } = lists :foldl (fun (ReduceFun , {QuotaN , StateN }) ->
23812393 ReduceFun (QuotaN , StateN )
2382- end , {S1 , State }, Funs ),
2383- State2
2394+ end , {A2BChunkActual , State }, Funs ),
2395+ {( Quota == 0 ) andalso ( A2BChunk > A2BChunkActual ), State2 }
23842396 end ,
2385-
2386- State3 =
2397+ Permitted = permitted_beta_count ( State1 ),
2398+ { NeedResumeB2D , State3 } =
23872399 % % If there are more messages with their queue position held in RAM,
23882400 % % a.k.a. betas, in Q2 & Q3 than IoBatchSize,
23892401 % % write their queue position to disk, a.k.a. push_betas_to_deltas
23902402 case chunk_size (? QUEUE :len (Q2 ) + ? QUEUE :len (Q3 ),
2391- permitted_beta_count (State1 )) of
2392- S2 when S2 >= IoBatchSize ->
2393- % % There is an implicit, but subtle, upper bound here. We
2394- % % may shuffle a lot of messages from Q2/3 into delta, but
2395- % % the number of these that require any disk operation,
2396- % % namely index writing, i.e. messages that are genuine
2397- % % betas and not gammas, is bounded by the credit_flow
2398- % % limiting of the alpha->beta conversion above.
2399- push_betas_to_deltas (S2 , State1 );
2403+ Permitted ) of
2404+ B2DChunk when B2DChunk >= IoBatchSize ->
2405+ % % Same as for alphas to betas. Limit the number of messages
2406+ % % to be flushed to disk at once to avoid blocking the process.
2407+ B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
2408+ true -> CreditDiscBound * 2 ;
2409+ false -> B2DChunk
2410+ end ,
2411+ StateBD = push_betas_to_deltas (B2DChunkActual , State1 ),
2412+ {B2DChunk > B2DChunkActual , StateBD };
24002413 _ ->
2401- State1
2414+ { false , State1 }
24022415 end ,
2403- % % See rabbitmq-server-290 for the reasons behind this GC call.
2404- garbage_collect (),
2416+ % % We can be blocked by the credit flow, or limited by a batch size,
2417+ % % or finished with flushing.
2418+ % % If blocked by the credit flow - the credit grant will resume processing,
2419+ % % if limited by a batch - the batch continuation message should be sent.
2420+ % % The continuation message will be prioritised over publishes,
2421+ % % but not consumptions, so the queue can make progress.
2422+ Blocked = credit_flow :blocked (),
2423+ case {Blocked , NeedResumeA2B orelse NeedResumeB2D } of
2424+ % % Credit bump will continue paging
2425+ {true , _ } -> ok ;
2426+ % % Finished with paging
2427+ {false , false } -> ok ;
2428+ % % Planning next batch
2429+ {false , true } ->
2430+ % % We don't want to use self-credit-flow, because it's harder to
2431+ % % reason about. So the process sends a (prioritised) message to
2432+ % % itself and sets a waiting_bump value to keep the message box clean
2433+ case get (waiting_bump ) of
2434+ true -> ok ;
2435+ _ -> self () ! bump_reduce_memory_use ,
2436+ put (waiting_bump , true )
2437+ end
2438+ end ,
24052439 State3 ;
24062440% % When using lazy queues, there are no alphas, so we don't need to
24072441% % call push_alphas_to_betas/2.
0 commit comments