Skip to content

Commit 18b590b

Browse files
committed
QQ strict priority queue
1 parent 4a63afc commit 18b590b

File tree

8 files changed

+424
-92
lines changed

8 files changed

+424
-92
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6-
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
6+
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries.
7+
%% All rights reserved.
78
-module(rabbit_fifo).
89

910
-behaviour(ra_machine).
@@ -339,9 +340,9 @@ apply_(#{index := Idx} = Meta,
339340
State0 = add_bytes_return(Header, State00),
340341
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
341342
credit = increase_credit(Con0, 1)},
342-
State1 = State0#?STATE{messages = rabbit_fifo_q:in(no,
343-
?MSG(Idx, Header),
344-
Messages)},
343+
State1 = State0#?STATE{messages = rabbit_fifo_pq:in(4,
344+
?MSG(Idx, Header),
345+
Messages)},
345346
State2 = update_or_remove_con(Meta, ConsumerKey, Con, State1),
346347
{State3, Effects} = activate_next_consumer({State2, []}),
347348
checkout(Meta, State0, State3, Effects);
@@ -470,7 +471,7 @@ apply_(#{index := Idx} = Meta,
470471
apply_(#{index := Index}, #purge{},
471472
#?STATE{messages_total = Total} = State0) ->
472473
NumReady = messages_ready(State0),
473-
State1 = State0#?STATE{messages = rabbit_fifo_q:new(),
474+
State1 = State0#?STATE{messages = rabbit_fifo_pq:new(),
474475
messages_total = Total - NumReady,
475476
returns = lqueue:new(),
476477
msg_bytes_enqueue = 0
@@ -708,7 +709,7 @@ live_indexes(#?STATE{cfg = #cfg{},
708709
messages = Messages,
709710
consumers = Consumers,
710711
dlx = #?DLX{discards = Discards}}) ->
711-
MsgsIdxs = rabbit_fifo_q:indexes(Messages),
712+
MsgsIdxs = rabbit_fifo_pq:indexes(Messages),
712713
DlxIndexes = lqueue:fold(fun (?TUPLE(_, Msg), Acc) ->
713714
I = get_msg_idx(Msg),
714715
[I | Acc]
@@ -760,8 +761,17 @@ convert_v7_to_v8(#{} = _Meta, StateV7) ->
760761
(_CKey, C) ->
761762
C
762763
end, Cons0),
764+
Msgs = element(#?STATE.messages, StateV7),
765+
{Hi, No} = rabbit_fifo_q:to_queues(Msgs),
766+
Pq0 = queue:fold(fun (I, Acc) ->
767+
rabbit_fifo_pq:in(9, I, Acc)
768+
end, rabbit_fifo_pq:new(), Hi),
769+
Pq = queue:fold(fun (I, Acc) ->
770+
rabbit_fifo_pq:in(4, I, Acc)
771+
end, Pq0, No),
763772
StateV8 = StateV7,
764773
StateV8#?STATE{discarded_bytes = 0,
774+
messages = Pq,
765775
consumers = Cons,
766776
unused_0 = ?NIL}.
767777

@@ -935,8 +945,11 @@ overview(#?STATE{consumers = Cons,
935945
#{}
936946
end,
937947
MsgsRet = lqueue:len(Returns),
938-
#{num_hi := MsgsHi,
939-
num_no := MsgsNo} = rabbit_fifo_q:overview(Messages),
948+
%% TODO emit suitable overview metrics
949+
#{
950+
% num_hi := MsgsHi,
951+
% num_no := MsgsNo
952+
} = rabbit_fifo_pq:overview(Messages),
940953

941954
Overview = #{type => ?STATE,
942955
config => Conf,
@@ -945,13 +958,14 @@ overview(#?STATE{consumers = Cons,
945958
num_checked_out => num_checked_out(State),
946959
num_enqueuers => maps:size(Enqs),
947960
num_ready_messages => messages_ready(State),
948-
num_ready_messages_high => MsgsHi,
949-
num_ready_messages_normal => MsgsNo,
961+
% num_ready_messages_high => MsgsHi,
962+
% num_ready_messages_normal => MsgsNo,
950963
num_ready_messages_return => MsgsRet,
951964
num_messages => messages_total(State),
952965
enqueue_message_bytes => EnqueueBytes,
953966
checkout_message_bytes => CheckoutBytes,
954-
discarded_bytes => DiscardedBytes
967+
discarded_bytes => DiscardedBytes,
968+
smallest_raft_index => smallest_raft_index(State)
955969
},
956970
DlxOverview = dlx_overview(DlxState),
957971
maps:merge(maps:merge(Overview, DlxOverview), SacOverview).
@@ -1428,7 +1442,7 @@ usage(Name) when is_atom(Name) ->
14281442

14291443
messages_ready(#?STATE{messages = M,
14301444
returns = R}) ->
1431-
rabbit_fifo_q:len(M) + lqueue:len(R).
1445+
rabbit_fifo_pq:len(M) + lqueue:len(R).
14321446

14331447
messages_total(#?STATE{messages_total = Total,
14341448
dlx = DlxState}) ->
@@ -1738,7 +1752,7 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
17381752
PTag = priority_tag(RawMsg),
17391753
State = State0#?STATE{msg_bytes_enqueue = Enqueue + Size,
17401754
messages_total = Total + 1,
1741-
messages = rabbit_fifo_q:in(PTag, Msg, Messages)
1755+
messages = rabbit_fifo_pq:in(PTag, Msg, Messages)
17421756
},
17431757
{ok, State, Effects};
17441758
maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
@@ -1771,7 +1785,7 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
17711785
PTag = priority_tag(RawMsg),
17721786
State = State0#?STATE{msg_bytes_enqueue = BytesEnqueued + Size,
17731787
messages_total = Total + 1,
1774-
messages = rabbit_fifo_q:in(PTag, Msg, Messages),
1788+
messages = rabbit_fifo_pq:in(PTag, Msg, Messages),
17751789
enqueuers = Enqueuers0#{From => Enq},
17761790
msg_cache = MsgCache
17771791
},
@@ -2154,7 +2168,7 @@ take_next_msg(#?STATE{returns = Returns0,
21542168
{{value, NextMsg}, Returns} ->
21552169
{NextMsg, State#?STATE{returns = Returns}};
21562170
{empty, _} ->
2157-
case rabbit_fifo_q:out(Messages0) of
2171+
case rabbit_fifo_pq:out(Messages0) of
21582172
empty ->
21592173
empty;
21602174
{Msg, Messages} ->
@@ -2166,7 +2180,7 @@ get_next_msg(#?STATE{returns = Returns0,
21662180
messages = Messages0}) ->
21672181
case lqueue:get(Returns0, empty) of
21682182
empty ->
2169-
rabbit_fifo_q:get(Messages0);
2183+
rabbit_fifo_pq:get(Messages0);
21702184
Msg ->
21712185
Msg
21722186
end.
@@ -2279,7 +2293,7 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
22792293
checkout_one(Meta, ExpiredMsg,
22802294
InitState#?STATE{service_queue = SQ1}, Effects1);
22812295
{empty, _} ->
2282-
case rabbit_fifo_q:len(Messages0) of
2296+
case rabbit_fifo_pq:len(Messages0) of
22832297
0 ->
22842298
{nochange, ExpiredMsg, InitState, Effects1};
22852299
_ ->
@@ -2865,7 +2879,7 @@ smallest_raft_index(#?STATE{messages = Messages,
28652879
SmallestDlxRaIdx = lqueue:fold(fun (?TUPLE(_, Msg), Acc) ->
28662880
min(get_msg_idx(Msg), Acc)
28672881
end, undefined, Discards),
2868-
SmallestMsgsRaIdx = rabbit_fifo_q:get_lowest_index(Messages),
2882+
SmallestMsgsRaIdx = rabbit_fifo_pq:get_lowest_index(Messages),
28692883
%% scan consumers and returns queue here instead
28702884
smallest_checked_out(State, min(SmallestDlxRaIdx, SmallestMsgsRaIdx)).
28712885

@@ -3023,14 +3037,13 @@ priority_tag(Msg) ->
30233037
case mc:is(Msg) of
30243038
true ->
30253039
case mc:priority(Msg) of
3026-
P when is_integer(P) andalso
3027-
P > 4 ->
3028-
hi;
3040+
P when is_integer(P) ->
3041+
min(P, 31);
30293042
_ ->
3030-
no
3043+
4
30313044
end;
30323045
false ->
3033-
no
3046+
4
30343047
end.
30353048

30363049
do_snapshot(MacVer, Ts, Ch,
@@ -3256,29 +3269,6 @@ dlx_apply(_, Cmd, DLH, State) ->
32563269
%% down: 90 bytes
32573270
%% enqueue overhead 210
32583271

3259-
% messages_get_next_msg(#messages{returns = Returns0,
3260-
% messages = Messages0}) ->
3261-
% case lqueue:get(Returns0, empty) of
3262-
% empty ->
3263-
% rabbit_fifo_q:get(Messages0);
3264-
% Msg ->
3265-
% Msg
3266-
% end.
3267-
3268-
% messages_take_next_msg(#messages{returns = Returns0,
3269-
% messages = Messages0} = Msgs) ->
3270-
% case lqueue:out(Returns0) of
3271-
% {{value, NextMsg}, Returns} ->
3272-
% {NextMsg, Msgs#messages{returns = Returns}};
3273-
% {empty, _} ->
3274-
% case rabbit_fifo_q:out(Messages0) of
3275-
% empty ->
3276-
% empty;
3277-
% {?MSG(_RaftIdx, _) = Msg, Messages} ->
3278-
% {Msg, Msgs#messages{messages = Messages}}
3279-
% end
3280-
% end.
3281-
32823272
ensure_worker_started(QRef, #?DLX{consumer = undefined}) ->
32833273
start_worker(QRef);
32843274
ensure_worker_started(QRef, #?DLX{consumer = #dlx_consumer{pid = Pid}}) ->

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@
207207

208208
-record(messages,
209209
{
210-
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
210+
messages = rabbit_fifo_pq:new() :: rabbit_fifo_pq:state(),
211211
messages_total = 0 :: non_neg_integer(),
212212
% queue of returned msg_in_ids - when checking out it picks from
213213
returns = lqueue:new() :: lqueue:lqueue(term())
@@ -233,7 +233,7 @@
233233
-record(rabbit_fifo,
234234
{cfg :: #cfg{},
235235
% unassigned messages
236-
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
236+
messages = rabbit_fifo_pq:new() :: rabbit_fifo_pq:state(),
237237
messages_total = 0 :: non_neg_integer(),
238238
% queue of returned msg_in_ids - when checking out it picks from
239239
returns = lqueue:new() :: lqueue:lqueue(term()),

0 commit comments

Comments
 (0)