22% % License, v. 2.0. If a copy of the MPL was not distributed with this
33% % file, You can obtain one at https://mozilla.org/MPL/2.0/.
44% %
5- % % Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6-
5+ % % Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
6+ % % The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries.
7+ % % All rights reserved.
78-module (rabbit_fifo ).
89
910-behaviour (ra_machine ).
@@ -339,9 +340,9 @@ apply_(#{index := Idx} = Meta,
339340 State0 = add_bytes_return (Header , State00 ),
340341 Con = Con0 # consumer {checked_out = maps :remove (MsgId , Checked0 ),
341342 credit = increase_credit (Con0 , 1 )},
342- State1 = State0 #? STATE {messages = rabbit_fifo_q :in (no ,
343- ? MSG (Idx , Header ),
344- Messages )},
343+ State1 = State0 #? STATE {messages = rabbit_fifo_pq :in (4 ,
344+ ? MSG (Idx , Header ),
345+ Messages )},
345346 State2 = update_or_remove_con (Meta , ConsumerKey , Con , State1 ),
346347 {State3 , Effects } = activate_next_consumer ({State2 , []}),
347348 checkout (Meta , State0 , State3 , Effects );
@@ -470,7 +471,7 @@ apply_(#{index := Idx} = Meta,
470471apply_ (#{index := Index }, # purge {},
471472 #? STATE {messages_total = Total } = State0 ) ->
472473 NumReady = messages_ready (State0 ),
473- State1 = State0 #? STATE {messages = rabbit_fifo_q :new (),
474+ State1 = State0 #? STATE {messages = rabbit_fifo_pq :new (),
474475 messages_total = Total - NumReady ,
475476 returns = lqueue :new (),
476477 msg_bytes_enqueue = 0
@@ -708,7 +709,7 @@ live_indexes(#?STATE{cfg = #cfg{},
708709 messages = Messages ,
709710 consumers = Consumers ,
710711 dlx = #? DLX {discards = Discards }}) ->
711- MsgsIdxs = rabbit_fifo_q :indexes (Messages ),
712+ MsgsIdxs = rabbit_fifo_pq :indexes (Messages ),
712713 DlxIndexes = lqueue :fold (fun (? TUPLE (_ , Msg ), Acc ) ->
713714 I = get_msg_idx (Msg ),
714715 [I | Acc ]
@@ -760,8 +761,17 @@ convert_v7_to_v8(#{} = _Meta, StateV7) ->
760761 (_CKey , C ) ->
761762 C
762763 end , Cons0 ),
764+ Msgs = element (#? STATE .messages , StateV7 ),
765+ {Hi , No } = rabbit_fifo_q :to_queues (Msgs ),
766+ Pq0 = queue :fold (fun (I , Acc ) ->
767+ rabbit_fifo_pq :in (9 , I , Acc )
768+ end , rabbit_fifo_pq :new (), Hi ),
769+ Pq = queue :fold (fun (I , Acc ) ->
770+ rabbit_fifo_pq :in (4 , I , Acc )
771+ end , Pq0 , No ),
763772 StateV8 = StateV7 ,
764773 StateV8 #? STATE {discarded_bytes = 0 ,
774+ messages = Pq ,
765775 consumers = Cons ,
766776 unused_0 = ? NIL }.
767777
@@ -935,8 +945,11 @@ overview(#?STATE{consumers = Cons,
935945 #{}
936946 end ,
937947 MsgsRet = lqueue :len (Returns ),
938- #{num_hi := MsgsHi ,
939- num_no := MsgsNo } = rabbit_fifo_q :overview (Messages ),
948+ % % TODO emit suitable overview metrics
949+ #{
950+ % num_hi := MsgsHi,
951+ % num_no := MsgsNo
952+ } = rabbit_fifo_pq :overview (Messages ),
940953
941954 Overview = #{type => ? STATE ,
942955 config => Conf ,
@@ -945,13 +958,14 @@ overview(#?STATE{consumers = Cons,
945958 num_checked_out => num_checked_out (State ),
946959 num_enqueuers => maps :size (Enqs ),
947960 num_ready_messages => messages_ready (State ),
948- num_ready_messages_high => MsgsHi ,
949- num_ready_messages_normal => MsgsNo ,
961+ % num_ready_messages_high => MsgsHi,
962+ % num_ready_messages_normal => MsgsNo,
950963 num_ready_messages_return => MsgsRet ,
951964 num_messages => messages_total (State ),
952965 enqueue_message_bytes => EnqueueBytes ,
953966 checkout_message_bytes => CheckoutBytes ,
954- discarded_bytes => DiscardedBytes
967+ discarded_bytes => DiscardedBytes ,
968+ smallest_raft_index => smallest_raft_index (State )
955969 },
956970 DlxOverview = dlx_overview (DlxState ),
957971 maps :merge (maps :merge (Overview , DlxOverview ), SacOverview ).
@@ -1428,7 +1442,7 @@ usage(Name) when is_atom(Name) ->
14281442
14291443messages_ready (#? STATE {messages = M ,
14301444 returns = R }) ->
1431- rabbit_fifo_q :len (M ) + lqueue :len (R ).
1445+ rabbit_fifo_pq :len (M ) + lqueue :len (R ).
14321446
14331447messages_total (#? STATE {messages_total = Total ,
14341448 dlx = DlxState }) ->
@@ -1738,7 +1752,7 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
17381752 PTag = priority_tag (RawMsg ),
17391753 State = State0 #? STATE {msg_bytes_enqueue = Enqueue + Size ,
17401754 messages_total = Total + 1 ,
1741- messages = rabbit_fifo_q :in (PTag , Msg , Messages )
1755+ messages = rabbit_fifo_pq :in (PTag , Msg , Messages )
17421756 },
17431757 {ok , State , Effects };
17441758maybe_enqueue (RaftIdx , Ts , From , MsgSeqNo , RawMsg ,
@@ -1771,7 +1785,7 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
17711785 PTag = priority_tag (RawMsg ),
17721786 State = State0 #? STATE {msg_bytes_enqueue = BytesEnqueued + Size ,
17731787 messages_total = Total + 1 ,
1774- messages = rabbit_fifo_q :in (PTag , Msg , Messages ),
1788+ messages = rabbit_fifo_pq :in (PTag , Msg , Messages ),
17751789 enqueuers = Enqueuers0 #{From => Enq },
17761790 msg_cache = MsgCache
17771791 },
@@ -2154,7 +2168,7 @@ take_next_msg(#?STATE{returns = Returns0,
21542168 {{value , NextMsg }, Returns } ->
21552169 {NextMsg , State #? STATE {returns = Returns }};
21562170 {empty , _ } ->
2157- case rabbit_fifo_q :out (Messages0 ) of
2171+ case rabbit_fifo_pq :out (Messages0 ) of
21582172 empty ->
21592173 empty ;
21602174 {Msg , Messages } ->
@@ -2166,7 +2180,7 @@ get_next_msg(#?STATE{returns = Returns0,
21662180 messages = Messages0 }) ->
21672181 case lqueue :get (Returns0 , empty ) of
21682182 empty ->
2169- rabbit_fifo_q :get (Messages0 );
2183+ rabbit_fifo_pq :get (Messages0 );
21702184 Msg ->
21712185 Msg
21722186 end .
@@ -2279,7 +2293,7 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
22792293 checkout_one (Meta , ExpiredMsg ,
22802294 InitState #? STATE {service_queue = SQ1 }, Effects1 );
22812295 {empty , _ } ->
2282- case rabbit_fifo_q :len (Messages0 ) of
2296+ case rabbit_fifo_pq :len (Messages0 ) of
22832297 0 ->
22842298 {nochange , ExpiredMsg , InitState , Effects1 };
22852299 _ ->
@@ -2865,7 +2879,7 @@ smallest_raft_index(#?STATE{messages = Messages,
28652879 SmallestDlxRaIdx = lqueue :fold (fun (? TUPLE (_ , Msg ), Acc ) ->
28662880 min (get_msg_idx (Msg ), Acc )
28672881 end , undefined , Discards ),
2868- SmallestMsgsRaIdx = rabbit_fifo_q :get_lowest_index (Messages ),
2882+ SmallestMsgsRaIdx = rabbit_fifo_pq :get_lowest_index (Messages ),
28692883 % % scan consumers and returns queue here instead
28702884 smallest_checked_out (State , min (SmallestDlxRaIdx , SmallestMsgsRaIdx )).
28712885
@@ -3023,14 +3037,13 @@ priority_tag(Msg) ->
30233037 case mc :is (Msg ) of
30243038 true ->
30253039 case mc :priority (Msg ) of
3026- P when is_integer (P ) andalso
3027- P > 4 ->
3028- hi ;
3040+ P when is_integer (P ) ->
3041+ min (P , 31 );
30293042 _ ->
3030- no
3043+ 4
30313044 end ;
30323045 false ->
3033- no
3046+ 4
30343047 end .
30353048
30363049do_snapshot (MacVer , Ts , Ch ,
@@ -3256,29 +3269,6 @@ dlx_apply(_, Cmd, DLH, State) ->
32563269% % down: 90 bytes
32573270% % enqueue overhead 210
32583271
3259- % messages_get_next_msg(#messages{returns = Returns0,
3260- % messages = Messages0}) ->
3261- % case lqueue:get(Returns0, empty) of
3262- % empty ->
3263- % rabbit_fifo_q:get(Messages0);
3264- % Msg ->
3265- % Msg
3266- % end.
3267-
3268- % messages_take_next_msg(#messages{returns = Returns0,
3269- % messages = Messages0} = Msgs) ->
3270- % case lqueue:out(Returns0) of
3271- % {{value, NextMsg}, Returns} ->
3272- % {NextMsg, Msgs#messages{returns = Returns}};
3273- % {empty, _} ->
3274- % case rabbit_fifo_q:out(Messages0) of
3275- % empty ->
3276- % empty;
3277- % {?MSG(_RaftIdx, _) = Msg, Messages} ->
3278- % {Msg, Msgs#messages{messages = Messages}}
3279- % end
3280- % end.
3281-
32823272ensure_worker_started (QRef , #? DLX {consumer = undefined }) ->
32833273 start_worker (QRef );
32843274ensure_worker_started (QRef , #? DLX {consumer = # dlx_consumer {pid = Pid }}) ->
0 commit comments