Skip to content

Commit 36a8b1e

Browse files
committed
Message selectors for quorum queues
## What? Add message selectors to quorum queues: * with SQL syntax as per JMS spec * with AMQP filter expressions, e.g. to allow for message groups ## How? * message selector feature is enabled statically on the QQ via queue args * hold a subset of message headers in memory as defined in the queue args * Use gb_trees for high prio or normal prio messages instead of rabbit_fifo_q * no consumer specific queues for messages * each QQ consumer remembers its last scanned Ra index and continues scanning where it left off previously * Use another gb_trees for prompt message expiry * Add/recommend limits for * max consumers per queue * max messages per queue * max metadata size per message The latter two provide an overall limit on memory usage. Notes: * Round robin isn't guaranteed in all cases. However, this should be okay because JMS doesn't define how a broker should dispatch messages: > "Apart from the requirements of any message selectors, Jakarta Messaging does not define how messages are distributed between multiple consumers on the same queue." * gb_trees rebalancing should be acceptable given that emitting all Ra live indexes requires O(N) space anyway Alternatives: * List of lists approach was slower CPU wise due to scanning overhead, but better memory wise. Deletions must be stored separately in maps. * Ideally we need a skip list instead of gb_trees because it's simple, avoids O(n) rebalance, and could even append in O(1). However, this cannot be implemented in Erlang. A NIF would be one solution, but isn't interoperable. * Probabilistic skip lists are better suited than red-black trees. Red-black trees can't be efficiently implemented in Erlang. Next steps: * end to end spike
1 parent d6a9c63 commit 36a8b1e

File tree

8 files changed

+1546
-166
lines changed

8 files changed

+1546
-166
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 490 additions & 114 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121

2222
-define(DELIVERY_SEND_MSG_OPTS, [local, ra_event]).
2323

24+
%% These shorthands for section qualifiers are defined in filtex-v1.0-wd09 §6.4.4.4
25+
-define(HEADER_SECTION, h).
26+
-define(MESSAGE_ANNOTATIONS_SECTION, m).
27+
-define(PROPERTIES_SECTION, p).
28+
-define(APPLICATION_PROPERTIES_SECTION, a).
29+
2430
-type optimised_tuple(A, B) :: nonempty_improper_list(A, B).
2531

2632
-type option(T) :: undefined | T.
@@ -38,12 +44,33 @@
3844
%% in enqueue messages. Used to ensure ordering of messages sent from the
3945
%% same process
4046

47+
-type msg_section() ::
48+
header |
49+
message_annotations |
50+
properties |
51+
application_properties.
52+
53+
%% AMQP §3.2
54+
-type msg_section_short() ::
55+
?HEADER_SECTION |
56+
?MESSAGE_ANNOTATIONS_SECTION |
57+
?PROPERTIES_SECTION |
58+
?APPLICATION_PROPERTIES_SECTION.
59+
60+
-type field_name() :: atom() | binary().
61+
62+
-type filter_fields() :: [{msg_section(), field_name()}].
63+
64+
%% Message metadata held in memory for consumers to filter on.
65+
-type msg_metadata() :: [{msg_section_short(), field_name(), term()}].
66+
4167
-type msg_header() :: msg_size() |
4268
optimised_tuple(msg_size(), Expiry :: milliseconds()) |
4369
#{size := msg_size(),
4470
acquired_count => non_neg_integer(),
4571
delivery_count => non_neg_integer(),
46-
expiry => milliseconds()}.
72+
expiry => milliseconds(),
73+
meta => msg_metadata()}.
4774
%% The message header:
4875
%% size: The size of the message payload in bytes.
4976
%% delivery_count: The number of unsuccessful delivery attempts.
@@ -116,6 +143,11 @@
116143
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).
117144

