@@ -129,6 +129,7 @@ groups() ->
129129 modified_classic_queue ,
130130 modified_quorum_queue ,
131131 modified_dead_letter_headers_exchange ,
132+ modified_dead_letter_history ,
132133 dead_letter_headers_exchange ,
133134 dead_letter_reject ,
134135 dead_letter_reject_message_order_classic_queue ,
@@ -264,7 +265,8 @@ init_per_testcase(T, Config)
264265 end ;
265266init_per_testcase (T , Config )
266267 when T =:= modified_quorum_queue orelse
267- T =:= modified_dead_letter_headers_exchange ->
268+ T =:= modified_dead_letter_headers_exchange orelse
269+ T =:= modified_dead_letter_history ->
268270 case rpc (Config , rabbit_feature_flags , is_enabled , ['rabbitmq_4.0.0' ]) of
269271 true ->
270272 rabbit_ct_helpers :testcase_started (Config , T );
@@ -501,79 +503,127 @@ modified_quorum_queue(Config) ->
501503 ok = amqp10_client :send_msg (Sender , Msg2 ),
502504 ok = amqp10_client :detach_link (Sender ),
503505
504- {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" receiver" >>, Address , unsettled ),
506+ Receiver1Name = <<" receiver 1" >>,
507+ Receiver2Name = <<" receiver 2" >>,
508+ {ok , Receiver1 } = amqp10_client :attach_receiver_link (Session , Receiver1Name , Address , unsettled ),
509+ {ok , Receiver2 } = amqp10_client :attach_receiver_link (Session , Receiver2Name , Address , unsettled ),
505510
506- {ok , M1 } = amqp10_client :get_msg (Receiver ),
511+ {ok , M1 } = amqp10_client :get_msg (Receiver1 ),
507512 ? assertEqual ([<<" m1" >>], amqp10_msg :body (M1 )),
508513 ? assertMatch (#{delivery_count := 0 ,
509514 first_acquirer := true },
510515 amqp10_msg :headers (M1 )),
511- ok = amqp10_client :settle_msg (Receiver , M1 , {modified , false , true , #{}}),
516+ ok = amqp10_client :settle_msg (Receiver1 , M1 , {modified , false , true , #{}}),
512517
513- {ok , M2a } = amqp10_client :get_msg (Receiver ),
518+ {ok , M2a } = amqp10_client :get_msg (Receiver1 ),
514519 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2a )),
515520 ? assertMatch (#{delivery_count := 0 ,
516521 first_acquirer := true },
517522 amqp10_msg :headers (M2a )),
518- ok = amqp10_client :settle_msg (Receiver , M2a , {modified , false , false , #{}}),
523+ ok = amqp10_client :settle_msg (Receiver1 , M2a , {modified , false , false , #{}}),
519524
520- {ok , M2b } = amqp10_client :get_msg (Receiver ),
525+ {ok , M2b } = amqp10_client :get_msg (Receiver1 ),
521526 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2b )),
522527 ? assertMatch (#{delivery_count := 0 ,
523528 first_acquirer := false },
524529 amqp10_msg :headers (M2b )),
525- ok = amqp10_client :settle_msg (Receiver , M2b , {modified , true , false , #{}}),
530+ ok = amqp10_client :settle_msg (Receiver1 , M2b , {modified , true , false , #{}}),
526531
527- {ok , M2c } = amqp10_client :get_msg (Receiver ),
532+ {ok , M2c } = amqp10_client :get_msg (Receiver1 ),
528533 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2c )),
529534 ? assertMatch (#{delivery_count := 1 ,
530535 first_acquirer := false },
531536 amqp10_msg :headers (M2c )),
532- ok = amqp10_client :settle_msg (Receiver , M2c ,
533- {modified , true , false ,
534- #{<<" x-opt-key" >> => <<" val 1" >>}}),
535-
536- {ok , M2d } = amqp10_client :get_msg (Receiver ),
537+ ok = amqp10_client :settle_msg (
538+ Receiver1 , M2c ,
539+ {modified , true , false ,
540+ % % Test that a history of requeue events can be tracked as described in
541+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
542+ #{<<" x-opt-requeued-by" >> => {array , utf8 , [{utf8 , Receiver1Name }]},
543+ <<" x-opt-requeue-reason" >> => {list , [{utf8 , <<" reason 1" >>}]},
544+ <<" x-opt-my-map" >> => {map , [
545+ {{utf8 , <<" k1" >>}, {byte , - 1 }},
546+ {{utf8 , <<" k2" >>}, {ulong , 2 }}
547+ ]}}}),
548+
549+ {ok , M2d } = amqp10_client :get_msg (Receiver2 ),
537550 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2d )),
538551 ? assertMatch (#{delivery_count := 2 ,
539552 first_acquirer := false },
540553 amqp10_msg :headers (M2d )),
541- ? assertMatch (#{<<" x-opt-key" >> := <<" val 1" >>}, amqp10_msg :message_annotations (M2d )),
542- ok = amqp10_client :settle_msg (Receiver , M2d ,
543- {modified , false , false ,
544- #{<<" x-opt-key" >> => <<" val 2" >>,
545- <<" x-other" >> => 99 }}),
546-
547- {ok , M2e } = amqp10_client :get_msg (Receiver ),
554+ #{<<" x-opt-requeued-by" >> := {array , utf8 , L0 },
555+ <<" x-opt-requeue-reason" >> := L1 ,
556+ <<" x-opt-my-map" >> := L2 } = amqp10_msg :message_annotations (M2d ),
557+ ok = amqp10_client :settle_msg (
558+ Receiver1 , M2d ,
559+ {modified , false , false ,
560+ #{<<" x-opt-requeued-by" >> => {array , utf8 , [{utf8 , Receiver2Name } | L0 ]},
561+ <<" x-opt-requeue-reason" >> => {list , [{symbol , <<" reason 2" >>} | L1 ]},
562+ <<" x-opt-my-map" >> => {map , L2 ++ [{{symbol , <<" k3" >>}, {symbol , <<" val 3" >>}}]},
563+ <<" x-other" >> => 99 }}),
564+
565+ {ok , M2e } = amqp10_client :get_msg (Receiver1 ),
548566 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2e )),
549567 ? assertMatch (#{delivery_count := 2 ,
550568 first_acquirer := false },
551569 amqp10_msg :headers (M2e )),
552- ? assertMatch (#{<<" x-opt-key" >> := <<" val 2" >>,
570+ ? assertMatch (#{<<" x-opt-requeued-by" >> := {array , utf8 , [{utf8 , Receiver2Name }, {utf8 , Receiver1Name }]},
571+ <<" x-opt-requeue-reason" >> := [{symbol , <<" reason 2" >>}, {utf8 , <<" reason 1" >>}],
572+ <<" x-opt-my-map" >> := [
573+ {{utf8 , <<" k1" >>}, {byte , - 1 }},
574+ {{utf8 , <<" k2" >>}, {ulong , 2 }},
575+ {{symbol , <<" k3" >>}, {symbol , <<" val 3" >>}}
576+ ],
553577 <<" x-other" >> := 99 }, amqp10_msg :message_annotations (M2e )),
554- ok = amqp10_client :settle_msg (Receiver , M2e , modified ),
578+ ok = amqp10_client :settle_msg (Receiver1 , M2e , modified ),
555579
556- ok = amqp10_client :detach_link (Receiver ),
557- ? assertMatch ({ok , #{message_count := 1 }},
558- rabbitmq_amqp_client :delete_queue (LinkPair , QName )),
580+ % % Test that we can consume via AMQP 0.9.1
581+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
582+ {# 'basic.get_ok' {},
583+ # amqp_msg {payload = <<" m2" >>,
584+ props = # 'P_basic' {headers = Headers }}
585+ } = amqp_channel :call (Ch , # 'basic.get' {queue = QName , no_ack = true }),
586+ % % We expect to receive only modified AMQP 1.0 message annotations that are of simple types
587+ % % (i.e. excluding list, map, array).
588+ ? assertEqual ({value , {<<" x-other" >>, long , 99 }},
589+ lists :keysearch (<<" x-other" >>, 1 , Headers )),
590+ ? assertEqual ({value , {<<" x-delivery-count" >>, long , 5 }},
591+ lists :keysearch (<<" x-delivery-count" >>, 1 , Headers )),
592+ ok = rabbit_ct_client_helpers :close_channel (Ch ),
593+
594+ ok = amqp10_client :detach_link (Receiver1 ),
595+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
559596 ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
560597 ok = end_session_sync (Session ),
561598 ok = amqp10_client :close_connection (Connection ).
562599
563600% % Test that a message can be routed based on the message-annotations
564- % % provided in the modified outcome.
601+ % % provided in the modified outcome as described in
602+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
565603modified_dead_letter_headers_exchange (Config ) ->
566604 {Connection , Session , LinkPair } = init (Config ),
605+ HeadersXName = <<" my headers exchange" >>,
606+ AlternateXName = <<" my alternate exchange" >>,
567607 SourceQName = <<" source quorum queue" >>,
568608 AppleQName = <<" dead letter classic queue receiving apples" >>,
569609 BananaQName = <<" dead letter quorum queue receiving bananas" >>,
610+ TrashQName = <<" trash queue receiving anything that doesn't match" >>,
611+
612+ ok = rabbitmq_amqp_client :declare_exchange (
613+ LinkPair ,
614+ HeadersXName ,
615+ #{type => <<" headers" >>,
616+ arguments => #{<<" alternate-exchange" >> => {utf8 , AlternateXName }}}),
617+
618+ ok = rabbitmq_amqp_client :declare_exchange (LinkPair , AlternateXName , #{type => <<" fanout" >>}),
619+
570620 {ok , #{type := <<" quorum" >>}} = rabbitmq_amqp_client :declare_queue (
571621 LinkPair ,
572622 SourceQName ,
573623 #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
574624 <<" x-overflow" >> => {utf8 , <<" reject-publish" >>},
575625 <<" x-dead-letter-strategy" >> => {utf8 , <<" at-least-once" >>},
576- <<" x-dead-letter-exchange" >> => {utf8 , << " amq.headers " >> }}}),
626+ <<" x-dead-letter-exchange" >> => {utf8 , HeadersXName }}}),
577627 {ok , #{type := <<" classic" >>}} = rabbitmq_amqp_client :declare_queue (
578628 LinkPair ,
579629 AppleQName ,
@@ -582,14 +632,16 @@ modified_dead_letter_headers_exchange(Config) ->
582632 LinkPair ,
583633 BananaQName ,
584634 #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>}}}),
635+ {ok , _ } = rabbitmq_amqp_client :declare_queue (LinkPair , TrashQName , #{}),
585636 ok = rabbitmq_amqp_client :bind_queue (
586- LinkPair , AppleQName , << " amq.headers " >> , <<>>,
637+ LinkPair , AppleQName , HeadersXName , <<>>,
587638 #{<<" x-fruit" >> => {utf8 , <<" apple" >>},
588639 <<" x-match" >> => {utf8 , <<" any-with-x" >>}}),
589640 ok = rabbitmq_amqp_client :bind_queue (
590- LinkPair , BananaQName , << " amq.headers " >> , <<>>,
641+ LinkPair , BananaQName , HeadersXName , <<>>,
591642 #{<<" x-fruit" >> => {utf8 , <<" banana" >>},
592643 <<" x-match" >> => {utf8 , <<" any-with-x" >>}}),
644+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , TrashQName , AlternateXName , <<>>, #{}),
593645
594646 {ok , Sender } = amqp10_client :attach_sender_link (
595647 Session , <<" test-sender" >>, rabbitmq_amqp_address :queue (SourceQName )),
@@ -600,6 +652,8 @@ modified_dead_letter_headers_exchange(Config) ->
600652 Session , <<" receiver apple" >>, rabbitmq_amqp_address :queue (AppleQName ), unsettled ),
601653 {ok , ReceiverBanana } = amqp10_client :attach_receiver_link (
602654 Session , <<" receiver banana" >>, rabbitmq_amqp_address :queue (BananaQName ), unsettled ),
655+ {ok , ReceiverTrash } = amqp10_client :attach_receiver_link (
656+ Session , <<" receiver trash" >>, rabbitmq_amqp_address :queue (TrashQName ), unsettled ),
603657
604658 ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t1" >>, <<" m1" >>)),
605659 ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t2" >>, <<" m2" >>)),
@@ -609,7 +663,8 @@ modified_dead_letter_headers_exchange(Config) ->
609663 ok = amqp10_client :send_msg (Sender , amqp10_msg :set_message_annotations (
610664 #{" x-fruit" => <<" apple" >>},
611665 amqp10_msg :new (<<" t4" >>, <<" m4" >>))),
612- ok = wait_for_accepts (3 ),
666+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t5" >>, <<" m5" >>)),
667+ ok = wait_for_accepts (5 ),
613668
614669 {ok , Msg1 } = amqp10_client :get_msg (Receiver ),
615670 ? assertMatch (#{delivery_count := 0 ,
@@ -650,13 +705,105 @@ modified_dead_letter_headers_exchange(Config) ->
650705 amqp10_msg :headers (MsgBanana2 )),
651706 ok = amqp10_client :accept_msg (ReceiverBanana , MsgBanana2 ),
652707
708+ {ok , Msg5 } = amqp10_client :get_msg (Receiver ),
709+ % % This message should be routed via the alternate exchange to the trash queue.
710+ ok = amqp10_client :settle_msg (Receiver , Msg5 , {modified , false , true , #{<<" x-fruit" >> => <<" strawberry" >>}}),
711+ {ok , MsgTrash } = amqp10_client :get_msg (ReceiverTrash ),
712+ ? assertEqual ([<<" m5" >>], amqp10_msg :body (MsgTrash )),
713+ ? assertMatch (#{delivery_count := 0 ,
714+ first_acquirer := false },
715+ amqp10_msg :headers (MsgTrash )),
716+ ok = amqp10_client :accept_msg (ReceiverTrash , MsgTrash ),
717+
653718 ok = detach_link_sync (Sender ),
654719 ok = detach_link_sync (Receiver ),
655720 ok = detach_link_sync (ReceiverApple ),
656721 ok = detach_link_sync (ReceiverBanana ),
657722 {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , SourceQName ),
658723 {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , AppleQName ),
659724 {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , BananaQName ),
725+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , TrashQName ),
726+ ok = rabbitmq_amqp_client :delete_exchange (LinkPair , HeadersXName ),
727+ ok = rabbitmq_amqp_client :delete_exchange (LinkPair , AlternateXName ),
728+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
729+ ok = end_session_sync (Session ),
730+ ok = amqp10_client :close_connection (Connection ).
731+
732+ % % Test that custom dead lettering event tracking works as described in
733+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
734+ modified_dead_letter_history (Config ) ->
735+ {Connection , Session , LinkPair } = init (Config ),
736+ Q1 = <<" qq 1" >>,
737+ Q2 = <<" qq 2" >>,
738+
739+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
740+ LinkPair , Q1 ,
741+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
742+ <<" x-dead-letter-strategy" >> => {utf8 , <<" at-most-once" >>},
743+ <<" x-dead-letter-exchange" >> => {utf8 , <<" amq.fanout" >>}}}),
744+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
745+ LinkPair , Q2 ,
746+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
747+ <<" x-dead-letter-strategy" >> => {utf8 , <<" at-most-once" >>},
748+ <<" x-dead-letter-exchange" >> => {utf8 , <<>>}}}),
749+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , Q2 , <<" amq.fanout" >>, <<>>, #{}),
750+
751+ {ok , Sender } = amqp10_client :attach_sender_link (
752+ Session , <<" test-sender" >>, rabbitmq_amqp_address :queue (Q1 )),
753+ wait_for_credit (Sender ),
754+ {ok , Receiver1 } = amqp10_client :attach_receiver_link (
755+ Session , <<" receiver 1" >>, rabbitmq_amqp_address :queue (Q1 ), unsettled ),
756+ {ok , Receiver2 } = amqp10_client :attach_receiver_link (
757+ Session , <<" receiver 2" >>, rabbitmq_amqp_address :queue (Q2 ), unsettled ),
758+
759+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t" >>, <<" m" >>)),
760+ ok = wait_for_accepts (1 ),
761+ ok = detach_link_sync (Sender ),
762+
763+ {ok , Msg1 } = amqp10_client :get_msg (Receiver1 ),
764+ ? assertMatch (#{delivery_count := 0 ,
765+ first_acquirer := true },
766+ amqp10_msg :headers (Msg1 )),
767+ ok = amqp10_client :settle_msg (
768+ Receiver1 , Msg1 ,
769+ {modified , true , true ,
770+ #{<<" x-opt-history-list" >> => {list , [{utf8 , <<" l1" >>}]},
771+ <<" x-opt-history-map" >> => {map , [{{symbol , <<" k1" >>}, {byte , - 1 }}]},
772+ <<" x-opt-history-array" >> => {array , utf8 , [{utf8 , <<" a1" >>}]}}
773+ }),
774+
775+ {ok , Msg2 } = amqp10_client :get_msg (Receiver2 ),
776+ ? assertMatch (#{delivery_count := 1 ,
777+ first_acquirer := false },
778+ amqp10_msg :headers (Msg2 )),
779+ #{<<" x-opt-history-list" >> := L1 ,
780+ <<" x-opt-history-map" >> := L2 ,
781+ <<" x-opt-history-array" >> := {array , utf8 , L0 }
782+ } = amqp10_msg :message_annotations (Msg2 ),
783+ ok = amqp10_client :settle_msg (
784+ Receiver2 , Msg2 ,
785+ {modified , true , true ,
786+ #{<<" x-opt-history-list" >> => {list , [{int , - 99 } | L1 ]},
787+ <<" x-opt-history-map" >> => {map , [{{symbol , <<" k2" >>}, {symbol , <<" v2" >>}} | L2 ]},
788+ <<" x-opt-history-array" >> => {array , utf8 , [{utf8 , <<" a2" >>} | L0 ]},
789+ <<" x-other" >> => - 99 }}),
790+
791+ {ok , Msg3 } = amqp10_client :get_msg (Receiver1 ),
792+ ? assertEqual ([<<" m" >>], amqp10_msg :body (Msg3 )),
793+ ? assertMatch (#{delivery_count := 2 ,
794+ first_acquirer := false },
795+ amqp10_msg :headers (Msg3 )),
796+ ? assertMatch (#{<<" x-opt-history-array" >> := {array , utf8 , [{utf8 , <<" a2" >>}, {utf8 , <<" a1" >>}]},
797+ <<" x-opt-history-list" >> := [{int , - 99 }, {utf8 , <<" l1" >>}],
798+ <<" x-opt-history-map" >> := [{{symbol , <<" k2" >>}, {symbol , <<" v2" >>}},
799+ {{symbol , <<" k1" >>}, {byte , - 1 }}],
800+ <<" x-other" >> := - 99 }, amqp10_msg :message_annotations (Msg3 )),
801+ ok = amqp10_client :accept_msg (Receiver1 , Msg3 ),
802+
803+ ok = detach_link_sync (Receiver1 ),
804+ ok = detach_link_sync (Receiver2 ),
805+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , Q1 ),
806+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , Q2 ),
660807 ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
661808 ok = end_session_sync (Session ),
662809 ok = amqp10_client :close_connection (Connection ).
0 commit comments