@@ -68,8 +68,15 @@ groups() ->
6868 local_to_local_delete_dest_queue ,
6969 local_to_local_vhost_access ,
7070 local_to_local_user_access ,
71- local_to_local_credit_flow ,
72- local_to_local_stream_credit_flow
71+ local_to_local_credit_flow_on_confirm ,
72+ local_to_local_credit_flow_on_publish ,
73+ local_to_local_credit_flow_no_ack ,
74+ local_to_local_quorum_credit_flow_on_confirm ,
75+ local_to_local_quorum_credit_flow_on_publish ,
76+ local_to_local_quorum_credit_flow_no_ack ,
77+ local_to_local_stream_credit_flow_on_confirm ,
78+ local_to_local_stream_credit_flow_on_publish ,
79+ local_to_local_stream_credit_flow_no_ack
7380 ]}
7481 ].
7582
@@ -930,7 +937,16 @@ local_to_local_user_access(Config) ->
930937 none ]),
931938 shovel_test_utils :await_no_shovel (Config , ? PARAM ).
932939
933- local_to_local_credit_flow (Config ) ->
940+ local_to_local_credit_flow_on_confirm (Config ) ->
941+ local_to_local_credit_flow (Config , <<" on-confirm" >>).
942+
943+ local_to_local_credit_flow_on_publish (Config ) ->
944+ local_to_local_credit_flow (Config , <<" on-publish" >>).
945+
946+ local_to_local_credit_flow_no_ack (Config ) ->
947+ local_to_local_credit_flow (Config , <<" no-ack" >>).
948+
949+ local_to_local_credit_flow (Config , AckMode ) ->
934950 Src = ? config (srcq , Config ),
935951 Dest = ? config (destq , Config ),
936952 with_session (Config ,
@@ -939,13 +955,53 @@ local_to_local_credit_flow(Config) ->
939955 [{<<" src-protocol" >>, <<" local" >>},
940956 {<<" src-queue" >>, Src },
941957 {<<" dest-protocol" >>, <<" local" >>},
942- {<<" dest-queue" >>, Dest }
958+ {<<" dest-queue" >>, Dest },
959+ {<<" ack-mode" >>, AckMode }
960+ ]),
961+ publish_many (Sess , Src , Dest , <<" tag1" >>, 500 ),
962+ expect_many (Sess , Dest , 500 )
963+ end ).
964+
965+ local_to_local_quorum_credit_flow_on_confirm (Config ) ->
966+ local_to_local_quorum_credit_flow (Config , <<" on-confirm" >>).
967+
968+ local_to_local_quorum_credit_flow_on_publish (Config ) ->
969+ local_to_local_quorum_credit_flow (Config , <<" on-publish" >>).
970+
971+ local_to_local_quorum_credit_flow_no_ack (Config ) ->
972+ local_to_local_quorum_credit_flow (Config , <<" no-ack" >>).
973+
974+ local_to_local_quorum_credit_flow (Config , AckMode ) ->
975+ Src = ? config (srcq , Config ),
976+ Dest = ? config (destq , Config ),
977+ VHost = <<" /" >>,
978+ declare_queue (Config , VHost , Src , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
979+ declare_queue (Config , VHost , Dest , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
980+ with_session (Config ,
981+ fun (Sess ) ->
982+ shovel_test_utils :set_param (Config , ? PARAM ,
983+ [{<<" src-protocol" >>, <<" local" >>},
984+ {<<" src-queue" >>, Src },
985+ {<<" src-predeclared" >>, true },
986+ {<<" dest-protocol" >>, <<" local" >>},
987+ {<<" dest-queue" >>, Dest },
988+ {<<" dest-predeclared" >>, true },
989+ {<<" ack-mode" >>, AckMode }
943990 ]),
944991 publish_many (Sess , Src , Dest , <<" tag1" >>, 500 ),
945992 expect_many (Sess , Dest , 500 )
946993 end ).
947994
948- local_to_local_stream_credit_flow (Config ) ->
995+ local_to_local_stream_credit_flow_on_confirm (Config ) ->
996+ local_to_local_stream_credit_flow (Config , <<" on-confirm" >>).
997+
998+ local_to_local_stream_credit_flow_on_publish (Config ) ->
999+ local_to_local_stream_credit_flow (Config , <<" on-publish" >>).
1000+
1001+ local_to_local_stream_credit_flow_no_ack (Config ) ->
1002+ local_to_local_stream_credit_flow (Config , <<" no-ack" >>).
1003+
1004+ local_to_local_stream_credit_flow (Config , AckMode ) ->
9491005 Src = ? config (srcq , Config ),
9501006 Dest = ? config (destq , Config ),
9511007 VHost = <<" /" >>,
@@ -959,7 +1015,8 @@ local_to_local_stream_credit_flow(Config) ->
9591015 {<<" src-predeclared" >>, true },
9601016 {<<" dest-protocol" >>, <<" local" >>},
9611017 {<<" dest-queue" >>, Dest },
962- {<<" dest-predeclared" >>, true }
1018+ {<<" dest-predeclared" >>, true },
1019+ {<<" ack-mode" >>, AckMode }
9631020 ]),
9641021
9651022 Receiver = subscribe (Sess , Dest ),
@@ -972,6 +1029,7 @@ local_to_local_stream_credit_flow(Config) ->
9721029 amqp10_client :detach_link (Receiver )
9731030 end ).
9741031
1032+
9751033% %----------------------------------------------------------------------------
9761034with_session (Config , Fun ) ->
9771035 with_session (Config , <<" /" >>, Fun ).
0 commit comments