118145
-type milliseconds() :: non_neg_integer().
146+
-type timestamp() :: milliseconds().
147+
148+
-type counter() :: non_neg_integer().
149+
-type return_counter() :: counter().
150+
119151
-record(consumer_cfg,
120152
{meta = #{} :: consumer_meta(),
121153
pid :: pid(),
@@ -127,7 +159,9 @@
127159
%% command: `{credit, ReceiverDeliveryCount, Credit}'
128160
credit_mode :: credited | credit_mode(),
129161
lifetime = once :: once | auto,
130-
priority = 0 :: integer()}).
162+
priority = 0 :: integer(),
163+
filter = [] :: rabbit_amqp_filtex:filter_expressions()
164+
}).
131165

132166
-record(consumer,
133167
{cfg = #consumer_cfg{},
@@ -138,7 +172,10 @@
138172
%% decremented for each delivery
139173
credit = 0 :: non_neg_integer(),
140174
%% AMQP 1.0 §2.6.7
141-
delivery_count :: rabbit_queue_type:delivery_count()
175+
delivery_count :: rabbit_queue_type:delivery_count(),
176+
scanned_idxs = {0, 0} :: {HighPrio :: ra:index(),
177+
NormalPrio :: ra:index()},
178+
scanned_returns = 0 :: return_counter()
142179
}).
143180

144181
-type consumer() :: #consumer{}.
@@ -169,26 +206,44 @@
169206
overflow_strategy = drop_head :: drop_head | reject_publish,
170207
max_length :: option(non_neg_integer()),
171208
max_bytes :: option(non_neg_integer()),
209+
max_bytes_meta :: option(non_neg_integer()),
172210
%% whether single active consumer is on or not for this queue
173211
consumer_strategy = competing :: consumer_strategy(),
174212
%% the maximum number of unsuccessful delivery attempts permitted
175213
delivery_limit :: option(non_neg_integer()),
176214
expires :: option(milliseconds()),
177215
msg_ttl :: option(milliseconds()),
216+
filter_enabled :: boolean(),
217+
%% TODO Probably no need to store this filter here.
218+
%% It should be good enough to store the 0.9.1 queue args filter table
219+
%% in Khepri and to validate the consumer filter against this queue
220+
%% filter in rabbit_quorum_queue when attaching the consumer.
221+
%% Maybe still useful to expose effective filters in the Management UI?
222+
filter_fields :: filter_fields(),
178223
unused_2 = ?NIL,
179224
unused_3 = ?NIL
180225
}).
181226

182227
-record(rabbit_fifo,
183228
{cfg :: #cfg{},
184229
% unassigned messages
185-
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
230+
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state() | rabbit_fifo_filter_q:state(),
186231
messages_total = 0 :: non_neg_integer(),
187232
% queue of returned msg_in_ids - when checking out it picks from
188-
returns = lqueue:new() :: lqueue:lqueue(term()),
233+
returns = lqueue:new() :: lqueue:lqueue(term()) | gb_trees:tree(
234+
return_counter(),
235+
msg()),
236+
%% * only used if filtering is enabled
237+
%% * contains messages that are available and have a TTL set
238+
%% (we do not expire acquired messages)
239+
filter_msgs_expiry = gb_trees:empty() :: gb_trees:tree(
240+
{timestamp(), ra:index()},
241+
%% reference into where this msg is stored
242+
hi | no | return_counter()),
189243
% a counter of enqueues - used to trigger shadow copy points
190244
% reset to 0 when release_cursor gets stored
191-
enqueue_count = 0 :: non_neg_integer(),
245+
enqueue_count = 0 :: counter(),
246+
return_count = 0 :: return_counter(),
192247
% a map containing all the live processes that have ever enqueued
193248
% a message to this queue
194249
enqueuers = #{} :: #{pid() => #enqueuer{}},
@@ -201,6 +256,11 @@
201256
unused_1 = ?NIL,
202257
% consumers need to reflect consumer state at time of snapshot
203258
consumers = #{} :: #{consumer_key() => consumer()},
259+
%% TODO should only be used for filtering enabled queues.
260+
%% All active consumers including consumers that may be down or may contain 0 credits.
261+
%% An auxiliary data structure used as the base service queue after a new message has
262+
%% been enqueued.
263+
consumers_q = priority_queue:new() :: priority_queue:q(),
204264
% consumers that require further service are queued here
205265
service_queue = priority_queue:new() :: priority_queue:q(),
206266
%% state for at-least-once dead-lettering
@@ -223,8 +283,10 @@
223283
checkpoint_max_indexes => non_neg_integer(),
224284
max_length => non_neg_integer(),
225285
max_bytes => non_neg_integer(),
286+
max_bytes_meta => non_neg_integer(),
226287
overflow_strategy => drop_head | reject_publish,
227288
single_active_consumer_on => boolean(),
289+
filter => false | filter_fields(),
228290
delivery_limit => non_neg_integer() | -1,
229291
expires => non_neg_integer(),
230292
msg_ttl => non_neg_integer(),

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
% status = up :: up | cancelled,
5353
last_msg_id :: seq() | -1 | undefined,
5454
ack = false :: boolean(),
55+
filter :: rabbit_amqp_filter:filter_expression(),
5556
%% Remove this field when feature flag rabbitmq_4.0.0 becomes required.
5657
delivery_count :: {credit_api_v1, rabbit_queue_type:delivery_count()} |
5758
credit_api_v2
@@ -165,7 +166,14 @@ enqueue(QName, Correlation, Msg,
165166
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
166167
ServerId = pick_server(State0),
167168
% by default there is no correlation id
168-
Cmd = rabbit_fifo:make_enqueue(self(), EnqueueSeq, Msg),
169+
%% TODO Check which metadata this queue filters on.
170+
{utf8, GroupId} = mc:property(group_id, Msg),
171+
%% TODO rabbit_fifo should provide this shorthand.
172+
%% Probably it's best if rabbit_fifo_filter provides some client API
173+
%% to map the subset of metadata to filter on (set via queue args) to
174+
%% how to extract this from mc and store this in e3{}.
175+
MsgMeta = [{_ShorthandPropertiesSection = p, group_id, GroupId}],
176+
Cmd = rabbit_fifo:make_enqueue(self(), EnqueueSeq, Msg, MsgMeta),
169177
ok = ra:pipeline_command(ServerId, Cmd, Seq, low),
170178
IsSlow = map_size(Pending) >= SftLmt,
171179
State = State0#state{pending = Pending#{Seq => {Correlation, Cmd}},
@@ -418,12 +426,15 @@ checkout(ConsumerTag, CreditMode, #{} = Meta,
418426
false -> {credit_api_v1, 0}
419427
end,
420428
ConsumerKey = maps:get(key, Reply, ConsumerId),
429+
Filter = maps:get(filter, Meta),
421430
SDels = maps:update_with(
422431
ConsumerTag,
423-
fun (C) -> C#consumer{ack = Ack} end,
432+
fun (C) -> C#consumer{ack = Ack,
433+
filter = Filter} end,
424434
#consumer{key = ConsumerKey,
425435
last_msg_id = LastMsgId,
426436
ack = Ack,
437+
filter = Filter,
427438
delivery_count = DeliveryCount},
428439
CDels0),
429440
{ok, Reply, State0#state{leader = Leader,
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
-module(rabbit_fifo_filter).
2+
3+
-include("rabbit_fifo.hrl").
4+
5+
-export([filter/2]).
6+
7+
%% Returns true when the message metadata passes every filter expression in
%% the given filter set (an empty filter set accepts every message).
%%
%% "A message will pass through a filter-set if and only if
%% it passes through each of the named filters." [3.5.8]
%%
%% Evaluation stops at the first filter expression that does not match.
filter([], _MsgMetaData) ->
    true;
filter([Expr | Rest], MsgMetaData) ->
    filter0(Expr, MsgMetaData) andalso filter(Rest, MsgMetaData).
13+
14+
%% Evaluates a single filter expression `{Section, KVList}' against the
%% metadata kept in memory for a message.
%%
%% Section is one of header | message_annotations | properties |
%% application_properties; KVList is a list of {Field, Value} pairs.
%% MsgMetaData is a list of {SectionShorthand, Field, Value} triples.
%%
%% "The filter evaluates to true if all properties enclosed in the filter expression
%% match the respective properties in the message."
%% [filtex-v1.0-wd09 4.2.4]
%% The same "every referenced field must match" rule is applied to the other
%% sections. Values are compared for exact equality.
%%
%% An unknown section is not a defined case and fails with function_clause.
filter0({Section, KVList}, MsgMetaData) ->
    Short = section_shorthand(Section),
    lists:all(fun({RefField, RefVal}) ->
                      field_matches(Short, RefField, RefVal, MsgMetaData)
              end, KVList).

%% True if the first metadata entry for Field in the given section carries
%% exactly RefVal. A field that is absent from the metadata never matches.
field_matches(Short, RefField, RefVal, MsgMetaData) ->
    IsField = fun({S, Field, _Val}) ->
                      S =:= Short andalso Field =:= RefField;
                 (_) ->
                      false
              end,
    case lists:search(IsField, MsgMetaData) of
        {value, {_, _, RefVal}} ->
            true;
        _ ->
            false
    end.

%% Maps a message section name to the shorthand under which its fields are
%% stored in the message metadata.
section_shorthand(header) -> ?HEADER_SECTION;
section_shorthand(message_annotations) -> ?MESSAGE_ANNOTATIONS_SECTION;
section_shorthand(properties) -> ?PROPERTIES_SECTION;
section_shorthand(application_properties) -> ?APPLICATION_PROPERTIES_SECTION.
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
%% A filterable queue with efficient random access and deletion.
2+
-module(rabbit_fifo_filter_q).
3+
4+
-include("rabbit_fifo.hrl").
5+
-export([
6+
new/0,
7+
in/3,
8+
size/1,
9+
take/3,
10+
take/4,
11+
is_fully_scanned/2,
12+
overview/1
13+
]).
14+
15+
16+
%% Queue state: one gb_tree per priority level, keyed by Ra index and holding
%% the message header, plus the cached largest key of each tree.
-record(?MODULE,
        {%% high priority messages
         hi :: gb_trees:tree(ra:index(), msg_header()),
         %% normal priority messages
         no :: gb_trees:tree(ra:index(), msg_header()),
         %% cache largest to avoid frequent gb_trees:largest/1 calls
         hi_max :: ra:index(),
         no_max :: ra:index()}).
23+
24+
-opaque state() :: #?MODULE{}.
25+
26+
-export_type([state/0]).
27+
28+
%% Creates an empty queue: both priority trees are empty and both cached
%% largest indexes start at 0.
-spec new() -> state().
new() ->
    Empty = gb_trees:empty(),
    #?MODULE{hi = Empty,
             no = Empty,
             hi_max = 0,
             no_max = 0}.
34+
35+
%% Inserts a message into the tree of the given priority and records its Ra
%% index as the new cached largest index of that tree.
%% The Ra index must be greater than the cached largest index of the tree;
%% otherwise no clause matches and the call fails with function_clause.
-spec in(hi | no, msg(), state()) -> state().
in(hi, ?MSG(RaIdx, Hdr), #?MODULE{hi = Tree0, hi_max = Max} = State)
  when RaIdx > Max ->
    Tree = gb_trees:insert(RaIdx, Hdr, Tree0),
    State#?MODULE{hi = Tree,
                  hi_max = RaIdx};
in(no, ?MSG(RaIdx, Hdr), #?MODULE{no = Tree0, no_max = Max} = State)
  when RaIdx > Max ->
    Tree = gb_trees:insert(RaIdx, Hdr, Tree0),
    State#?MODULE{no = Tree,
                  no_max = RaIdx}.
46+
47+
%% Total number of messages held across both priority trees.
-spec size(state()) -> non_neg_integer().
size(#?MODULE{hi = Hi, no = No}) ->
    NumHi = gb_trees:size(Hi),
    NumNo = gb_trees:size(No),
    NumHi + NumNo.
52+
53+
%% Removes the message stored under the given Ra index from the tree of the
%% given priority and returns it together with the updated state.
%% Assumes the provided Ra index exists in the tree.
%% The cached largest index of that tree is recomputed when the removed
%% index was the largest one.
-spec take(ra:index(), hi | no, state()) ->
    {msg(), state()}.
take(Idx, hi, #?MODULE{hi = Tree0, hi_max = Max0} = State) ->
    {Hdr, Tree} = gb_trees:take(Idx, Tree0),
    Max = new_largest(Idx, Max0, Tree),
    {?MSG(Idx, Hdr), State#?MODULE{hi = Tree,
                                   hi_max = Max}};
take(Idx, no, #?MODULE{no = Tree0, no_max = Max0} = State) ->
    {Hdr, Tree} = gb_trees:take(Idx, Tree0),
    Max = new_largest(Idx, Max0, Tree),
    {?MSG(Idx, Hdr), State#?MODULE{no = Tree,
                                   no_max = Max}}.
70+
71+
%% Finds and removes the next message that has not expired at RaTs and
%% passes Filter, scanning onwards from the given last scanned indexes
%% {HiIdx, NoIdx}.
%%
%% The high priority tree is scanned first. If it yields nothing, it is
%% marked as scanned up to its cached largest index and the normal priority
%% tree is scanned next.
%%
%% Returns {Msg, ScannedIdxs, State} when a message was taken, or
%% {empty, ScannedIdxs} when nothing matched; in both cases ScannedIdxs
%% are the indexes to resume scanning from on the next call.
-spec take(timestamp(),
           rabbit_amqp_filtex:filter_expressions(),
           {ra:index(), ra:index()},
           state()) ->
    {empty, {ra:index(), ra:index()}} |
    {msg(), {ra:index(), ra:index()}, state()}.
take(RaTs, Filter, {HiIdx, NoIdx}, #?MODULE{hi = Hi0,
                                            hi_max = HiMax0} = State)
  when HiIdx < HiMax0 ->
    case filter(RaTs, Filter, HiIdx, Hi0) of
        ?MSG(Idx, _Hdr) = Msg ->
            Hi = gb_trees:delete(Idx, Hi0),
            HiMax = new_largest(Idx, HiMax0, Hi),
            {Msg, {Idx, NoIdx}, State#?MODULE{hi = Hi,
                                              hi_max = HiMax}};
        none ->
            %% Nothing left to deliver from the high priority tree:
            %% remember that and fall through to normal priority.
            take(RaTs, Filter, {HiMax0, NoIdx}, State)
    end;
