Skip to content

Commit dde69ca

Browse files
author
Daniil Fedotov
committed
Revert "Remove consumer bias & allow queues under max load to drain quickly"
This reverts commit 155eb6b.
1 parent c06a1f1 commit dde69ca

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
-define(SYNC_INTERVAL, 200). %% milliseconds
2424
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
25+
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
2526

2627
-export([info_keys/0]).
2728

@@ -968,26 +969,26 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
968969

969970
%%----------------------------------------------------------------------------
970971

971-
prioritise_call(Msg, _From, _Len, _State) ->
972+
prioritise_call(Msg, _From, _Len, State) ->
972973
case Msg of
973974
info -> 9;
974975
{info, _Items} -> 9;
975976
consumers -> 9;
976977
stat -> 7;
977-
{basic_consume, _, _, _, _, _, _, _, _, _} -> 1;
978-
{basic_cancel, _, _, _} -> 1;
978+
{basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2);
979+
{basic_cancel, _, _, _} -> consumer_bias(State, 0, 2);
979980
_ -> 0
980981
end.
981982

982-
prioritise_cast(Msg, _Len, _State) ->
983+
prioritise_cast(Msg, _Len, State) ->
983984
case Msg of
984985
delete_immediately -> 8;
985986
{set_ram_duration_target, _Duration} -> 8;
986987
{set_maximum_since_use, _Age} -> 8;
987988
{run_backing_queue, _Mod, _Fun} -> 6;
988989
{ack, _AckTags, _ChPid} -> 4; %% [1]
989990
{resume, _ChPid} -> 3;
990-
{notify_sent, _ChPid, _Credit} -> 2;
991+
{notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
991992
_ -> 0
992993
end.
993994

@@ -1003,6 +1004,13 @@ prioritise_cast(Msg, _Len, _State) ->
10031004
%% credit to self is hard to reason about. Consumers can continue while
10041005
%% reduce_memory_use is in progress.
10051006

1007+
consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
1008+
case BQ:msg_rates(BQS) of
1009+
{0.0, _} -> Low;
1010+
{Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High;
1011+
{_, _} -> Low
1012+
end.
1013+
10061014
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
10071015
case Msg of
10081016
{'DOWN', _, process, DownPid, _} -> 8;

0 commit comments

Comments
 (0)