@@ -58,6 +58,9 @@ groups() ->
5858 sender_settle_mode_unsettled ,
5959 sender_settle_mode_unsettled_fanout ,
6060 sender_settle_mode_mixed ,
61+ durable_field_classic_queue ,
62+ durable_field_quorum_queue ,
63+ durable_field_stream ,
6164 invalid_transfer_settled_flag ,
6265 quorum_queue_rejects ,
6366 receiver_settle_mode_first ,
@@ -916,6 +919,71 @@ sender_settle_mode_mixed(Config) ->
916919 rabbitmq_amqp_client :delete_queue (LinkPair , QName )),
917920 ok = close (Init ).
918921
922+ durable_field_classic_queue (Config ) ->
923+ QName = atom_to_binary (? FUNCTION_NAME ),
924+ durable_field (Config , <<" classic" >>, QName ).
925+
926+ durable_field_quorum_queue (Config ) ->
927+ QName = atom_to_binary (? FUNCTION_NAME ),
928+ durable_field (Config , <<" quorum" >>, QName ).
929+
930+ durable_field_stream (Config ) ->
931+ QName = atom_to_binary (? FUNCTION_NAME ),
932+ durable_field (Config , <<" stream" >>, QName ).
933+
934+ durable_field (Config , QueueType , QName )
935+ when is_binary (QueueType ) ->
936+ Address = rabbitmq_amqp_address :queue (QName ),
937+ {_Connection , Session , LinkPair } = Init = init (Config ),
938+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QueueType }}},
939+ {ok , #{type := QueueType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
940+ {ok , Sender } = amqp10_client :attach_sender_link (
941+ Session , <<" test-sender" >>, Address , unsettled ),
942+ wait_for_credit (Sender ),
943+
944+ ok = amqp10_client :send_msg (
945+ Sender ,
946+ amqp10_msg :set_headers (
947+ #{durable => true },
948+ amqp10_msg :new (<<" t1" >>, <<" durable" >>))),
949+ ok = amqp10_client :send_msg (
950+ Sender ,
951+ amqp10_msg :set_headers (
952+ #{durable => false },
953+ amqp10_msg :new (<<" t2" >>, <<" non-durable" >>))),
954+ % % Even though the AMQP spec defines durable=false as default
955+ % % (i.e. durable is false if the field is omitted on the wire),
956+ % % we expect our AMQP Erlang library to be safe by default,
957+ % % and therefore send the message as durable=true on behalf of us.
958+ ok = amqp10_client :send_msg (
959+ Sender , amqp10_msg :new (<<" t3" >>, <<" lib sends as durable by default" >>)),
960+
961+ ok = wait_for_accepts (3 ),
962+ ok = detach_link_sync (Sender ),
963+ flush (sent ),
964+
965+ Filter = consume_from_first (QueueType ),
966+ {ok , Receiver } = amqp10_client :attach_receiver_link (
967+ Session , <<" test-receiver" >>, Address , unsettled ,
968+ none , Filter ),
969+
970+ ok = amqp10_client :flow_link_credit (Receiver , 3 , never ),
971+ [M1 , M2 , M3 ] = receive_messages (Receiver , 3 ),
972+ ? assertEqual (<<" durable" >>, amqp10_msg :body_bin (M1 )),
973+ ? assertMatch (#{durable := true }, amqp10_msg :headers (M1 )),
974+ ? assertEqual (<<" non-durable" >>, amqp10_msg :body_bin (M2 )),
975+ ? assertMatch (#{durable := false }, amqp10_msg :headers (M2 )),
976+ ? assertEqual (<<" lib sends as durable by default" >>, amqp10_msg :body_bin (M3 )),
977+ ? assertMatch (#{durable := true }, amqp10_msg :headers (M3 )),
978+
979+ ok = amqp10_client :accept_msg (Receiver , M1 ),
980+ ok = amqp10_client :accept_msg (Receiver , M2 ),
981+ ok = amqp10_client :accept_msg (Receiver , M3 ),
982+
983+ ok = detach_link_sync (Receiver ),
984+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
985+ close (Init ).
986+
919987invalid_transfer_settled_flag (Config ) ->
920988 OpnConf = connection_config (Config ),
921989 {ok , Connection } = amqp10_client :open_connection (OpnConf ),
0 commit comments