@@ -52,7 +52,8 @@ groups() ->
5252 {mock , [], [
5353 insufficient_credit ,
5454 incoming_heartbeat ,
55- multi_transfer_without_delivery_id
55+ multi_transfer_without_delivery_id ,
56+ set_sender_capabilities
5657 ]}
5758 ].
5859
@@ -68,7 +69,7 @@ shared() ->
6869 subscribe_with_auto_flow_unsettled ,
6970 outgoing_heartbeat ,
7071 roundtrip_large_messages ,
71- transfer_id_vs_delivery_id
72+ transfer_id_vs_delivery_id
7273 ].
7374
7475% % -------------------------------------------------------------------
@@ -687,6 +688,7 @@ subscribe_with_auto_flow_unsettled(Config) ->
687688 ok = amqp10_client :end_session (Session ),
688689 ok = amqp10_client :close_connection (Connection ).
689690
691+
690692insufficient_credit (Config ) ->
691693 Hostname = ? config (mock_host , Config ),
692694 Port = ? config (mock_port , Config ),
@@ -790,6 +792,76 @@ multi_transfer_without_delivery_id(Config) ->
790792 ok = amqp10_client :close_connection (Connection ),
791793 ok .
792794
795+ set_sender_capabilities (Config ) ->
796+ Hostname = ? config (mock_host , Config ),
797+ Port = ? config (mock_port , Config ),
798+
799+ OpenStep = fun ({0 = Ch , # 'v1_0.open' {}, _Pay }) ->
800+ {Ch , [# 'v1_0.open' {container_id = {utf8 , <<" mock" >>}}]}
801+ end ,
802+ BeginStep = fun ({1 = Ch , # 'v1_0.begin' {}, _Pay }) ->
803+ {Ch , [# 'v1_0.begin' {remote_channel = {ushort , 1 },
804+ next_outgoing_id = {uint , 1 },
805+ incoming_window = {uint , 1000 },
806+ outgoing_window = {uint , 1000 }}
807+ ]}
808+ end ,
809+ AttachStep = fun ({1 = Ch , # 'v1_0.attach' {role = true ,
810+ name = Name ,
811+ source = # 'v1_0.source' {
812+ capabilities = {utf8 , <<" capability-1" >>}}}, <<>>}) ->
813+ {Ch , [# 'v1_0.attach' {name = Name ,
814+ handle = {uint , 99 },
815+ initial_delivery_count = {uint , 1 },
816+ role = false }
817+ ]}
818+ end ,
819+
820+ LinkCreditStep = fun ({1 = Ch , # 'v1_0.flow' {}, <<>>}) ->
821+ {Ch , {multi , [[# 'v1_0.transfer' {handle = {uint , 99 },
822+ delivery_id = {uint , 12 },
823+ more = true },
824+ # 'v1_0.data' {content = <<" hello " >>}],
825+ [# 'v1_0.transfer' {handle = {uint , 99 },
826+ % delivery_id can be omitted
827+ % for continuation frames
828+ delivery_id = undefined ,
829+ settled = undefined ,
830+ more = false },
831+ # 'v1_0.data' {content = <<" world" >>}]
832+ ]}}
833+ end ,
834+ Steps = [fun mock_server :recv_amqp_header_step /1 ,
835+ fun mock_server :send_amqp_header_step /1 ,
836+ mock_server :amqp_step (OpenStep ),
837+ mock_server :amqp_step (BeginStep ),
838+ mock_server :amqp_step (AttachStep ),
839+ mock_server :amqp_step (LinkCreditStep )
840+ ],
841+
842+ ok = mock_server :set_steps (? config (mock_server , Config ), Steps ),
843+
844+ Cfg = #{address => Hostname , port => Port , sasl => none , notify => self ()},
845+ {ok , Connection } = amqp10_client :open_connection (Cfg ),
846+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
847+ {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" mock1-received" >>,
848+ <<" test" >>,
849+ setlled , none ,
850+ #{}, % filter
851+ #{}, % prorperties
852+ <<" capability-1" >>),
853+ amqp10_client :flow_link_credit (Receiver , 100 , 50 ),
854+ receive
855+ {amqp10_msg , Receiver , _InMsg } ->
856+ ok
857+ after 2000 ->
858+ exit (delivery_timeout )
859+ end ,
860+
861+ ok = amqp10_client :end_session (Session ),
862+ ok = amqp10_client :close_connection (Connection ),
863+ ok .
864+
793865outgoing_heartbeat (Config ) ->
794866 Hostname = ? config (rmq_hostname , Config ),
795867 Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
0 commit comments