take(RaTs, Filter, {HiIdx, NoIdx}, #?MODULE{no = No0,
                                            no_max = NoMax0} = State)
  when NoIdx < NoMax0 ->
    case filter(RaTs, Filter, NoIdx, No0) of
        ?MSG(Idx, _Hdr) = Msg ->
            No = gb_trees:delete(Idx, No0),
            NoMax = new_largest(Idx, NoMax0, No),
            {Msg, {HiIdx, Idx}, State#?MODULE{no = No,
                                              no_max = NoMax}};
        none ->
            {empty, {HiIdx, NoMax0}}
    end;
take(_RaTs, _Filter, ScannedIdxs, _State) ->
    %% Both trees have already been scanned up to their largest index.
    {empty, ScannedIdxs}.
105+
106+
%% Computes the cached largest index after TakenIdx was removed from Tree.
%% * If a smaller index was taken, the cached value is still valid.
%% * If the largest index was taken, the new largest key of the tree is
%%   used; when the tree is now empty the taken index is kept as the cache.
new_largest(TakenIdx, CachedMax, _Tree)
  when TakenIdx =/= CachedMax ->
    CachedMax;
new_largest(TakenIdx, _CachedMax, Tree) ->
    %% The largest index was taken.
    case gb_trees:is_empty(Tree) of
        false ->
            element(1, gb_trees:largest(Tree));
        true ->
            TakenIdx
    end.
