Skip to content

Commit 387391a

Browse files
committed
Impose limit on AMQP filter complexity
As described in section 7.1 of filtex-v1.0-wd09: > Impose a limit on the complexity of each filter expression. Here, we hard code the maximum properties within a filter expression to 16. There should never be a use case requiring to filter on more than 16 different properties.
1 parent 60c2bdf commit 387391a

File tree

2 files changed

+46
-9
lines changed

2 files changed

+46
-9
lines changed

deps/rabbit/src/rabbit_amqp_filtex.erl

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,24 @@
1313
-export([validate/1,
1414
filter/2]).
1515

16+
%% "Impose a limit on the complexity of each filter expression."
17+
%% [filtex-v1.0-wd09 7.1]
18+
-define(MAX_FILTER_FIELDS, 16).
19+
1620
-type simple_type() :: number() | binary() | atom().
1721
-type affix() :: {suffix, non_neg_integer(), binary()} |
1822
{prefix, non_neg_integer(), binary()}.
1923
-type filter_expression_value() :: simple_type() | affix().
20-
-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} |
21-
{application_properties, [{binary(), filter_expression_value()}]}.
24+
-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}, ...]} |
25+
{application_properties, [{binary(), filter_expression_value()}, ...]}.
2226
-type filter_expressions() :: [filter_expression()].
2327
-export_type([filter_expressions/0]).
2428

2529
-spec validate(tuple()) ->
2630
{ok, filter_expression()} | error.
27-
validate({described, Descriptor, {map, KVList}}) ->
31+
validate({described, Descriptor, {map, KVList}})
32+
when KVList =/= [] andalso
33+
length(KVList) =< ?MAX_FILTER_FIELDS ->
2834
try validate0(Descriptor, KVList)
2935
catch throw:{?MODULE, _, _} ->
3036
error
@@ -108,14 +114,12 @@ match_simple_type(RefVal, Val) ->
108114
RefVal == Val.
109115

110116
validate0(Descriptor, KVList) when
111-
(Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse
112-
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso
113-
KVList =/= [] ->
117+
Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse
118+
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER} ->
114119
validate_props(KVList, []);
115120
validate0(Descriptor, KVList) when
116-
(Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
117-
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso
118-
KVList =/= [] ->
121+
Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
122+
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER} ->
119123
validate_app_props(KVList, []);
120124
validate0(_, _) ->
121125
error.

deps/rabbit/test/amqp_filtex_SUITE.erl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,39 @@ application_properties_section(Config) ->
377377
?assertEqual([<<"m4">>], amqp10_msg:body(R4M4)),
378378
ok = detach_link_sync(Receiver4),
379379

380+
%% Complex filter (too many properties to filter on) should fail validation in the server.
381+
%% RabbitMQ should exclude this filter in its reply attach frame because
382+
%% "the sending endpoint [RabbitMQ] sets the filter actually in place".
383+
%% Hence, no filter expression is actually in place and we should receive all messages.
384+
AppPropsFilter5 = [{{utf8, integer_to_binary(N)}, {uint, 1}} ||
385+
N <- lists:seq(1, 17)],
386+
Filter5 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
387+
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter5}},
388+
{ok, Receiver5} = amqp10_client:attach_receiver_link(
389+
Session, <<"receiver 5">>, Address,
390+
unsettled, configuration, Filter5),
391+
receive {amqp10_event,
392+
{link, Receiver5,
393+
{attached, #'v1_0.attach'{
394+
source = #'v1_0.source'{filter = {map, ActualFilter5}}}}}} ->
395+
?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}],
396+
ActualFilter5)
397+
after 30000 -> ct:fail({missing_event, ?LINE})
398+
end,
399+
{ok, R5M1} = amqp10_client:get_msg(Receiver5),
400+
{ok, R5M2} = amqp10_client:get_msg(Receiver5),
401+
{ok, R5M3} = amqp10_client:get_msg(Receiver5),
402+
{ok, R5M4} = amqp10_client:get_msg(Receiver5),
403+
ok = amqp10_client:accept_msg(Receiver5, R5M1),
404+
ok = amqp10_client:accept_msg(Receiver5, R5M2),
405+
ok = amqp10_client:accept_msg(Receiver5, R5M3),
406+
ok = amqp10_client:accept_msg(Receiver5, R5M4),
407+
?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)),
408+
?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)),
409+
?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)),
410+
?assertEqual([<<"m4">>], amqp10_msg:body(R5M4)),
411+
ok = detach_link_sync(Receiver5),
412+
380413
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
381414
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
382415
ok = end_session_sync(Session),

0 commit comments

Comments
 (0)