7777 policy , operator_policy , effective_policy_definition , type , memory ,
7878 consumers , segments ]).
7979
80+ -define (UNMATCHED_THRESHOLD , 200 ).
81+
8082-type appender_seq () :: non_neg_integer ().
8183
8284-type msg () :: term (). % % TODO: refine
8385
8486-record (stream , {mode :: rabbit_queue_type :consume_mode (),
8587 delivery_count :: none | rabbit_queue_type :delivery_count (),
8688 credit :: rabbit_queue_type :credit (),
89+ drain = false :: boolean (),
90+ credit_reply_outstanding = false :: boolean (),
8791 ack :: boolean (),
8892 start_offset = 0 :: non_neg_integer (),
8993 listening_offset = 0 :: non_neg_integer (),
9599 % % reversed order until the consumer has more credits to consume them.
96100 buffer_msgs_rev = [] :: [rabbit_amqqueue :qmsg ()],
97101 filter :: rabbit_amqp_filter :expression (),
102+ % % Number of consecutive messages for which the filter evaluated to false
103+ unmatched = 0 :: non_neg_integer (),
104+ filtering_paused = false :: boolean (),
98105 reader_options :: map ()}).
99106
100107-record (stream_client , {stream_id :: string (),
@@ -513,39 +520,22 @@ credit_v1(_, _, _, _, _) ->
513520credit (QName , CTag , DeliveryCountRcv , LinkCreditRcv , Drain ,
514521 # stream_client {readers = Readers ,
515522 name = Name ,
516- local_pid = LocalPid } = State0 ) ->
523+ local_pid = LocalPid } = State ) ->
517524 case Readers of
518525 #{CTag := Str0 = # stream {delivery_count = DeliveryCountSnd }} ->
519526 LinkCreditSnd = amqp10_util :link_credit_snd (
520527 DeliveryCountRcv , LinkCreditRcv , DeliveryCountSnd ),
521- Str1 = Str0 # stream {credit = LinkCreditSnd },
522- {Str2 = # stream {delivery_count = DeliveryCount ,
523- credit = Credit ,
524- ack = Ack }, Msgs } = stream_entries (QName , Name , LocalPid , Str1 ),
525- Str = case Drain andalso Credit > 0 of
526- true ->
527- Str2 # stream {delivery_count = serial_number :add (DeliveryCount , Credit ),
528- credit = 0 };
529- false ->
530- Str2
531- end ,
532- State = State0 # stream_client {readers = maps :update (CTag , Str , Readers )},
533- Actions = deliver_actions (CTag , Ack , Msgs ) ++ [{credit_reply ,
534- CTag ,
535- Str # stream .delivery_count ,
536- Str # stream .credit ,
537- available_messages (Str ),
538- Drain }],
539- {State , Actions };
528+ Str1 = Str0 # stream {credit = LinkCreditSnd ,
529+ drain = Drain ,
530+ credit_reply_outstanding = true },
531+ {Str2 , Msgs } = stream_entries (QName , Name , CTag , LocalPid , Str1 ),
532+ {Str , Actions } = actions (CTag , Msgs , Str2 ),
533+ {State # stream_client {readers = maps :update (CTag , Str , Readers )},
534+ Actions };
540535 _ ->
541- {State0 , []}
536+ {State , []}
542537 end .
543538
544- % % Returns only an approximation.
545- available_messages (# stream {log = Log ,
546- last_consumed_offset = LastConsumedOffset }) ->
547- max (0 , osiris_log :committed_offset (Log ) - LastConsumedOffset ).
548-
549539deliver (QSs , Msg , Options ) ->
550540 lists :foldl (
551541 fun ({Q , stateless }, {Qs , Actions }) ->
@@ -624,17 +614,34 @@ handle_event(_QName, {osiris_written, From, _WriterId, Corrs},
624614 slow = Slow },
625615 {ok , State , Actions };
626616handle_event (QName , {osiris_offset , _From , _Offs },
627- State = # stream_client {local_pid = LocalPid ,
628- readers = Readers0 ,
629- name = Name }) ->
617+ State0 = # stream_client {local_pid = LocalPid ,
618+ readers = Readers0 ,
619+ name = Name }) ->
630620 % % offset isn't actually needed as we use the atomic to read the
631621 % % current committed
632622 {Readers , Actions } = maps :fold (
633623 fun (Tag , Str0 , {Rds , As }) ->
634- {Str , Msgs } = stream_entries (QName , Name , LocalPid , Str0 ),
635- {Rds #{Tag => Str }, deliver_actions (Tag , Str # stream .ack , Msgs ) ++ As }
636- end , {#{}, []}, Readers0 ),
637- {ok , State # stream_client {readers = Readers }, Actions };
624+ {Str1 , Msgs } = stream_entries (QName , Name , Tag , LocalPid , Str0 ),
625+ {Str , As1 } = actions (Tag , Msgs , Str1 ),
626+ {[{Tag , Str } | Rds ], As1 ++ As }
627+ end , {[], []}, Readers0 ),
628+ State = State0 # stream_client {readers = maps :from_list (Readers )},
629+ {ok , State , Actions };
630+ handle_event (QName , {resume_filtering , CTag },
631+ # stream_client {name = Name ,
632+ local_pid = LocalPid ,
633+ readers = Readers0 } = State ) ->
634+ case Readers0 of
635+ #{CTag := Str0 } ->
636+ Str1 = Str0 # stream {unmatched = 0 ,
637+ filtering_paused = false },
638+ {Str2 , Msgs } = stream_entries (QName , Name , CTag , LocalPid , Str1 ),
639+ {Str , Actions } = actions (CTag , Msgs , Str2 ),
640+ Readers = maps :update (CTag , Str , Readers0 ),
641+ {ok , State # stream_client {readers = Readers }, Actions };
642+ _ ->
643+ {ok , State , []}
644+ end ;
638645handle_event (_QName , {stream_leader_change , Pid }, State ) ->
639646 {ok , update_leader_pid (Pid , State ), []};
640647handle_event (_QName , {stream_local_member_change , Pid },
@@ -690,7 +697,7 @@ settle(QName, _, CTag, MsgIds, #stream_client{readers = Readers0,
690697 % % all settle reasons will "give credit" to the stream queue
691698 Credit = length (MsgIds ),
692699 Str1 = Str0 # stream {credit = Credit0 + Credit },
693- {Str , Msgs } = stream_entries (QName , Name , LocalPid , Str1 ),
700+ {Str , Msgs } = stream_entries (QName , Name , CTag , LocalPid , Str1 ),
694701 Readers = maps :update (CTag , Str , Readers0 ),
695702 {State # stream_client {readers = Readers },
696703 deliver_actions (CTag , Ack , Msgs )};
@@ -1132,7 +1139,10 @@ add_if_defined(Key, Value, Map) ->
11321139 maps :put (Key , Value , Map ).
11331140
11341141format_osiris_event (Evt , QRef ) ->
1135- {'$gen_cast' , {queue_event , QRef , Evt }}.
1142+ {'$gen_cast' , queue_event (QRef , Evt )}.
1143+
1144+ queue_event (QRef , Evt ) ->
1145+ {queue_event , QRef , Evt }.
11361146
11371147max_age (undefined ) ->
11381148 undefined ;
@@ -1159,21 +1169,21 @@ recover(Q) ->
11591169maybe_send_reply (_ChPid , undefined ) -> ok ;
11601170maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
11611171
1162- stream_entries (QName , Name , LocalPid ,
1172+ stream_entries (QName , Name , CTag , LocalPid ,
11631173 # stream {chunk_iterator = undefined ,
11641174 credit = Credit } = Str0 ) ->
11651175 case Credit > 0 of
11661176 true ->
11671177 case chunk_iterator (Str0 , LocalPid ) of
11681178 {ok , Str } ->
1169- stream_entries (QName , Name , LocalPid , Str );
1179+ stream_entries (QName , Name , CTag , LocalPid , Str );
11701180 {end_of_stream , Str } ->
11711181 {Str , []}
11721182 end ;
11731183 false ->
11741184 {Str0 , []}
11751185 end ;
1176- stream_entries (QName , Name , LocalPid ,
1186+ stream_entries (QName , Name , CTag , LocalPid ,
11771187 # stream {delivery_count = DC ,
11781188 credit = Credit ,
11791189 buffer_msgs_rev = Buf0 ,
@@ -1194,40 +1204,49 @@ stream_entries(QName, Name, LocalPid,
11941204 credit = Credit - BufLen ,
11951205 buffer_msgs_rev = [],
11961206 last_consumed_offset = LastOff + BufLen },
1197- stream_entries (QName , Name , LocalPid , Str , Buf0 )
1207+ stream_entries (QName , Name , CTag , LocalPid , Str , Buf0 )
11981208 end ;
1199- stream_entries (QName , Name , LocalPid , Str ) ->
1200- stream_entries (QName , Name , LocalPid , Str , []).
1209+ stream_entries (QName , Name , CTag , LocalPid , Str ) ->
1210+ stream_entries (QName , Name , CTag , LocalPid , Str , []).
12011211
1202- stream_entries (_ , _ , _ , # stream {credit = Credit } = Str , Acc )
1212+ stream_entries (_ , _ , _ , _ , # stream {credit = Credit } = Str , Acc )
12031213 when Credit < 1 ->
12041214 {Str , lists :reverse (Acc )};
1205- stream_entries (QName , Name , LocalPid ,
1215+ stream_entries (QName , Name , CTag , LocalPid ,
12061216 # stream {chunk_iterator = Iter0 ,
12071217 delivery_count = DC ,
12081218 credit = Credit ,
12091219 start_offset = StartOffset ,
1210- filter = Filter } = Str0 , Acc0 ) ->
1220+ filter = Filter ,
1221+ unmatched = Unmatched } = Str0 , Acc0 ) ->
12111222 case osiris_log :iterator_next (Iter0 ) of
1223+ end_of_chunk when Unmatched > ? UNMATCHED_THRESHOLD ->
1224+ % % Pause filtering temporariliy for two reasons:
1225+ % % 1. Process Erlang messages in our mailbox to avoid blocking other links
1226+ % % 2. Send matched messages to the receiver as soon as possible
1227+ gen_server :cast (self (), queue_event (QName , {resume_filtering , CTag })),
1228+ {Str0 # stream {filtering_paused = true }, lists :reverse (Acc0 )};
12121229 end_of_chunk ->
12131230 case chunk_iterator (Str0 , LocalPid ) of
12141231 {ok , Str } ->
1215- stream_entries (QName , Name , LocalPid , Str , Acc0 );
1232+ stream_entries (QName , Name , CTag , LocalPid , Str , Acc0 );
12161233 {end_of_stream , Str } ->
12171234 {Str , lists :reverse (Acc0 )}
12181235 end ;
12191236 {{Offset , Entry }, Iter } ->
12201237 {Str , Acc } = case Entry of
12211238 {batch , _NumRecords , 0 , _Len , BatchedEntries } ->
1222- {MsgsRev , NumMsgs } = parse_uncompressed_subbatch (
1223- BatchedEntries , Offset , StartOffset ,
1224- QName , Name , LocalPid , Filter , {[], 0 }),
1239+ {MsgsRev , NumMsgs , U } = parse_uncompressed_subbatch (
1240+ BatchedEntries , Offset , StartOffset ,
1241+ QName , Name , LocalPid , Filter ,
1242+ {[], 0 , Unmatched }),
12251243 case Credit >= NumMsgs of
12261244 true ->
12271245 {Str0 # stream {chunk_iterator = Iter ,
12281246 delivery_count = delivery_count_add (DC , NumMsgs ),
12291247 credit = Credit - NumMsgs ,
1230- last_consumed_offset = Offset + NumMsgs - 1 },
1248+ last_consumed_offset = Offset + NumMsgs - 1 ,
1249+ unmatched = U },
12311250 MsgsRev ++ Acc0 };
12321251 false ->
12331252 % % Consumer doesn't have sufficient credit.
@@ -1238,7 +1257,8 @@ stream_entries(QName, Name, LocalPid,
12381257 delivery_count = delivery_count_add (DC , Credit ),
12391258 credit = 0 ,
12401259 buffer_msgs_rev = Buf ,
1241- last_consumed_offset = Offset + Credit - 1 },
1260+ last_consumed_offset = Offset + Credit - 1 ,
1261+ unmatched = U },
12421262 MsgsRev1 ++ Acc0 }
12431263 end ;
12441264 {batch , _ , _CompressionType , _ , _ } ->
@@ -1252,20 +1272,22 @@ stream_entries(QName, Name, LocalPid,
12521272 Name , LocalPid , Filter ) of
12531273 none ->
12541274 {Str0 # stream {chunk_iterator = Iter ,
1255- last_consumed_offset = Offset },
1275+ last_consumed_offset = Offset ,
1276+ unmatched = Unmatched + 1 },
12561277 Acc0 };
12571278 Msg ->
12581279 {Str0 # stream {chunk_iterator = Iter ,
12591280 delivery_count = delivery_count_add (DC , 1 ),
12601281 credit = Credit - 1 ,
1261- last_consumed_offset = Offset },
1282+ last_consumed_offset = Offset ,
1283+ unmatched = 0 },
12621284 [Msg | Acc0 ]}
12631285 end ;
12641286 false ->
12651287 {Str0 # stream {chunk_iterator = Iter }, Acc0 }
12661288 end
12671289 end ,
1268- stream_entries (QName , Name , LocalPid , Str , Acc )
1290+ stream_entries (QName , Name , CTag , LocalPid , Str , Acc )
12691291 end .
12701292
12711293chunk_iterator (# stream {credit = Credit ,
@@ -1300,14 +1322,14 @@ parse_uncompressed_subbatch(
13001322 Len :31 /unsigned ,
13011323 Entry :Len /binary ,
13021324 Rem /binary >>,
1303- Offset , StartOffset , QName , Name , LocalPid , Filter , Acc0 = {AccList , AccCount }) ->
1325+ Offset , StartOffset , QName , Name , LocalPid , Filter , Acc0 = {AccList , AccCount , Unmatched }) ->
13041326 Acc = case Offset >= StartOffset of
13051327 true ->
13061328 case entry_to_msg (Entry , Offset , QName , Name , LocalPid , Filter ) of
13071329 none ->
1308- Acc0 ;
1330+ setelement ( 3 , Acc0 , Unmatched + 1 ) ;
13091331 Msg ->
1310- {[Msg | AccList ], AccCount + 1 }
1332+ {[Msg | AccList ], AccCount + 1 , 0 }
13111333 end ;
13121334 false ->
13131335 Acc0
@@ -1418,6 +1440,37 @@ is_minority(All, Up) ->
14181440 MinQuorum = length (All ) div 2 + 1 ,
14191441 length (Up ) < MinQuorum .
14201442
1443+ actions (CTag , Msgs , # stream {ack = Ack } = Str0 ) ->
1444+ Str1 = maybe_drain (Str0 ),
1445+ {Str , Actions } = credit_reply (CTag , Str1 ),
1446+ {Str , deliver_actions (CTag , Ack , Msgs ) ++ Actions }.
1447+
1448+ maybe_drain (# stream {delivery_count = DeliveryCount ,
1449+ credit = Credit ,
1450+ drain = true ,
1451+ filtering_paused = false } = Str )
1452+ when Credit > 0 ->
1453+ Str # stream {delivery_count = serial_number :add (DeliveryCount , Credit ),
1454+ credit = 0 };
1455+ maybe_drain (Str ) ->
1456+ Str .
1457+
1458+ credit_reply (CTag , # stream {delivery_count = DeliveryCount ,
1459+ credit = Credit ,
1460+ drain = Drain ,
1461+ credit_reply_outstanding = true ,
1462+ filtering_paused = false } = Str ) ->
1463+ {Str # stream {credit_reply_outstanding = false },
1464+ [{credit_reply , CTag , DeliveryCount , Credit ,
1465+ available_messages (Str ), Drain }]};
1466+ credit_reply (_ , Str ) ->
1467+ {Str , []}.
1468+
1469+ % % Returns only an approximation.
1470+ available_messages (# stream {log = Log ,
1471+ last_consumed_offset = LastConsumedOffset }) ->
1472+ max (0 , osiris_log :committed_offset (Log ) - LastConsumedOffset ).
1473+
14211474deliver_actions (_ , _ , []) ->
14221475 [];
14231476deliver_actions (CTag , Ack , Msgs ) ->
0 commit comments