117+
118+
%% Scans Tree in key order, starting at the first key greater than
%% ScannedIdx, for the first message accepted by filter_msg0/3.
%% Returns that message or `none'.
filter(RaTs, Filter, ScannedIdx, Tree) ->
    Iter = gb_trees:iterator_from(ScannedIdx + 1, Tree),
    First = gb_trees:next(Iter),
    filter_msg0(RaTs, Filter, First).
122+
123+
%% Walks a gb_trees iterator and returns the first message that has not
%% expired at RaTs and whose metadata passes Filter, or `none' once the
%% iterator is exhausted.
%% Every visited header must be a map containing the `meta' key.
filter_msg0(_RaTs, _Filter, none) ->
    none;
filter_msg0(RaTs, Filter, {Idx, #{meta := Meta} = Hdr, Iter}) ->
    Expiry = rabbit_fifo:get_header(expiry, Hdr),
    %% Expired messages are skipped without evaluating the filter.
    Expired = is_integer(Expiry) andalso RaTs >= Expiry,
    case Expired orelse not rabbit_fifo_filter:filter(Filter, Meta) of
        true ->
            filter_msg0(RaTs, Filter, gb_trees:next(Iter));
        false ->
            ?MSG(Idx, Hdr)
    end.
138+
139+
%% True when the given last scanned indexes have reached (or passed) the
%% cached largest index of both the high and the normal priority tree.
-spec is_fully_scanned({ra:index(), ra:index()}, state()) ->
    boolean().
is_fully_scanned({HiIdx, NoIdx}, #?MODULE{hi_max = HiMax,
                                          no_max = NoMax}) ->
    not (HiIdx < HiMax orelse NoIdx < NoMax).
145+
146+
%% Summarises the queue: the number of messages per priority tree and the
%% total length.
-spec overview(state()) ->
    #{len := non_neg_integer(),
      num_hi := non_neg_integer(),
      num_no := non_neg_integer()}.
overview(#?MODULE{hi = Hi, no = No}) ->
    [NumHi, NumNo] = [gb_trees:size(Tree) || Tree <- [Hi, No]],
    #{len => NumHi + NumNo,
      num_hi => NumHi,
      num_no => NumNo}.

0 commit comments

Comments
 (0)