2222
2323-define (SYNC_INTERVAL , 200 ). % % milliseconds
2424-define (RAM_DURATION_UPDATE_INTERVAL , 5000 ).
25+ -define (CONSUMER_BIAS_RATIO , 2.0 ). % % i.e. consume 100% faster
2526
2627-export ([info_keys /0 ]).
2728
@@ -968,26 +969,26 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
968969
969970% %----------------------------------------------------------------------------
970971
971- prioritise_call (Msg , _From , _Len , _State ) ->
972+ prioritise_call (Msg , _From , _Len , State ) ->
972973 case Msg of
973974 info -> 9 ;
974975 {info , _Items } -> 9 ;
975976 consumers -> 9 ;
976977 stat -> 7 ;
977- {basic_consume , _ , _ , _ , _ , _ , _ , _ , _ , _ } -> 1 ;
978- {basic_cancel , _ , _ , _ } -> 1 ;
978+ {basic_consume , _ , _ , _ , _ , _ , _ , _ , _ , _ } -> consumer_bias ( State , 0 , 2 ) ;
979+ {basic_cancel , _ , _ , _ } -> consumer_bias ( State , 0 , 2 ) ;
979980 _ -> 0
980981 end .
981982
982- prioritise_cast (Msg , _Len , _State ) ->
983+ prioritise_cast (Msg , _Len , State ) ->
983984 case Msg of
984985 delete_immediately -> 8 ;
985986 {set_ram_duration_target , _Duration } -> 8 ;
986987 {set_maximum_since_use , _Age } -> 8 ;
987988 {run_backing_queue , _Mod , _Fun } -> 6 ;
988989 {ack , _AckTags , _ChPid } -> 4 ; % % [1]
989990 {resume , _ChPid } -> 3 ;
990- {notify_sent , _ChPid , _Credit } -> 2 ;
991+ {notify_sent , _ChPid , _Credit } -> consumer_bias ( State , 0 , 2 ) ;
991992 _ -> 0
992993 end .
993994
@@ -1003,6 +1004,13 @@ prioritise_cast(Msg, _Len, _State) ->
10031004% % credit to self is hard to reason about. Consumers can continue while
10041005% % reduce_memory_use is in progress.
10051006
1007+ consumer_bias (# q {backing_queue = BQ , backing_queue_state = BQS }, Low , High ) ->
1008+ case BQ :msg_rates (BQS ) of
1009+ {0.0 , _ } -> Low ;
1010+ {Ingress , Egress } when Egress / Ingress < ? CONSUMER_BIAS_RATIO -> High ;
1011+ {_ , _ } -> Low
1012+ end .
1013+
10061014prioritise_info (Msg , _Len , # q {q = # amqqueue {exclusive_owner = DownPid }}) ->
10071015 case Msg of
10081016 {'DOWN' , _ , process , DownPid , _ } -> 8 ;
0 commit comments