@@ -58,6 +58,9 @@ groups() ->
5858 sender_settle_mode_unsettled ,
5959 sender_settle_mode_unsettled_fanout ,
6060 sender_settle_mode_mixed ,
61+ durable_field_classic_queue ,
62+ durable_field_quorum_queue ,
63+ durable_field_stream ,
6164 invalid_transfer_settled_flag ,
6265 quorum_queue_rejects ,
6366 receiver_settle_mode_first ,
@@ -916,6 +919,77 @@ sender_settle_mode_mixed(Config) ->
916919 rabbitmq_amqp_client :delete_queue (LinkPair , QName )),
917920 ok = close (Init ).
918921
922+ durable_field_classic_queue (Config ) ->
923+ QName = atom_to_binary (? FUNCTION_NAME ),
924+ durable_field (Config , <<" classic" >>, QName ).
925+
926+ durable_field_quorum_queue (Config ) ->
927+ QName = atom_to_binary (? FUNCTION_NAME ),
928+ durable_field (Config , <<" quorum" >>, QName ).
929+
930+ durable_field_stream (Config ) ->
931+ QName = atom_to_binary (? FUNCTION_NAME ),
932+ durable_field (Config , <<" stream" >>, QName ).
933+
934+ durable_field (Config , QueueType , QName )
935+ when is_binary (QueueType ) ->
936+ Address = rabbitmq_amqp_address :queue (QName ),
937+ {_Connection , Session , LinkPair } = Init = init (Config ),
938+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QueueType }}},
939+ {ok , #{type := QueueType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
940+ {ok , Sender } = amqp10_client :attach_sender_link (
941+ Session , <<" test-sender" >>, Address , unsettled ),
942+ wait_for_credit (Sender ),
943+
944+ ok = amqp10_client :send_msg (Sender ,
945+ amqp10_msg :set_headers (
946+ #{durable => true },
947+ amqp10_msg :new (<<" t1" >>, <<" durable" >>))),
948+ ok = amqp10_client :send_msg (Sender ,
949+ amqp10_msg :set_headers (
950+ #{durable => false },
951+ amqp10_msg :new (<<" t2" >>, <<" non-durable" >>))),
952+ % % Even though the AMQP spec defines durable=false as default
953+ % % (i.e. durable is false if the field is omitted on the wire),
954+ % % we expect our AMQP Erlang library to be safe by default,
955+ % % and therefore send the message as durable=true on behalf of us.
956+ ok = amqp10_client :send_msg (
957+ Sender , amqp10_msg :new (<<" t3" >>, <<" lib publishes as durable by default" >>)),
958+ % % When we expliclitly publish without a header section, RabbitMQ should interpret
959+ % % durable as false according to the AMQP spec.
960+ ok = amqp10_client :send_msg (
961+ Sender ,
962+ amqp10_msg :from_amqp_records (
963+ [# 'v1_0.transfer' {delivery_tag = {binary , <<" t4" >>},
964+ settled = false ,
965+ message_format = {uint , 0 }},
966+ # 'v1_0.data' {content = <<" publish without header section" >>}])),
967+
968+ ok = wait_for_accepts (4 ),
969+ ok = detach_link_sync (Sender ),
970+ flush (sent ),
971+
972+ Filter = consume_from_first (QueueType ),
973+ {ok , Receiver } = amqp10_client :attach_receiver_link (
974+ Session , <<" test-receiver" >>, Address , unsettled ,
975+ none , Filter ),
976+
977+ ok = amqp10_client :flow_link_credit (Receiver , 4 , never ),
978+ [M1 , M2 , M3 , M4 ] = Msgs = receive_messages (Receiver , 4 ),
979+ ? assertEqual (<<" durable" >>, amqp10_msg :body_bin (M1 )),
980+ ? assertMatch (#{durable := true }, amqp10_msg :headers (M1 )),
981+ ? assertEqual (<<" non-durable" >>, amqp10_msg :body_bin (M2 )),
982+ ? assertMatch (#{durable := false }, amqp10_msg :headers (M2 )),
983+ ? assertEqual (<<" lib publishes as durable by default" >>, amqp10_msg :body_bin (M3 )),
984+ ? assertMatch (#{durable := true }, amqp10_msg :headers (M3 )),
985+ ? assertEqual (<<" publish without header section" >>, amqp10_msg :body_bin (M4 )),
986+ ? assertMatch (#{durable := false }, amqp10_msg :headers (M4 )),
987+ [ok = amqp10_client :accept_msg (Receiver , M ) || M <- Msgs ],
988+
989+ ok = detach_link_sync (Receiver ),
990+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
991+ close (Init ).
992+
919993invalid_transfer_settled_flag (Config ) ->
920994 OpnConf = connection_config (Config ),
921995 {ok , Connection } = amqp10_client :open_connection (OpnConf ),
@@ -1301,7 +1375,7 @@ amqp_amqpl(QType, Config) ->
13011375 Body6 = [# 'v1_0.data' {content = <<0 , 1 >>},
13021376 # 'v1_0.data' {content = <<2 , 3 >>}],
13031377
1304- % % Send only body sections
1378+ % % Send only header and body sections
13051379 [ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<>>, Body , true )) ||
13061380 Body <- [Body1 , Body2 , Body3 , Body4 , Body5 , Body6 ]],
13071381 % % Send with application-properties
@@ -1342,6 +1416,11 @@ amqp_amqpl(QType, Config) ->
13421416 #{<<" x-array" >> => {array , utf8 , [{utf8 , <<" e1" >>},
13431417 {utf8 , <<" e2" >>}]}},
13441418 amqp10_msg :new (<<>>, Body1 , true ))),
1419+ ok = amqp10_client :send_msg (
1420+ Sender ,
1421+ amqp10_msg :set_headers (
1422+ #{durable => false },
1423+ amqp10_msg :new (<<>>, Body1 , true ))),
13451424
13461425 ok = amqp10_client :detach_link (Sender ),
13471426 flush (detached ),
@@ -1365,8 +1444,10 @@ amqp_amqpl(QType, Config) ->
13651444 receive {# 'basic.deliver' {consumer_tag = CTag ,
13661445 redelivered = false },
13671446 # amqp_msg {payload = Payload1 ,
1368- props = # 'P_basic' {type = <<" amqp-1.0" >>}}} ->
1369- ? assertEqual ([Body1 ], amqp10_framing :decode_bin (Payload1 ))
1447+ props = # 'P_basic' {delivery_mode = DelMode2 ,
1448+ type = <<" amqp-1.0" >>}}} ->
1449+ ? assertEqual ([Body1 ], amqp10_framing :decode_bin (Payload1 )),
1450+ ? assertEqual (2 , DelMode2 )
13701451 after 30000 -> ct :fail ({missing_deliver , ? LINE })
13711452 end ,
13721453 receive {_ , # amqp_msg {payload = Payload2 ,
@@ -1428,6 +1509,12 @@ amqp_amqpl(QType, Config) ->
14281509 rabbit_misc :table_lookup (Headers11 , <<" x-array" >>))
14291510 after 30000 -> ct :fail ({missing_deliver , ? LINE })
14301511 end ,
1512+ receive {_ , # amqp_msg {payload = Payload12 ,
1513+ props = # 'P_basic' {delivery_mode = DelMode1 }}} ->
1514+ ? assertEqual ([Body1 ], amqp10_framing :decode_bin (Payload12 )),
1515+ ? assertNotEqual (2 , DelMode1 )
1516+ after 30000 -> ct :fail ({missing_deliver , ? LINE })
1517+ end ,
14311518
14321519 ok = rabbit_ct_client_helpers :close_connection_and_channel (Conn , Ch ),
14331520 {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
@@ -1514,10 +1601,17 @@ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) ->
15141601 amqp_channel :cast (
15151602 Ch ,
15161603 # 'basic.publish' {routing_key = QName },
1517- # amqp_msg {props = # 'P_basic' {headers = Amqp091Headers },
1604+ # amqp_msg {props = # 'P_basic' {delivery_mode = 2 ,
1605+ priority = 5 ,
1606+ headers = Amqp091Headers },
15181607 payload = <<" foobar" >>}),
15191608
15201609 {ok , [Msg ]} = drain_queue (Session , Address , 1 ),
1610+
1611+ ? assertMatch (#{durable := true ,
1612+ priority := 5 },
1613+ amqp10_msg :headers (Msg )),
1614+
15211615 Amqp10MA = amqp10_msg :message_annotations (Msg ),
15221616 ? assertEqual (<<" my-string" >>, maps :get (<<" x-string" >>, Amqp10MA , undefined )),
15231617 ? assertEqual (92 , maps :get (<<" x-int" >>, Amqp10MA , undefined )),
@@ -3278,7 +3372,7 @@ max_message_size_client_to_server(Config) ->
32783372 {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender" >>, Address , mixed ),
32793373 ok = wait_for_credit (Sender ),
32803374
3281- PayloadSmallEnough = binary :copy (<<0 >>, MaxMessageSize - 10 ),
3375+ PayloadSmallEnough = binary :copy (<<0 >>, MaxMessageSize - 20 ),
32823376 ? assertEqual (ok ,
32833377 amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t1" >>, PayloadSmallEnough , false ))),
32843378 ok = wait_for_accepted (<<" t1" >>),
0 commit comments