@@ -200,7 +200,8 @@ init(#{name := Name,
200200 resource = Resource }}),
201201 case State #? STATE .cfg # cfg .filter_enabled of
202202 true ->
203- State #? STATE {messages = rabbit_fifo_filter_q :new ()};
203+ State #? STATE {messages = rabbit_fifo_filter_q :new (),
204+ returns = gb_trees :empty ()};
204205 false ->
205206 State
206207 end .
@@ -506,7 +507,7 @@ apply(#{index := Idx} = Meta,
506507apply (#{index := Index }, # purge {},
507508 #? STATE {cfg = # cfg {filter_enabled = FilterEnabled },
508509 messages_total = Total ,
509- returns = Returns ,
510+ returns = Returns0 ,
510511 ra_indexes = Indexes0
511512 } = State0 ) ->
512513 NumReady = messages_ready (State0 ),
@@ -517,7 +518,7 @@ apply(#{index := Index}, #purge{},
517518 % % No message is awaiting acknowledgement.
518519 % % Optimization: empty all 'ra_indexes'.
519520 rabbit_fifo_index :empty ();
520- _ ->
521+ _ when not FilterEnabled ->
521522 % % Some messages are checked out to consumers
522523 % % awaiting acknowledgement.
523524 % % Therefore we cannot empty all 'ra_indexes'.
@@ -526,16 +527,20 @@ apply(#{index := Index}, #purge{},
526527 % % not part of the 'ra_indexes'.
527528 lqueue :fold (fun (? MSG (I , _ ), Acc ) ->
528529 rabbit_fifo_index :delete (I , Acc )
529- end , Indexes0 , Returns )
530+ end , Indexes0 , Returns0 );
531+ _ ->
532+ Indexes0
530533 end ,
531- Messages = case FilterEnabled of
532- true -> rabbit_fifo_filter_q :new ();
533- false -> rabbit_fifo_q :new ()
534- end ,
534+ {Messages , Returns } = case FilterEnabled of
535+ true ->
536+ {rabbit_fifo_filter_q :new (), gb_trees :empty ()};
537+ false ->
538+ {rabbit_fifo_q :new (), lqueue :new ()}
539+ end ,
535540 State1 = State0 #? STATE {ra_indexes = Indexes ,
536541 messages = Messages ,
537542 messages_total = Total - NumReady ,
538- returns = lqueue : new () ,
543+ returns = Returns ,
539544 msg_bytes_enqueue = 0
540545 },
541546 Effects0 = [{aux , force_checkpoint }, garbage_collection ],
@@ -890,7 +895,7 @@ overview(#?STATE{consumers = Cons,
890895 enqueue_count = EnqCount ,
891896 msg_bytes_enqueue = EnqueueBytes ,
892897 msg_bytes_checkout = CheckoutBytes ,
893- cfg = Cfg ,
898+ cfg = # cfg { filter_enabled = FilterEnabled } = Cfg ,
894899 dlx = DlxState ,
895900 messages = Messages ,
896901 returns = Returns ,
@@ -917,12 +922,16 @@ overview(#?STATE{consumers = Cons,
917922 _ ->
918923 #{}
919924 end ,
920- MsgsRet = lqueue :len (Returns ),
921- #{num_hi := MsgsHi ,
922- num_no := MsgsNo } = case Cfg # cfg .filter_enabled of
923- true -> rabbit_fifo_filter_q :overview (Messages );
924- false -> rabbit_fifo_q :overview (Messages )
925- end ,
925+ {#{num_hi := MsgsHi ,
926+ num_no := MsgsNo },
927+ MsgsRet } = case FilterEnabled of
928+ true ->
929+ {rabbit_fifo_filter_q :overview (Messages ),
930+ gb_trees :size (Returns )};
931+ false ->
932+ {rabbit_fifo_q :overview (Messages ),
933+ lqueue :len (Returns )}
934+ end ,
926935 Overview = #{type => ? STATE ,
927936 config => Conf ,
928937 num_consumers => map_size (Cons ),
@@ -1401,8 +1410,8 @@ is_v4() ->
14011410
14021411messages_ready (#? STATE {cfg = # cfg {filter_enabled = true },
14031412 messages = M ,
1404- returns = _Todo }) ->
1405- rabbit_fifo_filter_q :len ( M );
1413+ returns = R }) ->
1414+ rabbit_fifo_filter_q :size ( M ) + gb_trees : size ( R );
14061415messages_ready (#? STATE {messages = M ,
14071416 returns = R }) ->
14081417 rabbit_fifo_q :len (M ) + lqueue :len (R ).
@@ -1523,7 +1532,8 @@ activate_next_consumer(#?STATE{cfg = #cfg{consumer_strategy = competing}} = Stat
15231532activate_next_consumer (#? STATE {cfg = # cfg {filter_enabled = FilterEnabled },
15241533 consumers = Cons0 ,
15251534 waiting_consumers = Waiting0 ,
1526- messages = Messages
1535+ messages = Messages ,
1536+ return_count = ReturnCount
15271537 } = State0 ,
15281538 Effects0 ) ->
15291539 % % invariant, the waiting list always need to be sorted by consumers that are
@@ -1555,7 +1565,8 @@ activate_next_consumer(#?STATE{cfg = #cfg{filter_enabled = FilterEnabled},
15551565 Consumer ,
15561566 ServiceQueue ,
15571567 FilterEnabled ,
1558- Messages ),
1568+ Messages ,
1569+ ReturnCount ),
15591570 State = State0 #? STATE {consumers = Cons0 #{NextCKey => Consumer },
15601571 service_queue = ServiceQueue1 ,
15611572 consumers_q = queue_consumer (NextCKey ,
@@ -1580,7 +1591,8 @@ activate_next_consumer(#?STATE{cfg = #cfg{filter_enabled = FilterEnabled},
15801591 Consumer ,
15811592 ServiceQueue ,
15821593 FilterEnabled ,
1583- Messages ),
1594+ Messages ,
1595+ ReturnCount ),
15841596 Cons1 = Cons0 #{NextCKey => Consumer },
15851597 Cons = maps :remove (ActiveCKey , Cons1 ),
15861598 ConQ1 = queue_consumer (NextCKey , ? CONSUMER_PRIORITY (Consumer ), ConQ0 ),
@@ -1677,6 +1689,8 @@ queue_filtering_consumers(#?STATE{cfg = #cfg{filter_enabled = true},
16771689 State #? STATE {service_queue = ConsQ ,
16781690 % % Rotating should help dispatching messages across multiple consumers.
16791691 % % Note however that this doesn't guarantee round robin.
1692+ % % TODO We might need some randomness whether we rotate to avoid
1693+ % % certain patterns where always the same consumer gets messages assigned.
16801694 consumers_q = priority_queue :rotate (ConsQ )};
16811695queue_filtering_consumers (State ) ->
16821696 State .
@@ -1830,26 +1844,27 @@ enqueue(Idx, Ts, RawMsg, {_MetaSize, BodySize}, _MsgMeta,
18301844
18311845return (#{} = Meta , ConsumerKey , MsgIds , IncrDelCount , Anns ,
18321846 Checked , Effects0 , State0 )
1833- when is_map (Anns ) ->
1847+ when is_map (Anns ) ->
18341848 % % We requeue in the same order as messages got returned by the client.
18351849 {State1 , Effects1 } =
1836- lists :foldl (
1837- fun (MsgId , Acc = {S0 , E0 }) ->
1838- case Checked of
1839- #{MsgId := Msg } ->
1840- return_one (Meta , MsgId , Msg , IncrDelCount , Anns ,
1841- S0 , E0 , ConsumerKey );
1842- #{} ->
1843- Acc
1844- end
1845- end , {State0 , Effects0 }, MsgIds ),
1850+ lists :foldl (
1851+ fun (MsgId , Acc = {S0 , E0 }) ->
1852+ case Checked of
1853+ #{MsgId := Msg } ->
1854+ return_one (Meta , MsgId , Msg , IncrDelCount , Anns ,
1855+ S0 , E0 , ConsumerKey );
1856+ #{} ->
1857+ Acc
1858+ end
1859+ end , {State0 , Effects0 }, MsgIds ),
18461860 State2 = case State1 #? STATE .consumers of
18471861 #{ConsumerKey := Con } ->
18481862 update_or_remove_con (Meta , ConsumerKey , Con , State1 );
18491863 _ ->
18501864 State1
18511865 end ,
1852- checkout (Meta , State0 , State2 , Effects1 ).
1866+ State3 = queue_filtering_consumers (State2 ),
1867+ checkout (Meta , State0 , State3 , Effects1 ).
18531868
18541869% used to process messages that are finished
18551870complete (Meta , ConsumerKey , [MsgId ],
@@ -1987,10 +2002,12 @@ annotate_msg(Header, Msg0) ->
19872002 end .
19882003
19892004return_one (Meta , MsgId , ? MSG (_ , _ ) = Msg0 , DelivFailed , Anns ,
1990- #? STATE {returns = Returns ,
2005+ #? STATE {returns = Returns0 ,
2006+ return_count = ReturnCount0 ,
19912007 consumers = Consumers ,
19922008 dlx = DlxState0 ,
1993- cfg = # cfg {delivery_limit = DeliveryLimit ,
2009+ cfg = # cfg {filter_enabled = FilterEnabled ,
2010+ delivery_limit = DeliveryLimit ,
19942011 dead_letter_handler = DLH }} = State0 ,
19952012 Effects0 , ConsumerKey ) ->
19962013 # consumer {checked_out = Checked0 } = Con0 = maps :get (ConsumerKey , Consumers ),
@@ -2007,20 +2024,30 @@ return_one(Meta, MsgId, ?MSG(_, _) = Msg0, DelivFailed, Anns,
20072024 Checked = maps :remove (MsgId , Checked0 ),
20082025 Con = Con0 # consumer {checked_out = Checked ,
20092026 credit = increase_credit (Con0 , 1 )},
2010- {add_bytes_return (
2011- Header ,
2012- State0 #? STATE {consumers = Consumers #{ConsumerKey := Con },
2013- returns = lqueue :in (Msg , Returns )}),
2014- Effects0 }
2027+ ReturnCount = ReturnCount0 + 1 ,
2028+ Returns = case FilterEnabled of
2029+ true ->
2030+ gb_trees :insert (ReturnCount , Msg , Returns0 );
2031+ false ->
2032+ lqueue :in (Msg , Returns0 )
2033+ end ,
2034+ State1 = State0 #? STATE {consumers = Consumers #{ConsumerKey := Con },
2035+ returns = Returns ,
2036+ return_count = ReturnCount },
2037+ State = add_bytes_return (Header , State1 ),
2038+ {State , Effects0 }
20152039 end .
20162040
20172041return_all (Meta , #? STATE {consumers = Cons } = State0 , Effects0 , ConsumerKey ,
20182042 # consumer {checked_out = Checked } = Con , DelivFailed ) ->
2019- State = State0 #? STATE {consumers = Cons #{ConsumerKey => Con }},
2020- lists :foldl (fun ({MsgId , Msg }, {S , E }) ->
2021- return_one (Meta , MsgId , Msg , DelivFailed , #{},
2022- S , E , ConsumerKey )
2023- end , {State , Effects0 }, lists :sort (maps :to_list (Checked ))).
2043+ State1 = State0 #? STATE {consumers = Cons #{ConsumerKey => Con }},
2044+ {State2 , Effects } = lists :foldl (
2045+ fun ({MsgId , Msg }, {S , E }) ->
2046+ return_one (Meta , MsgId , Msg , DelivFailed , #{},
2047+ S , E , ConsumerKey )
2048+ end , {State1 , Effects0 }, lists :sort (maps :to_list (Checked ))),
2049+ State = queue_filtering_consumers (State2 ),
2050+ {State , Effects }.
20242051
20252052checkout (Meta , OldState , State0 , Effects0 ) ->
20262053 checkout (Meta , OldState , State0 , Effects0 , ok ).
@@ -2148,21 +2175,58 @@ add_delivery_effects(Effects0, AccMap, State) ->
21482175 end , Efs , chunk_disk_msgs (DiskMsgs , 0 , [[]]))
21492176 end , Effects0 , AccMap ).
21502177
2151- % %TODO check returns first
21522178take_next_consumer_msg (RaTs ,
21532179 Con0 = # consumer {cfg = # consumer_cfg {filter = Filter },
2154- max_filtered_idxs = Idxs0 },
2180+ scanned_returns = ScannedReturns },
2181+ State0 = #? STATE {returns = Returns0 ,
2182+ return_count = ReturnCount })
2183+ when ScannedReturns < ReturnCount ->
2184+ case filter_returns (RaTs , Filter , ScannedReturns , Returns0 ) of
2185+ none ->
2186+ Con = Con0 # consumer {scanned_returns = ReturnCount },
2187+ take_next_consumer_msg (RaTs , Con , State0 );
2188+ {Key , Msg } ->
2189+ Returns = gb_trees :delete (Key , Returns0 ),
2190+ Con = Con0 # consumer {scanned_returns = Key },
2191+ State = State0 #? MODULE {returns = Returns },
2192+ {Msg , Con , State }
2193+ end ;
2194+ take_next_consumer_msg (RaTs ,
2195+ Con0 = # consumer {cfg = # consumer_cfg {filter = Filter },
2196+ scanned_idxs = Idxs0 },
21552197 State0 = #? STATE {messages = Messages0 }) ->
21562198 case rabbit_fifo_filter_q :take (RaTs , Filter , Idxs0 , Messages0 ) of
21572199 {empty , Idxs } ->
2158- Con = Con0 # consumer {max_filtered_idxs = Idxs },
2200+ Con = Con0 # consumer {scanned_idxs = Idxs },
21592201 {empty , Con };
21602202 {Msg , Idxs , Messages } ->
2161- Con = Con0 # consumer {max_filtered_idxs = Idxs },
2203+ Con = Con0 # consumer {scanned_idxs = Idxs },
21622204 State = State0 #? STATE {messages = Messages },
21632205 {Msg , Con , State }
21642206 end .
21652207
2208+ filter_returns (RaTs , Filter , Scanned , Tree ) ->
2209+ From = Scanned + 1 ,
2210+ Iter = gb_trees :iterator_from (From , Tree ),
2211+ filter_returns0 (RaTs , Filter , gb_trees :next (Iter )).
2212+
2213+ filter_returns0 (_RaTs , _Filter , none ) ->
2214+ none ;
2215+ filter_returns0 (RaTs , Filter ,
2216+ {ReturnCount , ? MSG (_Idx , #{meta := Meta } = Hdr ) = Msg , Iter }) ->
2217+ case rabbit_fifo :get_header (expiry , Hdr ) of
2218+ ExpiryTs when is_integer (ExpiryTs ) andalso RaTs >= ExpiryTs ->
2219+ % % Message expired.
2220+ filter_returns0 (RaTs , Filter , gb_trees :next (Iter ));
2221+ _ ->
2222+ case rabbit_fifo_filter :filter (Filter , Meta ) of
2223+ true ->
2224+ {ReturnCount , Msg };
2225+ false ->
2226+ filter_returns0 (RaTs , Filter , gb_trees :next (Iter ))
2227+ end
2228+ end .
2229+
21662230take_next_msg (#? STATE {returns = Returns0 ,
21672231 messages = Messages0 ,
21682232 ra_indexes = Indexes0
@@ -2241,7 +2305,7 @@ checkout_one(Meta = #{system_time := Ts},
22412305 Effects0 ) ->
22422306 case priority_queue :out (SQ0 ) of
22432307 {empty , _ } ->
2244- Activity = case rabbit_fifo_filter_q :len (Messages0 ) of
2308+ Activity = case rabbit_fifo_filter_q :size (Messages0 ) of
22452309 0 -> nochange ;
22462310 _ -> inactive
22472311 end ,
@@ -2515,7 +2579,8 @@ update_or_remove_con(_Meta, ConsumerKey,
25152579 #? STATE {cfg = # cfg {filter_enabled = FilterEnabled },
25162580 consumers = Cons ,
25172581 service_queue = ServiceQueue ,
2518- messages = Messages
2582+ messages = Messages ,
2583+ return_count = ReturnCount
25192584 } = State ) ->
25202585 % %TODO On the hot path, we want maps:update/3.
25212586 State #? STATE {consumers = maps :put (ConsumerKey , Con , Cons ),
@@ -2525,15 +2590,17 @@ update_or_remove_con(_Meta, ConsumerKey,
25252590 % % priority_queue:member/2 check.
25262591 service_queue = maybe_queue_consumer (
25272592 ConsumerKey , Con , ServiceQueue ,
2528- FilterEnabled , Messages )}.
2593+ FilterEnabled , Messages , ReturnCount )}.
25292594
25302595maybe_queue_consumer (Key , # consumer {credit = Credit ,
2531- max_filtered_idxs = Idxs ,
2596+ scanned_idxs = Idxs ,
2597+ scanned_returns = ScannedReturns ,
25322598 status = up ,
25332599 cfg = # consumer_cfg {priority = P }},
2534- ServiceQueue , true , Messages )
2600+ ServiceQueue , true , Messages , ReturnCount )
25352601 when Credit > 0 ->
2536- case rabbit_fifo_filter_q :is_fully_scanned (Idxs , Messages ) of
2602+ case ScannedReturns >= ReturnCount andalso
2603+ rabbit_fifo_filter_q :is_fully_scanned (Idxs , Messages ) of
25372604 true ->
25382605 ServiceQueue ;
25392606 false ->
@@ -2542,10 +2609,11 @@ maybe_queue_consumer(Key, #consumer{credit = Credit,
25422609maybe_queue_consumer (Key , # consumer {credit = Credit ,
25432610 status = up ,
25442611 cfg = # consumer_cfg {priority = P }},
2545- ServiceQueue , false , _Messages )
2612+ ServiceQueue , false , _Messages , _ReturnCount )
25462613 when Credit > 0 ->
25472614 queue_consumer (Key , P , ServiceQueue );
2548- maybe_queue_consumer (_Key , _Consumer , ServiceQueue , _FilterEnabled , _Messages ) ->
2615+ maybe_queue_consumer (_Key , _Consumer ,
2616+ ServiceQueue , _FilterEnabled , _Messages , _ReturnCount ) ->
25492617 ServiceQueue .
25502618
25512619queue_consumer (Key , Prio , ServiceQueue ) ->
@@ -2684,13 +2752,14 @@ credit_active_consumer(
26842752 #? STATE {cfg = # cfg {filter_enabled = FilterEnabled },
26852753 consumers = Cons0 ,
26862754 service_queue = ServiceQueue0 ,
2687- messages = Messages } = State0 ) ->
2755+ messages = Messages ,
2756+ return_count = ReturnCount } = State0 ) ->
26882757 LinkCreditSnd = link_credit_snd (DeliveryCountRcv , LinkCreditRcv ,
26892758 DeliveryCountSnd , Cfg ),
26902759 % % grant the credit
26912760 Con1 = Con0 # consumer {credit = LinkCreditSnd },
2692- ServiceQueue = maybe_queue_consumer (
2693- ConsumerKey , Con1 , ServiceQueue0 , FilterEnabled , Messages ),
2761+ ServiceQueue = maybe_queue_consumer (ConsumerKey , Con1 , ServiceQueue0 ,
2762+ FilterEnabled , Messages , ReturnCount ),
26942763 State1 = State0 #? STATE {service_queue = ServiceQueue ,
26952764 consumers = maps :update (ConsumerKey , Con1 , Cons0 )},
26962765 {State2 , ok , Effects } = checkout (Meta , State0 , State1 , []),
0 commit comments