@@ -501,61 +501,96 @@ modified_quorum_queue(Config) ->
501501 ok = amqp10_client :send_msg (Sender , Msg2 ),
502502 ok = amqp10_client :detach_link (Sender ),
503503
504- {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" receiver" >>, Address , unsettled ),
504+ Receiver1Name = <<" receiver 1" >>,
505+ Receiver2Name = <<" receiver 2" >>,
506+ {ok , Receiver1 } = amqp10_client :attach_receiver_link (Session , Receiver1Name , Address , unsettled ),
507+ {ok , Receiver2 } = amqp10_client :attach_receiver_link (Session , Receiver2Name , Address , unsettled ),
505508
506- {ok , M1 } = amqp10_client :get_msg (Receiver ),
509+ {ok , M1 } = amqp10_client :get_msg (Receiver1 ),
507510 ? assertEqual ([<<" m1" >>], amqp10_msg :body (M1 )),
508511 ? assertMatch (#{delivery_count := 0 ,
509512 first_acquirer := true },
510513 amqp10_msg :headers (M1 )),
511- ok = amqp10_client :settle_msg (Receiver , M1 , {modified , false , true , #{}}),
514+ ok = amqp10_client :settle_msg (Receiver1 , M1 , {modified , false , true , #{}}),
512515
513- {ok , M2a } = amqp10_client :get_msg (Receiver ),
516+ {ok , M2a } = amqp10_client :get_msg (Receiver1 ),
514517 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2a )),
515518 ? assertMatch (#{delivery_count := 0 ,
516519 first_acquirer := true },
517520 amqp10_msg :headers (M2a )),
518- ok = amqp10_client :settle_msg (Receiver , M2a , {modified , false , false , #{}}),
521+ ok = amqp10_client :settle_msg (Receiver1 , M2a , {modified , false , false , #{}}),
519522
520- {ok , M2b } = amqp10_client :get_msg (Receiver ),
523+ {ok , M2b } = amqp10_client :get_msg (Receiver1 ),
521524 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2b )),
522525 ? assertMatch (#{delivery_count := 0 ,
523526 first_acquirer := false },
524527 amqp10_msg :headers (M2b )),
525- ok = amqp10_client :settle_msg (Receiver , M2b , {modified , true , false , #{}}),
528+ ok = amqp10_client :settle_msg (Receiver1 , M2b , {modified , true , false , #{}}),
526529
527- {ok , M2c } = amqp10_client :get_msg (Receiver ),
530+ {ok , M2c } = amqp10_client :get_msg (Receiver1 ),
528531 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2c )),
529532 ? assertMatch (#{delivery_count := 1 ,
530533 first_acquirer := false },
531534 amqp10_msg :headers (M2c )),
532- ok = amqp10_client :settle_msg (Receiver , M2c ,
533- {modified , true , false ,
534- #{<<" x-opt-key" >> => <<" val 1" >>}}),
535-
536- {ok , M2d } = amqp10_client :get_msg (Receiver ),
535+ ok = amqp10_client :settle_msg (
536+ Receiver1 , M2c ,
537+ {modified , true , false ,
538+ % % Test that a history of requeue events can be tracked as described in
539+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
540+ #{<<" x-opt-requeued-by" >> => {array , utf8 , [{utf8 , Receiver1Name }]},
541+ <<" x-opt-requeue-reason" >> => {list , [{utf8 , <<" reason 1" >>}]},
542+ <<" x-opt-my-map" >> => {map , [
543+ {{utf8 , <<" k1" >>}, {byte , - 1 }},
544+ {{utf8 , <<" k2" >>}, {ulong , 2 }}
545+ ]}}}),
546+
547+ {ok , M2d } = amqp10_client :get_msg (Receiver2 ),
537548 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2d )),
538549 ? assertMatch (#{delivery_count := 2 ,
539550 first_acquirer := false },
540551 amqp10_msg :headers (M2d )),
541- ? assertMatch (#{<<" x-opt-key" >> := <<" val 1" >>}, amqp10_msg :message_annotations (M2d )),
542- ok = amqp10_client :settle_msg (Receiver , M2d ,
543- {modified , false , false ,
544- #{<<" x-opt-key" >> => <<" val 2" >>,
545- <<" x-other" >> => 99 }}),
546-
547- {ok , M2e } = amqp10_client :get_msg (Receiver ),
552+ #{<<" x-opt-requeued-by" >> := {array , utf8 , L0 },
553+ <<" x-opt-requeue-reason" >> := L1 ,
554+ <<" x-opt-my-map" >> := L2 } = amqp10_msg :message_annotations (M2d ),
555+ ok = amqp10_client :settle_msg (
556+ Receiver1 , M2d ,
557+ {modified , false , false ,
558+ #{<<" x-opt-requeued-by" >> => {array , utf8 , [{utf8 , Receiver2Name } | L0 ]},
559+ <<" x-opt-requeue-reason" >> => {list , [{symbol , <<" reason 2" >>} | L1 ]},
560+ <<" x-opt-my-map" >> => {map , L2 ++ [{{symbol , <<" k3" >>}, {symbol , <<" val 3" >>}}]},
561+ <<" x-other" >> => 99 }}),
562+
563+ {ok , M2e } = amqp10_client :get_msg (Receiver1 ),
548564 ? assertEqual ([<<" m2" >>], amqp10_msg :body (M2e )),
549565 ? assertMatch (#{delivery_count := 2 ,
550566 first_acquirer := false },
551567 amqp10_msg :headers (M2e )),
552- ? assertMatch (#{<<" x-opt-key" >> := <<" val 2" >>,
568+ ? assertMatch (#{<<" x-opt-requeued-by" >> := {array , utf8 , [{utf8 , Receiver2Name }, {utf8 , Receiver1Name }]},
569+ <<" x-opt-requeue-reason" >> := [{symbol , <<" reason 2" >>}, {utf8 , <<" reason 1" >>}],
570+ <<" x-opt-my-map" >> := [
571+ {{utf8 , <<" k1" >>}, {byte , - 1 }},
572+ {{utf8 , <<" k2" >>}, {ulong , 2 }},
573+ {{symbol , <<" k3" >>}, {symbol , <<" val 3" >>}}
574+ ],
553575 <<" x-other" >> := 99 }, amqp10_msg :message_annotations (M2e )),
554- ok = amqp10_client :settle_msg (Receiver , M2e , modified ),
576+ ok = amqp10_client :settle_msg (Receiver1 , M2e , modified ),
555577
556- ok = amqp10_client :detach_link (Receiver ),
557- ? assertMatch ({ok , #{message_count := 1 }},
558- rabbitmq_amqp_client :delete_queue (LinkPair , QName )),
578+ % % Test that we can consume via AMQP 0.9.1
579+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
580+ {# 'basic.get_ok' {},
581+ # amqp_msg {payload = <<" m2" >>,
582+ props = # 'P_basic' {headers = Headers }}
583+ } = amqp_channel :call (Ch , # 'basic.get' {queue = QName , no_ack = true }),
584+ % % We expect to receive only modified AMQP 1.0 message annotations that are of simple types
585+ % % (i.e. excluding list, map, array).
586+ ? assertEqual ({value , {<<" x-other" >>, long , 99 }},
587+ lists :keysearch (<<" x-other" >>, 1 , Headers )),
588+ ? assertEqual ({value , {<<" x-delivery-count" >>, long , 5 }},
589+ lists :keysearch (<<" x-delivery-count" >>, 1 , Headers )),
590+ ok = rabbit_ct_client_helpers :close_channel (Ch ),
591+
592+ ok = amqp10_client :detach_link (Receiver1 ),
593+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
559594 ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
560595 ok = end_session_sync (Session ),
561596 ok = amqp10_client :close_connection (Connection ).
0 commit comments