@@ -118,6 +118,7 @@ groups() ->
118118 modified_classic_queue ,
119119 modified_quorum_queue ,
120120 modified_dead_letter_headers_exchange ,
121+ modified_dead_letter_history ,
121122 dead_letter_headers_exchange ,
122123 dead_letter_reject ,
123124 dead_letter_reject_message_order_classic_queue ,
@@ -253,7 +254,8 @@ init_per_testcase(T, Config)
253254 end ;
254255init_per_testcase (T , Config )
255256 when T =:= modified_quorum_queue orelse
256- T =:= modified_dead_letter_headers_exchange ->
257+ T =:= modified_dead_letter_headers_exchange orelse
258+ T =:= modified_dead_letter_history ->
257259 case rpc (Config , rabbit_feature_flags , is_enabled , ['rabbitmq_4.0.0' ]) of
258260 true ->
259261 rabbit_ct_helpers :testcase_started (Config , T );
@@ -716,6 +718,85 @@ modified_dead_letter_headers_exchange(Config) ->
716718 ok = end_session_sync (Session ),
717719 ok = amqp10_client :close_connection (Connection ).
718720
721+ % % Test that custom dead lettering event tracking works as described in
722+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
723+ modified_dead_letter_history (Config ) ->
724+ {Connection , Session , LinkPair } = init (Config ),
725+ Q1 = <<" qq 1" >>,
726+ Q2 = <<" qq 2" >>,
727+
728+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
729+ LinkPair , Q1 ,
730+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
731+ <<" x-dead-letter-strategy" >> => {utf8 , <<" at-most-once" >>},
732+ <<" x-dead-letter-exchange" >> => {utf8 , <<" amq.fanout" >>}}}),
733+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
734+ LinkPair , Q2 ,
735+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
736+ <<" x-dead-letter-strategy" >> => {utf8 , <<" at-most-once" >>},
737+ <<" x-dead-letter-exchange" >> => {utf8 , <<>>}}}),
738+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , Q2 , <<" amq.fanout" >>, <<>>, #{}),
739+
740+ {ok , Sender } = amqp10_client :attach_sender_link (
741+ Session , <<" test-sender" >>, rabbitmq_amqp_address :queue (Q1 )),
742+ wait_for_credit (Sender ),
743+ {ok , Receiver1 } = amqp10_client :attach_receiver_link (
744+ Session , <<" receiver 1" >>, rabbitmq_amqp_address :queue (Q1 ), unsettled ),
745+ {ok , Receiver2 } = amqp10_client :attach_receiver_link (
746+ Session , <<" receiver 2" >>, rabbitmq_amqp_address :queue (Q2 ), unsettled ),
747+
748+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t" >>, <<" m" >>)),
749+ ok = wait_for_accepts (1 ),
750+ ok = detach_link_sync (Sender ),
751+
752+ {ok , Msg1 } = amqp10_client :get_msg (Receiver1 ),
753+ ? assertMatch (#{delivery_count := 0 ,
754+ first_acquirer := true },
755+ amqp10_msg :headers (Msg1 )),
756+ ok = amqp10_client :settle_msg (
757+ Receiver1 , Msg1 ,
758+ {modified , true , true ,
759+ #{<<" x-opt-history-list" >> => {list , [{utf8 , <<" l1" >>}]},
760+ <<" x-opt-history-map" >> => {map , [{{symbol , <<" k1" >>}, {byte , - 1 }}]},
761+ <<" x-opt-history-array" >> => {array , utf8 , [{utf8 , <<" a1" >>}]}}
762+ }),
763+
764+ {ok , Msg2 } = amqp10_client :get_msg (Receiver2 ),
765+ ? assertMatch (#{delivery_count := 1 ,
766+ first_acquirer := false },
767+ amqp10_msg :headers (Msg2 )),
768+ #{<<" x-opt-history-list" >> := L1 ,
769+ <<" x-opt-history-map" >> := L2 ,
770+ <<" x-opt-history-array" >> := {array , utf8 , L0 }
771+ } = amqp10_msg :message_annotations (Msg2 ),
772+ ok = amqp10_client :settle_msg (
773+ Receiver2 , Msg2 ,
774+ {modified , true , true ,
775+ #{<<" x-opt-history-list" >> => {list , [{int , - 99 } | L1 ]},
776+ <<" x-opt-history-map" >> => {map , [{{symbol , <<" k2" >>}, {symbol , <<" v2" >>}} | L2 ]},
777+ <<" x-opt-history-array" >> => {array , utf8 , [{utf8 , <<" a2" >>} | L0 ]},
778+ <<" x-other" >> => 99 }}),
779+
780+ {ok , Msg3 } = amqp10_client :get_msg (Receiver1 ),
781+ ? assertEqual ([<<" m" >>], amqp10_msg :body (Msg3 )),
782+ ? assertMatch (#{delivery_count := 2 ,
783+ first_acquirer := false },
784+ amqp10_msg :headers (Msg3 )),
785+ ? assertMatch (#{<<" x-opt-history-array" >> := {array , utf8 , [{utf8 , <<" a2" >>}, {utf8 , <<" a1" >>}]},
786+ <<" x-opt-history-list" >> := [{int , - 99 }, {utf8 , <<" l1" >>}],
787+ <<" x-opt-history-map" >> := [{{symbol , <<" k2" >>}, {symbol , <<" v2" >>}},
788+ {{symbol , <<" k1" >>}, {byte , - 1 }}],
789+ <<" x-other" >> := 99 }, amqp10_msg :message_annotations (Msg3 )),
790+ ok = amqp10_client :accept_msg (Receiver1 , Msg3 ),
791+
792+ ok = detach_link_sync (Receiver1 ),
793+ ok = detach_link_sync (Receiver2 ),
794+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , Q1 ),
795+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , Q2 ),
796+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
797+ ok = end_session_sync (Session ),
798+ ok = amqp10_client :close_connection (Connection ).
799+
719800% % Tests that confirmations are returned correctly
720801% % when sending many messages async to a quorum queue.
721802sender_settle_mode_unsettled (Config ) ->
0 commit comments