@@ -153,6 +153,10 @@ all_tests() ->
153153 dead_letter_to_quorum_queue ,
154154 dead_letter_from_classic_to_quorum_queue ,
155155 dead_letter_policy ,
156+ at_most_once_dead_letter_order_maxlen ,
157+ at_most_once_dead_letter_order_rejected ,
158+ at_most_once_dead_letter_order_delivery_limit ,
159+ at_most_once_dead_letter_order_expired ,
156160 cleanup_queue_state_on_channel_after_publish ,
157161 cleanup_queue_state_on_channel_after_subscribe ,
158162 sync_queue ,
@@ -167,7 +171,6 @@ all_tests() ->
167171 subscribe_redelivery_count ,
168172 message_bytes_metrics ,
169173 queue_length_limit_drop_head ,
170- queue_length_bytes_limit_drop_head ,
171174 queue_length_limit_reject_publish ,
172175 queue_length_limit_policy_cleared ,
173176 subscribe_redelivery_limit ,
@@ -2092,6 +2095,196 @@ dead_letter_policy(Config) ->
20922095 ok = rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" dlx" >>),
20932096 test_dead_lettering (false , Config , Ch , Servers , RaName , QQ , CQ ).
20942097
2098+ % % Test that messages are at most once dead letter in the correct order
2099+ % % for reason 'maxlen'.
2100+ at_most_once_dead_letter_order_maxlen (Config ) ->
2101+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
2102+
2103+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
2104+ QQ = ? config (queue_name , Config ),
2105+ DLQ = <<" dead letter queue" >>,
2106+
2107+ ? assertEqual ({'queue.declare_ok' , DLQ , 0 , 0 },
2108+ declare (Ch , DLQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
2109+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
2110+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
2111+ {<<" x-overflow" >>, longstr , <<" drop-head" >>},
2112+ {<<" x-max-length-bytes" >>, long , 1000 },
2113+ {<<" x-dead-letter-exchange" >>, longstr , <<>>},
2114+ {<<" x-dead-letter-routing-key" >>, longstr , DLQ }])),
2115+
2116+ LargePayload = binary :copy (<<" x" >>, 1500 ),
2117+ ok = amqp_channel :cast (Ch ,
2118+ # 'basic.publish' {routing_key = QQ },
2119+ # amqp_msg {payload = <<" m1" >>}),
2120+ ok = amqp_channel :cast (Ch ,
2121+ # 'basic.publish' {routing_key = QQ },
2122+ # amqp_msg {payload = <<" m2" >>}),
2123+ ok = amqp_channel :cast (Ch ,
2124+ # 'basic.publish' {routing_key = QQ },
2125+ # amqp_msg {payload = LargePayload }),
2126+ wait_for_consensus (QQ , Config ),
2127+ wait_for_consensus (DLQ , Config ),
2128+ RaName = ra_name (DLQ ),
2129+ wait_for_messages_ready (Servers , RaName , 3 ),
2130+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m1" >>}},
2131+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2132+ no_ack = true })),
2133+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
2134+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2135+ no_ack = true })),
2136+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = LargePayload }},
2137+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2138+ no_ack = true })),
2139+
2140+ [? assertEqual (# 'queue.delete_ok' {message_count = 0 },
2141+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
2142+ || Q <- [QQ , DLQ ]].
2143+
2144+ % % Test that messages are at most once dead letter in the correct order
2145+ % % for reason 'rejected'.
2146+ at_most_once_dead_letter_order_rejected (Config ) ->
2147+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
2148+
2149+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
2150+ QQ = ? config (queue_name , Config ),
2151+ DLQ = <<" dead letter queue" >>,
2152+
2153+ ? assertEqual ({'queue.declare_ok' , DLQ , 0 , 0 },
2154+ declare (Ch , DLQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
2155+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
2156+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
2157+ {<<" x-dead-letter-exchange" >>, longstr , <<>>},
2158+ {<<" x-dead-letter-routing-key" >>, longstr , DLQ }])),
2159+
2160+ ok = amqp_channel :cast (Ch ,
2161+ # 'basic.publish' {routing_key = QQ },
2162+ # amqp_msg {payload = <<" m1" >>}),
2163+ ok = amqp_channel :cast (Ch ,
2164+ # 'basic.publish' {routing_key = QQ },
2165+ # amqp_msg {payload = <<" m2" >>}),
2166+
2167+ ok = subscribe (Ch , QQ , false ),
2168+ receive {_ , # amqp_msg {payload = P1 }} ->
2169+ ? assertEqual (<<" m1" >>, P1 )
2170+ end ,
2171+ receive {_ , # amqp_msg {payload = P2 }} ->
2172+ ? assertEqual (<<" m2" >>, P2 )
2173+ end ,
2174+ ok = amqp_channel :call (Ch , # 'basic.nack' {delivery_tag = 0 ,
2175+ multiple = true ,
2176+ requeue = false }),
2177+
2178+ wait_for_consensus (DLQ , Config ),
2179+ wait_for_messages_ready (Servers , ra_name (DLQ ), 2 ),
2180+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m1" >>}},
2181+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2182+ no_ack = true })),
2183+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
2184+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2185+ no_ack = true })),
2186+
2187+ [? assertEqual (# 'queue.delete_ok' {message_count = 0 },
2188+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
2189+ || Q <- [QQ , DLQ ]].
2190+
2191+ % % Test that messages are at most once dead letter in the correct order
2192+ % % for reason 'delivery_limit'.
2193+ at_most_once_dead_letter_order_delivery_limit (Config ) ->
2194+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
2195+
2196+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
2197+ QQ = ? config (queue_name , Config ),
2198+ DLQ = <<" dead letter queue" >>,
2199+
2200+ ? assertEqual ({'queue.declare_ok' , DLQ , 0 , 0 },
2201+ declare (Ch , DLQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
2202+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
2203+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
2204+ {<<" x-delivery-limit" >>, long , 0 },
2205+ {<<" x-dead-letter-exchange" >>, longstr , <<>>},
2206+ {<<" x-dead-letter-routing-key" >>, longstr , DLQ }])),
2207+
2208+ ok = amqp_channel :cast (Ch ,
2209+ # 'basic.publish' {routing_key = QQ },
2210+ # amqp_msg {payload = <<" m1" >>}),
2211+ ok = amqp_channel :cast (Ch ,
2212+ # 'basic.publish' {routing_key = QQ },
2213+ # amqp_msg {payload = <<" m2" >>}),
2214+
2215+ ok = subscribe (Ch , QQ , false ),
2216+ receive {_ , # amqp_msg {payload = P1 }} ->
2217+ ? assertEqual (<<" m1" >>, P1 )
2218+ end ,
2219+ receive {_ , # amqp_msg {payload = P2 }} ->
2220+ ? assertEqual (<<" m2" >>, P2 )
2221+ end ,
2222+ ok = amqp_channel :call (Ch , # 'basic.nack' {delivery_tag = 0 ,
2223+ multiple = true ,
2224+ requeue = true }),
2225+
2226+ wait_for_consensus (DLQ , Config ),
2227+ wait_for_messages_ready (Servers , ra_name (DLQ ), 2 ),
2228+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m1" >>}},
2229+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2230+ no_ack = true })),
2231+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
2232+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2233+ no_ack = true })),
2234+
2235+ [? assertEqual (# 'queue.delete_ok' {message_count = 0 },
2236+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
2237+ || Q <- [QQ , DLQ ]].
2238+
2239+ % % Test that messages are at most once dead letter in the correct order
2240+ % % for reason 'expired'.
2241+ at_most_once_dead_letter_order_expired (Config ) ->
2242+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
2243+
2244+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
2245+ QQ = ? config (queue_name , Config ),
2246+ DLQ = <<" dead letter queue" >>,
2247+
2248+ ? assertEqual ({'queue.declare_ok' , DLQ , 0 , 0 },
2249+ declare (Ch , DLQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
2250+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
2251+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
2252+ {<<" x-dead-letter-exchange" >>, longstr , <<>>},
2253+ {<<" x-dead-letter-routing-key" >>, longstr , DLQ }])),
2254+
2255+ ok = amqp_channel :cast (Ch ,
2256+ # 'basic.publish' {routing_key = QQ },
2257+ # amqp_msg {payload = <<" m1" >>}),
2258+ ok = amqp_channel :cast (Ch ,
2259+ # 'basic.publish' {routing_key = QQ },
2260+ # amqp_msg {props = # 'P_basic' {expiration = <<" 1" >>},
2261+ payload = <<" m2" >>}),
2262+ ok = amqp_channel :cast (Ch ,
2263+ # 'basic.publish' {routing_key = QQ },
2264+ # amqp_msg {props = # 'P_basic' {expiration = <<" 1" >>},
2265+ payload = <<" m3" >>}),
2266+ wait_for_consensus (QQ , Config ),
2267+ wait_for_messages_ready (Servers , ra_name (QQ ), 3 ),
2268+
2269+ % % Let m2 and m3 expire before consuming m1.
2270+ timer :sleep (10 ),
2271+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m1" >>}},
2272+ amqp_channel :call (Ch , # 'basic.get' {queue = QQ ,
2273+ no_ack = true })),
2274+
2275+ wait_for_consensus (DLQ , Config ),
2276+ wait_for_messages_ready (Servers , ra_name (DLQ ), 2 ),
2277+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
2278+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2279+ no_ack = true })),
2280+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m3" >>}},
2281+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
2282+ no_ack = true })),
2283+
2284+ [? assertEqual (# 'queue.delete_ok' {message_count = 0 },
2285+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
2286+ || Q <- [QQ , DLQ ]].
2287+
20952288invalid_policy (Config ) ->
20962289 [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
20972290
@@ -3670,50 +3863,6 @@ queue_length_limit_drop_head(Config) ->
36703863 amqp_channel :call (Ch , # 'basic.get' {queue = QQ ,
36713864 no_ack = true })).
36723865
3673- queue_length_bytes_limit_drop_head (Config ) ->
3674- [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
3675-
3676- Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
3677- QQ = ? config (queue_name , Config ),
3678- DLQ = <<" dead letter queue" >>,
3679-
3680- ? assertEqual ({'queue.declare_ok' , DLQ , 0 , 0 },
3681- declare (Ch , DLQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
3682- ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
3683- declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
3684- {<<" x-overflow" >>, longstr , <<" drop-head" >>},
3685- {<<" x-max-length-bytes" >>, long , 1000 },
3686- {<<" x-dead-letter-exchange" >>, longstr , <<>>},
3687- {<<" x-dead-letter-routing-key" >>, longstr , DLQ }])),
3688-
3689- LargePayload = binary :copy (<<" x" >>, 1500 ),
3690- ok = amqp_channel :cast (Ch ,
3691- # 'basic.publish' {routing_key = QQ },
3692- # amqp_msg {payload = <<" m1" >>}),
3693- ok = amqp_channel :cast (Ch ,
3694- # 'basic.publish' {routing_key = QQ },
3695- # amqp_msg {payload = <<" m2" >>}),
3696- ok = amqp_channel :cast (Ch ,
3697- # 'basic.publish' {routing_key = QQ },
3698- # amqp_msg {payload = LargePayload }),
3699- wait_for_consensus (QQ , Config ),
3700- wait_for_consensus (DLQ , Config ),
3701- RaName = ra_name (DLQ ),
3702- wait_for_messages_ready (Servers , RaName , 3 ),
3703- ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m1" >>}},
3704- amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
3705- no_ack = true })),
3706- ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
3707- amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
3708- no_ack = true })),
3709- ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = LargePayload }},
3710- amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
3711- no_ack = true })),
3712-
3713- [? assertEqual (# 'queue.delete_ok' {message_count = 0 },
3714- amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
3715- || Q <- [QQ , DLQ ]].
3716-
37173866queue_length_limit_reject_publish (Config ) ->
37183867 [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
37193868
0 commit comments