diff --git a/deps/rabbit/src/rabbit_amqp_filtex.erl b/deps/rabbit/src/rabbit_amqp_filtex.erl index a1e346d30fa1..327457125822 100644 --- a/deps/rabbit/src/rabbit_amqp_filtex.erl +++ b/deps/rabbit/src/rabbit_amqp_filtex.erl @@ -13,18 +13,24 @@ -export([validate/1, filter/2]). +%% "Impose a limit on the complexity of each filter expression." +%% [filtex-v1.0-wd09 7.1] +-define(MAX_FILTER_FIELDS, 16). + -type simple_type() :: number() | binary() | atom(). -type affix() :: {suffix, non_neg_integer(), binary()} | {prefix, non_neg_integer(), binary()}. -type filter_expression_value() :: simple_type() | affix(). --type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} | - {application_properties, [{binary(), filter_expression_value()}]}. +-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}, ...]} | + {application_properties, [{binary(), filter_expression_value()}, ...]}. -type filter_expressions() :: [filter_expression()]. -export_type([filter_expressions/0]). -spec validate(tuple()) -> {ok, filter_expression()} | error. -validate({described, Descriptor, {map, KVList}}) -> +validate({described, Descriptor, {map, KVList}}) + when KVList =/= [] andalso + length(KVList) =< ?MAX_FILTER_FIELDS -> try validate0(Descriptor, KVList) catch throw:{?MODULE, _, _} -> error @@ -108,14 +114,12 @@ match_simple_type(RefVal, Val) -> RefVal == Val. validate0(Descriptor, KVList) when - (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse - Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso - KVList =/= [] -> + Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER} -> validate_props(KVList, []); validate0(Descriptor, KVList) when - (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse - Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso - KVList =/= [] -> + Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER} -> validate_app_props(KVList, []); validate0(_, _) -> error. diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl index 75f8528da9ca..2d4f34bd1883 100644 --- a/deps/rabbit/test/amqp_filtex_SUITE.erl +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -377,6 +377,39 @@ application_properties_section(Config) -> ?assertEqual([<<"m4">>], amqp10_msg:body(R4M4)), ok = detach_link_sync(Receiver4), + %% Complex filter (too many properties to filter on) should fail validation in the server. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + AppPropsFilter5 = [{{utf8, integer_to_binary(N)}, {uint, 1}} || + N <- lists:seq(1, 17)], + Filter5 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter5}}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + unsettled, configuration, Filter5), + receive {amqp10_event, + {link, Receiver5, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter5}}}}}} -> + ?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}], + ActualFilter5) + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, R5M1} = amqp10_client:get_msg(Receiver5), + {ok, R5M2} = amqp10_client:get_msg(Receiver5), + {ok, R5M3} = amqp10_client:get_msg(Receiver5), + {ok, R5M4} = amqp10_client:get_msg(Receiver5), + ok = amqp10_client:accept_msg(Receiver5, R5M1), + ok = amqp10_client:accept_msg(Receiver5, R5M2), + ok = amqp10_client:accept_msg(Receiver5, R5M3), + ok = amqp10_client:accept_msg(Receiver5, R5M4), + ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R5M4)), + ok = detach_link_sync(Receiver5), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session),