1111
1212-include_lib (" eunit/include/eunit.hrl" ).
1313-include_lib (" amqp10_common/include/amqp10_filtex.hrl" ).
14+ -include_lib (" amqp10_common/include/amqp10_framing.hrl" ).
1415
1516-compile ([nowarn_export_all ,
1617 export_all ]).
2122 [eventually /1 ]).
2223-import (amqp_utils ,
2324 [init /1 ,
25+ connection_config /1 ,
2426 flush /1 ,
2527 wait_for_credit /1 ,
2628 wait_for_accepts /1 ,
@@ -85,7 +87,12 @@ end_per_testcase(Testcase, Config) ->
8587properties_section (Config ) ->
8688 Stream = atom_to_binary (? FUNCTION_NAME ),
8789 Address = rabbitmq_amqp_address :queue (Stream ),
88- {Connection , Session , LinkPair } = init (Config ),
90+
91+ OpnConf0 = connection_config (Config ),
92+ OpnConf = OpnConf0 #{notify_with_performative => true },
93+ {ok , Connection } = amqp10_client :open_connection (OpnConf ),
94+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
95+ {ok , LinkPair } = rabbitmq_amqp_client :attach_management_link_pair_sync (Session , <<" my link pair" >>),
8996 {ok , #{}} = rabbitmq_amqp_client :declare_queue (
9097 LinkPair ,
9198 Stream ,
@@ -189,6 +196,14 @@ properties_section(Config) ->
189196 {ok , Receiver4 } = amqp10_client :attach_receiver_link (
190197 Session , <<" receiver 4" >>, Address ,
191198 unsettled , configuration , Filter4 ),
199+ receive {amqp10_event ,
200+ {link , Receiver4 ,
201+ {attached , # 'v1_0.attach' {
202+ source = # 'v1_0.source' {filter = {map , ActualFilter }}}}}} ->
203+ ? assertMatch ([{{symbol ,<<" rabbitmq:stream-offset-spec" >>}, _ }],
204+ ActualFilter )
205+ after 5000 -> ct :fail ({missing_event , ? LINE })
206+ end ,
192207 {ok , R4M1 } = amqp10_client :get_msg (Receiver4 ),
193208 {ok , R4M2 } = amqp10_client :get_msg (Receiver4 ),
194209 {ok , R4M3 } = amqp10_client :get_msg (Receiver4 ),
@@ -208,7 +223,11 @@ properties_section(Config) ->
208223application_properties_section (Config ) ->
209224 Stream = atom_to_binary (? FUNCTION_NAME ),
210225 Address = rabbitmq_amqp_address :queue (Stream ),
211- {Connection , Session , LinkPair } = init (Config ),
226+ OpnConf0 = connection_config (Config ),
227+ OpnConf = OpnConf0 #{notify_with_performative => true },
228+ {ok , Connection } = amqp10_client :open_connection (OpnConf ),
229+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
230+ {ok , LinkPair } = rabbitmq_amqp_client :attach_management_link_pair_sync (Session , <<" my link pair" >>),
212231 {ok , #{}} = rabbitmq_amqp_client :declare_queue (
213232 LinkPair ,
214233 Stream ,
@@ -264,6 +283,20 @@ application_properties_section(Config) ->
264283 {ok , Receiver1 } = amqp10_client :attach_receiver_link (
265284 Session , <<" receiver 1" >>, Address ,
266285 settled , configuration , Filter1 ),
286+ receive {amqp10_event ,
287+ {link , Receiver1 ,
288+ {attached , # 'v1_0.attach' {
289+ source = # 'v1_0.source' {filter = {map , ActualFilter1 }}}}}} ->
290+ ? assertMatch (
291+ {described , _Type , {map , [
292+ {{utf8 , <<" k1" >>}, {int , - 2 }},
293+ {{utf8 , <<" k5" >>}, {symbol , <<" hey" >>}},
294+ {{utf8 , <<" k4" >>}, true },
295+ {{utf8 , <<" k3" >>}, false }
296+ ]}},
297+ proplists :get_value ({symbol , ? DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER }, ActualFilter1 ))
298+ after 5000 -> ct :fail ({missing_event , ? LINE })
299+ end ,
267300 ok = amqp10_client :flow_link_credit (Receiver1 , 10 , never ),
268301 receive {amqp10_msg , Receiver1 , R1M1 } ->
269302 ? assertEqual ([<<" m1" >>], amqp10_msg :body (R1M1 ))
@@ -306,6 +339,38 @@ application_properties_section(Config) ->
306339 ? assertEqual ([<<" m4" >>], amqp10_msg :body (R3M3 )),
307340 ok = detach_link_sync (Receiver3 ),
308341
342+ % % Wrong type should fail validation in the server.
343+ % % RabbitMQ should exclude this filter in its reply attach frame because
344+ % % "the sending endpoint [RabbitMQ] sets the filter actually in place".
345+ % % Hence, no filter expression is actually in place and we should receive all messages.
346+ AppPropsFilter4 = [{{symbol , <<" k2" >>}, {uint , 10 }}],
347+ Filter4 = #{<<" rabbitmq:stream-offset-spec" >> => <<" first" >>,
348+ ? DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map , AppPropsFilter4 }},
349+ {ok , Receiver4 } = amqp10_client :attach_receiver_link (
350+ Session , <<" receiver 4" >>, Address ,
351+ unsettled , configuration , Filter4 ),
352+ receive {amqp10_event ,
353+ {link , Receiver4 ,
354+ {attached , # 'v1_0.attach' {
355+ source = # 'v1_0.source' {filter = {map , ActualFilter4 }}}}}} ->
356+ ? assertMatch ([{{symbol ,<<" rabbitmq:stream-offset-spec" >>}, _ }],
357+ ActualFilter4 )
358+ after 5000 -> ct :fail ({missing_event , ? LINE })
359+ end ,
360+ {ok , R4M1 } = amqp10_client :get_msg (Receiver4 ),
361+ {ok , R4M2 } = amqp10_client :get_msg (Receiver4 ),
362+ {ok , R4M3 } = amqp10_client :get_msg (Receiver4 ),
363+ {ok , R4M4 } = amqp10_client :get_msg (Receiver4 ),
364+ ok = amqp10_client :accept_msg (Receiver4 , R4M1 ),
365+ ok = amqp10_client :accept_msg (Receiver4 , R4M2 ),
366+ ok = amqp10_client :accept_msg (Receiver4 , R4M3 ),
367+ ok = amqp10_client :accept_msg (Receiver4 , R4M4 ),
368+ ? assertEqual ([<<" m1" >>], amqp10_msg :body (R4M1 )),
369+ ? assertEqual ([<<" m2" >>], amqp10_msg :body (R4M2 )),
370+ ? assertEqual ([<<" m3" >>], amqp10_msg :body (R4M3 )),
371+ ? assertEqual ([<<" m4" >>], amqp10_msg :body (R4M4 )),
372+ ok = detach_link_sync (Receiver4 ),
373+
309374 {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , Stream ),
310375 ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
311376 ok = end_session_sync (Session ),
0 commit comments