@@ -54,7 +54,8 @@ common_tests() ->
5454 target_per_message_queue ,
5555 target_per_message_unset_to_address ,
5656 target_per_message_bad_to_address ,
57- target_per_message_exchange_absent ,
57+ target_per_message_exchange_absent_settled ,
58+ target_per_message_exchange_absent_unsettled ,
5859 target_bad_address ,
5960 source_bad_address
6061 ].
@@ -393,16 +394,15 @@ target_per_message_unset_to_address(Config) ->
393394 % % Send message with 'to' unset.
394395 DTag = <<1 >>,
395396 ok = amqp10_client :send_msg (Sender , amqp10_msg :new (DTag , <<0 >>)),
396- ok = wait_for_settled ( released , DTag ),
397- receive { amqp10_event ,
398- { link , Sender ,
399- { detached ,
400- # 'v1_0.error' {
401- condition = ? V_1_0_AMQP_ERROR_PRECONDITION_FAILED ,
402- description = { utf8 , << " anonymous terminus requires 'to' address to be set " >>}} }}} -> ok
403- after 5000 -> ct :fail (" server did not close our outgoing link " )
397+ ExpectedError = # 'v1_0.error' {
398+ condition = ? V_1_0_AMQP_ERROR_PRECONDITION_FAILED ,
399+ description = { utf8 , << " anonymous terminus requires 'to' address to be set " >>}} ,
400+ ok = wait_for_settled ({ rejected , ExpectedError }, DTag ) ,
401+
402+ ok = amqp10_client : detach_link ( Sender ) ,
403+ receive { amqp10_event , { link , Sender , { detached , normal }}} -> ok
404+ after 5000 -> ct :fail ({ missing_event , ? LINE } )
404405 end ,
405-
406406 ok = amqp10_client :end_session (Session ),
407407 ok = amqp10_client :close_connection (Connection ).
408408
@@ -449,34 +449,32 @@ bad_v2_addresses() ->
449449
450450% % Test v2 target address 'null' with an invalid 'to' addresses.
451451target_per_message_bad_to_address (Config ) ->
452- lists :foreach (fun (Addr ) ->
453- ok = target_per_message_bad_to_address0 (Addr , Config )
454- end , bad_v2_addresses ()).
455-
456- target_per_message_bad_to_address0 (Address , Config ) ->
457452 OpnConf = connection_config (Config ),
458453 {ok , Connection } = amqp10_client :open_connection (OpnConf ),
459454 {ok , Session } = amqp10_client :begin_session_sync (Connection ),
460455 {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender" >>, null ),
461456 ok = wait_for_credit (Sender ),
462457
463- DTag = <<255 >>,
464- Msg = amqp10_msg :set_properties (#{to => Address }, amqp10_msg :new (DTag , <<0 >>)),
465- ok = amqp10_client :send_msg (Sender , Msg ),
466- ok = wait_for_settled (released , DTag ),
467- receive {amqp10_event ,
468- {link , Sender ,
469- {detached ,
470- # 'v1_0.error' {
471- condition = ? V_1_0_AMQP_ERROR_PRECONDITION_FAILED ,
472- description = {utf8 , <<" bad 'to' address" , _Rest /binary >>}}}}} -> ok
473- after 5000 -> ct :fail (" server did not close our outgoing link" )
474- end ,
458+ lists :foreach (
459+ fun (Addr ) ->
460+ DTag = <<" some delivery tag" >>,
461+ Msg = amqp10_msg :set_properties (#{to => Addr }, amqp10_msg :new (DTag , <<0 >>, false )),
462+ ok = amqp10_client :send_msg (Sender , Msg ),
463+ receive
464+ {amqp10_disposition , {{rejected , Error }, DTag }} ->
465+ ? assertMatch (# 'v1_0.error' {condition = ? V_1_0_AMQP_ERROR_PRECONDITION_FAILED ,
466+ description = {utf8 , <<" bad 'to' address" , _Rest /binary >>}},
467+ Error )
468+ after 5000 ->
469+ flush (missing_disposition ),
470+ ct :fail (missing_disposition )
471+ end
472+ end , bad_v2_addresses ()),
475473
476474 ok = amqp10_client :end_session (Session ),
477475 ok = amqp10_client :close_connection (Connection ).
478476
479- target_per_message_exchange_absent (Config ) ->
477+ target_per_message_exchange_absent_settled (Config ) ->
480478 Init = {_ , LinkPair = # link_pair {session = Session }} = init (Config ),
481479 XName = <<" 🎈" /utf8 >>,
482480 Address = rabbitmq_amqp_address :exchange (XName ),
@@ -492,20 +490,59 @@ target_per_message_exchange_absent(Config) ->
492490 ok = rabbitmq_amqp_client :delete_exchange (LinkPair , XName ),
493491
494492 DTag2 = <<2 >>,
495- Msg2 = amqp10_msg :set_properties (#{to => Address }, amqp10_msg :new (DTag2 , <<" m2" >>)),
493+ Msg2 = amqp10_msg :set_properties (#{to => Address }, amqp10_msg :new (DTag2 , <<" m2" >>, true )),
496494 ok = amqp10_client :send_msg (Sender , Msg2 ),
497- ok = wait_for_settled (released , DTag2 ),
495+
496+ % % "the routing node MUST detach the link over which the message was sent with an error.
497+ % % [...] Additionally the info field of error MUST contain an entry with symbolic key delivery-tag
498+ % % and binary value of the delivery-tag of the message which caused the failure."
499+ % % https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
498500 receive {amqp10_event , {link , Sender , {detached , Error }}} ->
499501 ? assertEqual (
500502 # 'v1_0.error' {
501503 condition = ? V_1_0_AMQP_ERROR_NOT_FOUND ,
502- description = {utf8 , <<" no exchange '" , XName /binary , " ' in vhost '/'" >>}},
504+ description = {utf8 , <<" no exchange '" , XName /binary , " ' in vhost '/'" >>},
505+ info = {map , [{{symbol , <<" delivery-tag" >>}, {binary , DTag2 }}]}
506+ },
503507 Error )
504508 after 5000 -> ct :fail (" server did not close our outgoing link" )
505509 end ,
506510
507511 ok = cleanup (Init ).
508512
513+ target_per_message_exchange_absent_unsettled (Config ) ->
514+ Init = {_ , LinkPair = # link_pair {session = Session }} = init (Config ),
515+ XName = <<" 🎈" /utf8 >>,
516+ Address = rabbitmq_amqp_address :exchange (XName ),
517+ ok = rabbitmq_amqp_client :declare_exchange (LinkPair , XName , #{}),
518+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender" >>, null ),
519+ ok = wait_for_credit (Sender ),
520+
521+ DTag1 = <<" my tag" >>,
522+ Msg1 = amqp10_msg :set_properties (#{to => Address }, amqp10_msg :new (DTag1 , <<" hey" >>)),
523+ ok = amqp10_client :send_msg (Sender , Msg1 ),
524+ ok = wait_for_settled (released , DTag1 ),
525+
526+ ok = rabbitmq_amqp_client :delete_exchange (LinkPair , XName ),
527+
528+ % % "If the source of the link supports the rejected outcome, and the message has not
529+ % % already been settled by the sender, then the routing node MUST reject the message.
530+ % % In this case the error field of rejected MUST contain the error which would have been communicated
531+ % % in the detach which would have be sent if a link to the same address had been attempted."
532+ % % https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
533+ % % We test here multiple rejections implicilty checking that link flow control works correctly.
534+ ExpectedError = # 'v1_0.error' {
535+ condition = ? V_1_0_AMQP_ERROR_NOT_FOUND ,
536+ description = {utf8 , <<" no exchange '" , XName /binary , " ' in vhost '/'" >>}},
537+ [begin
538+ DTag = Body = integer_to_binary (N ),
539+ Msg = amqp10_msg :set_properties (#{to => Address }, amqp10_msg :new (DTag , Body , false )),
540+ ok = amqp10_client :send_msg (Sender , Msg ),
541+ ok = wait_for_settled ({rejected , ExpectedError }, DTag )
542+ end || N <- lists :seq (1 , 300 )],
543+
544+ ok = cleanup (Init ).
545+
509546target_bad_address (Config ) ->
510547 % % bad v1 and bad v2 target address
511548 TargetAddr = <<" /qqq/🎈" /utf8 >>,
0 commit comments