Skip to content

Commit f42c030

Browse files
committed
Message selectors for quorum queues
## What? Add message selectors to quorum queues: * with SQL syntax as per JMS spec * with AMQP filter expressions, e.g. to allow for message groups ## How? * message selector feature is enabled statically on the QQ via queue args * hold a subset of message headers in memory as defined in the queue args * Use gb_trees for high prio or normal prio messages instead of rabbit_fifo_q * no consumer specific queues for messages * each QQ consumer remembers its last scanned Ra index and continues scanning where it left off previously * Use another gb_trees for prompt message expiry * Add/recommend limits for * max consumers per queue * max messages per queue * max metadata size per message The latter two provide an overall limit on memory usage. Notes: * Round robin isn't guaranteed in all cases. However, this should be okay because JMS doesn't define how a broker should dispatch messages: > "Apart from the requirements of any message selectors, Jakarta Messaging does not define how messages are distributed between multiple consumers on the same queue." * gb_trees rebalancing should be acceptable given that emitting all Ra live indexes requires O(N) space anyway Alternatives: * List of lists approach was slower CPU wise due to scanning overhead, but better memory wise. Deletions must be stored separately in maps. * Ideally we need a skip list instead of gb_trees because it's simple, avoids O(n) rebalance, and could even append in O(1). However, this cannot be implemented in Erlang. A NIF would be one solution, but isn't interoperable. * Probabilistic skip lists are better suited than red-black trees. Red-black trees can't be efficiently implemented in Erlang. Next steps: * requeue/returns: get all quorum_queue_filter_SUITE tests green
1 parent d265fdd commit f42c030

File tree

8 files changed

+1426
-131
lines changed

8 files changed

+1426
-131
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 377 additions & 81 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121

2222
-define(DELIVERY_SEND_MSG_OPTS, [local, ra_event]).
2323

24+
%% These shorthands for section qualifiers are defined in filtex-v1.0-wd09 §6.4.4.4
25+
-define(HEADER_SECTION, h).
26+
-define(MESSAGE_ANNOTATIONS_SECTION, m).
27+
-define(PROPERTIES_SECTION, p).
28+
-define(APPLICATION_PROPERTIES_SECTION, a).
29+
2430
-type optimised_tuple(A, B) :: nonempty_improper_list(A, B).
2531

2632
-type option(T) :: undefined | T.
@@ -38,12 +44,33 @@
3844
%% in enqueue messages. Used to ensure ordering of messages sent from the
3945
%% same process
4046

47+
-type msg_section() ::
48+
header |
49+
message_annotations |
50+
properties |
51+
application_properties.
52+
53+
%% AMQP §3.2
54+
-type msg_section_short() ::
55+
?HEADER_SECTION |
56+
?MESSAGE_ANNOTATIONS_SECTION |
57+
?PROPERTIES_SECTION |
58+
?APPLICATION_PROPERTIES_SECTION.
59+
60+
-type field_name() :: atom() | binary().
61+
62+
-type filter_fields() :: [{msg_section(), field_name()}].
63+
64+
%% Message metadata held in memory for consumers to filter on.
65+
-type msg_metadata() :: [{msg_section_short(), field_name(), term()}].
66+
4167
-type msg_header() :: msg_size() |
4268
optimised_tuple(msg_size(), Expiry :: milliseconds()) |
4369
#{size := msg_size(),
4470
acquired_count => non_neg_integer(),
4571
delivery_count => non_neg_integer(),
46-
expiry => milliseconds()}.
72+
expiry => milliseconds(),
73+
meta => msg_metadata()}.
4774
%% The message header:
4875
%% size: The size of the message payload in bytes.
4976
%% delivery_count: The number of unsuccessful delivery attempts.
@@ -116,6 +143,8 @@
116143
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).
117144

118145
-type milliseconds() :: non_neg_integer().
146+
-type timestamp() :: milliseconds().
147+
119148
-record(consumer_cfg,
120149
{meta = #{} :: consumer_meta(),
121150
pid :: pid(),
@@ -127,7 +156,9 @@
127156
%% command: `{credit, ReceiverDeliveryCount, Credit}'
128157
credit_mode :: credited | credit_mode(),
129158
lifetime = once :: once | auto,
130-
priority = 0 :: integer()}).
159+
priority = 0 :: integer(),
160+
filter = [] :: rabbit_amqp_filtex:filter_expressions()
161+
}).
131162

132163
-record(consumer,
133164
{cfg = #consumer_cfg{},
@@ -138,7 +169,8 @@
138169
%% decremented for each delivery
139170
credit = 0 :: non_neg_integer(),
140171
%% AMQP 1.0 §2.6.7
141-
delivery_count :: rabbit_queue_type:delivery_count()
172+
delivery_count :: rabbit_queue_type:delivery_count(),
173+
max_filtered_idxs = {0, 0} :: {HighPrio :: ra:index(), NormalPrio :: ra:index()}
142174
}).
143175

144176
-type consumer() :: #consumer{}.
@@ -169,23 +201,38 @@
169201
overflow_strategy = drop_head :: drop_head | reject_publish,
170202
max_length :: option(non_neg_integer()),
171203
max_bytes :: option(non_neg_integer()),
204+
max_bytes_meta :: option(non_neg_integer()),
172205
%% whether single active consumer is on or not for this queue
173206
consumer_strategy = competing :: consumer_strategy(),
174207
%% the maximum number of unsuccessful delivery attempts permitted
175208
delivery_limit :: option(non_neg_integer()),
176209
expires :: option(milliseconds()),
177210
msg_ttl :: option(milliseconds()),
211+
filter_enabled :: boolean(),
212+
%% TODO Probably no need to store this filter here.
213+
%% It should be good enough to store the 0.9.1 queue args filter table
214+
%% in Khepri and to validate the consumer filter against this queue
215+
%% filter in rabbit_quorum_queue when attaching the consumer.
216+
%% Maybe still useful to expose effective filters in the Management UI?
217+
filter_fields :: filter_fields(),
178218
unused_2 = ?NIL,
179219
unused_3 = ?NIL
180220
}).
181221

182222
-record(rabbit_fifo,
183223
{cfg :: #cfg{},
184224
% unassigned messages
185-
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
225+
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state() | rabbit_fifo_filter_q:state(),
186226
messages_total = 0 :: non_neg_integer(),
187227
% queue of returned msg_in_ids - when checking out it picks from
188228
returns = lqueue:new() :: lqueue:lqueue(term()),
229+
%% * only used if filtering is enabled
230+
%% * contains messages that are available and have a TTL set
231+
%% (we do not expire acquired messages)
232+
filter_msgs_expiry = gb_trees:empty() :: gb_trees:tree(
233+
{timestamp(), ra:index()},
234+
%% reference into where this msg is stored
235+
hi | no | returns),
189236
% a counter of enqueues - used to trigger shadow copy points
190237
% reset to 0 when release_cursor gets stored
191238
enqueue_count = 0 :: non_neg_integer(),
@@ -201,6 +248,11 @@
201248
unused_1 = ?NIL,
202249
% consumers need to reflect consumer state at time of snapshot
203250
consumers = #{} :: #{consumer_key() => consumer()},
251+
%% TODO should only be used for filtering enabled queues.
252+
%% All active consumers including consumers that may be down or may contain 0 credits.
253+
%% An auxiliary data structure used as the base service queue after a new message has
254+
%% been enqueued.
255+
consumers_q = priority_queue:new() :: priority_queue:q(),
204256
% consumers that require further service are queued here
205257
service_queue = priority_queue:new() :: priority_queue:q(),
206258
%% state for at-least-once dead-lettering
@@ -223,8 +275,10 @@
223275
checkpoint_max_indexes => non_neg_integer(),
224276
max_length => non_neg_integer(),
225277
max_bytes => non_neg_integer(),
278+
max_bytes_meta => non_neg_integer(),
226279
overflow_strategy => drop_head | reject_publish,
227280
single_active_consumer_on => boolean(),
281+
filter => false | filter_fields(),
228282
delivery_limit => non_neg_integer() | -1,
229283
expires => non_neg_integer(),
230284
msg_ttl => non_neg_integer(),

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
% status = up :: up | cancelled,
5353
last_msg_id :: seq() | -1 | undefined,
5454
ack = false :: boolean(),
55+
filter :: rabbit_amqp_filter:filter_expression(),
5556
%% Remove this field when feature flag rabbitmq_4.0.0 becomes required.
5657
delivery_count :: {credit_api_v1, rabbit_queue_type:delivery_count()} |
5758
credit_api_v2
@@ -165,7 +166,14 @@ enqueue(QName, Correlation, Msg,
165166
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
166167
ServerId = pick_server(State0),
167168
% by default there is no correlation id
168-
Cmd = rabbit_fifo:make_enqueue(self(), EnqueueSeq, Msg),
169+
%% TODO Check which metadata this queue filters on.
170+
{utf8, GroupId} = mc:property(group_id, Msg),
171+
%% TODO rabbit_fifo should provide this shorthand.
172+
%% Probably it's best if rabbit_fifo_filter provides some client API
173+
%% to map the subset of metadata to filter on (set via queue args) to
174+
%% how to extract this from mc and store this in e3{}.
175+
MsgMeta = [{_ShorthandPropertiesSection = p, group_id, GroupId}],
176+
Cmd = rabbit_fifo:make_enqueue(self(), EnqueueSeq, Msg, MsgMeta),
169177
ok = ra:pipeline_command(ServerId, Cmd, Seq, low),
170178
IsSlow = map_size(Pending) >= SftLmt,
171179
State = State0#state{pending = Pending#{Seq => {Correlation, Cmd}},
@@ -418,12 +426,15 @@ checkout(ConsumerTag, CreditMode, #{} = Meta,
418426
false -> {credit_api_v1, 0}
419427
end,
420428
ConsumerKey = maps:get(key, Reply, ConsumerId),
429+
Filter = maps:get(filter, Meta),
421430
SDels = maps:update_with(
422431
ConsumerTag,
423-
fun (C) -> C#consumer{ack = Ack} end,
432+
fun (C) -> C#consumer{ack = Ack,
433+
filter = Filter} end,
424434
#consumer{key = ConsumerKey,
425435
last_msg_id = LastMsgId,
426436
ack = Ack,
437+
filter = Filter,
427438
delivery_count = DeliveryCount},
428439
CDels0),
429440
{ok, Reply, State0#state{leader = Leader,
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
-module(rabbit_fifo_filter).
2+
3+
-include("rabbit_fifo.hrl").
4+
5+
-export([filter/2]).
6+
7+
%% Returns true when the message metadata satisfies every filter in Filters.
%%
%% "A message will pass through a filter-set if and only if
%% it passes through each of the named filters." [3.5.8]
%%
%% An empty filter list therefore lets every message through.
filter(Filters, MsgMetaData) ->
    Passes = fun(Filter) -> filter0(Filter, MsgMetaData) end,
    lists:all(Passes, Filters).
13+
14+
%% Evaluates a single `properties' filter expression against the metadata.
%%
%% "The filter evaluates to true if all properties enclosed in the filter expression
%% match the respective properties in the message."
%% [filtex-v1.0-wd09 4.2.4]
%%
%% Only `{properties, KVList}' filters are handled; any other filter shape
%% raises function_clause.
filter0({properties, KVList}, MsgMetaData) ->
    lists:all(fun({RefField, RefVal}) ->
                      property_matches(RefField, RefVal, MsgMetaData)
              end, KVList).

%% Looks up the first properties-section entry named RefField and reports
%% whether its value is exactly RefVal. A missing field counts as no match.
property_matches(_RefField, _RefVal, []) ->
    false;
property_matches(RefField, RefVal, [{?PROPERTIES_SECTION, RefField, Val} | _Rest]) ->
    %% Only the first entry for the field is considered.
    Val =:= RefVal;
property_matches(RefField, RefVal, [_Other | Rest]) ->
    property_matches(RefField, RefVal, Rest).
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
%% A filterable queue with efficient random access and deletion.
2+
-module(rabbit_fifo_filter_q).
3+
4+
-include("rabbit_fifo.hrl").
5+
-export([
6+
new/0,
7+
in/3,
8+
len/1,
9+
take/3,
10+
take/4,
11+
is_fully_scanned/2,
12+
overview/1
13+
]).
14+
15+
16+
%% Queue state: one tree per priority level, keyed by Ra index, plus a cached
%% maximum index for each tree.
-record(?MODULE, {
17+
%% high priority messages: Ra index => message header
hi :: gb_trees:tree(ra:index(), msg_header()),
18+
%% normal priority messages: Ra index => message header
no :: gb_trees:tree(ra:index(), msg_header()),
19+
%% cache largest to avoid frequent gb_trees:largest/1 calls
%% (both start at 0 in new/0 and are compared against each consumer's
%% last scanned indexes)
20+
hi_max :: ra:index(),
21+
no_max :: ra:index()
22+
}).
23+
24+
-opaque state() :: #?MODULE{}.
25+
26+
-export_type([state/0]).
27+
28+
%% Creates an empty queue: both priority trees are empty and both cached
%% maximum indexes start at 0.
-spec new() -> state().
new() ->
    Empty = gb_trees:empty(),
    #?MODULE{hi_max = 0,
             no_max = 0,
             hi = Empty,
             no = Empty}.
34+
35+
%% Adds a message to the high (`hi') or normal (`no') priority tree, keyed by
%% its Ra index, and records that index as the new cached maximum.
%%
%% The index must be strictly greater than the cached maximum of the chosen
%% tree; there is no clause for any other case, so it raises function_clause.
-spec in(hi | no, msg(), state()) -> state().
in(hi, ?MSG(RaIdx, Hdr), #?MODULE{hi = Tree, hi_max = Max} = Q)
  when Max < RaIdx ->
    Q#?MODULE{hi_max = RaIdx,
              hi = gb_trees:insert(RaIdx, Hdr, Tree)};
in(no, ?MSG(RaIdx, Hdr), #?MODULE{no = Tree, no_max = Max} = Q)
  when Max < RaIdx ->
    Q#?MODULE{no_max = RaIdx,
              no = gb_trees:insert(RaIdx, Hdr, Tree)}.
46+
47+
%% Total number of messages held across both priority trees.
-spec len(state()) -> non_neg_integer().
len(#?MODULE{hi = Hi, no = No}) ->
    lists:sum([gb_trees:size(Tree) || Tree <- [Hi, No]]).
52+
53+
%% Removes the message stored under Idx from the given priority tree and
%% returns it together with the updated state.
%%
%% The caller must make sure Idx is present in that tree; gb_trees:take/2
%% crashes otherwise. If Idx was the cached maximum, the cache is refreshed
%% via new_largest/3.
-spec take(ra:index(), hi | no, state()) ->
    {msg(), state()}.
take(Idx, hi, #?MODULE{hi = Tree0, hi_max = Max0} = Q) ->
    {Hdr, Tree} = gb_trees:take(Idx, Tree0),
    Max = new_largest(Idx, Max0, Tree),
    {?MSG(Idx, Hdr), Q#?MODULE{hi = Tree, hi_max = Max}};
take(Idx, no, #?MODULE{no = Tree0, no_max = Max0} = Q) ->
    {Hdr, Tree} = gb_trees:take(Idx, Tree0),
    Max = new_largest(Idx, Max0, Tree),
    {?MSG(Idx, Hdr), Q#?MODULE{no = Tree, no_max = Max}}.
70+
71+
%% Finds, removes and returns the next message that passes Filter and has not
%% expired at RaTs, resuming from the indexes the consumer scanned last.
%%
%% The high priority tree is searched first, then the normal priority tree.
%% A tree is only searched while the consumer's index for it is below that
%% tree's cached maximum.
%%
%% Returns:
%%   {Msg, ScannedIdxs, State} - a match was removed; the index for the tree
%%                               it came from is set to the message's index.
%%   {empty, ScannedIdxs}      - nothing matched; the index of every tree that
%%                               was searched is advanced to its cached maximum.
-spec take(timestamp(),
           rabbit_amqp_filtex:filter_expressions(),
           {ra:index(), ra:index()},
           state()) ->
    {empty, {ra:index(), ra:index()}} |
    {msg(), {ra:index(), ra:index()}, state()}.
take(RaTs, Filter, {HiIdx, NoIdx}, #?MODULE{hi = HiTree0, hi_max = HiMax0} = Q0)
  when HiIdx < HiMax0 ->
    case filter(RaTs, Filter, HiIdx, HiTree0) of
        none ->
            %% No high priority match: mark that tree as fully scanned and
            %% carry on with the normal priority tree.
            take(RaTs, Filter, {HiMax0, NoIdx}, Q0);
        ?MSG(Idx, _Hdr) = Msg ->
            HiTree = gb_trees:delete(Idx, HiTree0),
            Q = Q0#?MODULE{hi = HiTree,
                           hi_max = new_largest(Idx, HiMax0, HiTree)},
            {Msg, {Idx, NoIdx}, Q}
    end;
take(RaTs, Filter, {HiIdx, NoIdx}, #?MODULE{no = NoTree0, no_max = NoMax0} = Q0)
  when NoIdx < NoMax0 ->
    case filter(RaTs, Filter, NoIdx, NoTree0) of
        none ->
            {empty, {HiIdx, NoMax0}};
        ?MSG(Idx, _Hdr) = Msg ->
            NoTree = gb_trees:delete(Idx, NoTree0),
            Q = Q0#?MODULE{no = NoTree,
                           no_max = new_largest(Idx, NoMax0, NoTree)},
            {Msg, {HiIdx, Idx}, Q}
    end;
take(_RaTs, _Filter, ScannedIdxs, _State) ->
    %% Both trees were already scanned up to their cached maximum.
    {empty, ScannedIdxs}.
105+
106+
%% Computes the cached maximum index after Taken was removed from Tree.
%%
%% If a smaller index was taken the cache is still valid. If the cached
%% maximum itself was taken, the new maximum is the tree's largest key; an
%% empty tree keeps the taken index as its cached maximum.
new_largest(Taken, Cached, _Tree) when Taken =/= Cached ->
    Cached;
new_largest(Taken, _Cached, Tree) ->
    %% The largest index was taken.
    case gb_trees:is_empty(Tree) of
        false ->
            element(1, gb_trees:largest(Tree));
        true ->
            Taken
    end.
117+
118+
%% Searches Tree, starting at the first key greater than or equal to FromIdx,
%% for a message that has not expired at RaTs and passes Filter.
%% Returns the message or `none'.
filter(RaTs, Filter, FromIdx, Tree) ->
    First = gb_trees:next(gb_trees:iterator_from(FromIdx, Tree)),
    filter_msg0(RaTs, Filter, First).
121+
122+
%% Walks the iterator until it finds a message that has not expired at RaTs
%% and whose `meta' passes Filter; returns that message, or `none' when the
%% iterator is exhausted.
%%
%% Every visited header must be a map containing the `meta' key; any other
%% header shape raises function_clause.
filter_msg0(_RaTs, _Filter, none) ->
    none;
filter_msg0(RaTs, Filter, {Idx, #{meta := Meta} = Hdr, Iter}) ->
    Expiry = rabbit_fifo:get_header(expiry, Hdr),
    %% A message is expired once the Ra timestamp has reached its expiry.
    Expired = is_integer(Expiry) andalso RaTs >= Expiry,
    %% The filter is only evaluated for messages that have not expired.
    case Expired orelse not rabbit_fifo_filter:filter(Filter, Meta) of
        true ->
            filter_msg0(RaTs, Filter, gb_trees:next(Iter));
        false ->
            ?MSG(Idx, Hdr)
    end.
137+
138+
%% True when the consumer's scanned indexes have reached the cached maximum
%% of both the high and the normal priority tree.
-spec is_fully_scanned({ra:index(), ra:index()}, state()) ->
    boolean().
is_fully_scanned({HiIdx, NoIdx}, #?MODULE{hi_max = HiMax, no_max = NoMax}) ->
    not (HiIdx < HiMax orelse NoIdx < NoMax).
144+
145+
%% Summarises the queue: the number of messages per priority tree and their sum.
-spec overview(state()) ->
    #{len := non_neg_integer(),
      num_hi := non_neg_integer(),
      num_no := non_neg_integer()}.
overview(#?MODULE{hi = Hi, no = No}) ->
    HiCount = gb_trees:size(Hi),
    NoCount = gb_trees:size(No),
    #{num_hi => HiCount,
      num_no => NoCount,
      len => HiCount + NoCount}.

0 commit comments

Comments
 (0)