@@ -152,6 +152,7 @@ all_tests() ->
152152 queue_length_limit_drop_head ,
153153 queue_length_limit_reject_publish ,
154154 subscribe_redelivery_limit ,
155+ subscribe_redelivery_limit_disable ,
155156 subscribe_redelivery_limit_many ,
156157 subscribe_redelivery_policy ,
157158 subscribe_redelivery_limit_with_dead_letter ,
@@ -2495,8 +2496,8 @@ subscribe_redelivery_count(Config) ->
24952496 # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
24962497 ? assertMatch (undefined , rabbit_basic :header (DCHeader , H0 )),
24972498 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2498- multiple = false ,
2499- requeue = true })
2499+ multiple = false ,
2500+ requeue = true })
25002501 after 5000 ->
25012502 exit (basic_deliver_timeout )
25022503 end ,
@@ -2508,8 +2509,8 @@ subscribe_redelivery_count(Config) ->
25082509 ct :pal (" H1 ~p " , [H1 ]),
25092510 ? assertMatch ({DCHeader , _ , 1 }, rabbit_basic :header (DCHeader , H1 )),
25102511 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
2511- multiple = false ,
2512- requeue = true })
2512+ multiple = false ,
2513+ requeue = true })
25132514 after 5000 ->
25142515 flush (1 ),
25152516 exit (basic_deliver_timeout_2 )
@@ -2521,7 +2522,7 @@ subscribe_redelivery_count(Config) ->
25212522 # amqp_msg {props = # 'P_basic' {headers = H2 }}} ->
25222523 ? assertMatch ({DCHeader , _ , 2 }, rabbit_basic :header (DCHeader , H2 )),
25232524 amqp_channel :cast (Ch , # 'basic.ack' {delivery_tag = DeliveryTag2 ,
2524- multiple = false }),
2525+ multiple = false }),
25252526 ct :pal (" wait_for_messages_ready" , []),
25262527 wait_for_messages_ready (Servers , RaName , 0 ),
25272528 ct :pal (" wait_for_messages_pending_ack" , []),
@@ -2551,8 +2552,8 @@ subscribe_redelivery_limit(Config) ->
25512552 # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
25522553 ? assertMatch (undefined , rabbit_basic :header (DCHeader , H0 )),
25532554 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2554- multiple = false ,
2555- requeue = true })
2555+ multiple = false ,
2556+ requeue = true })
25562557 end ,
25572558
25582559 wait_for_messages (Config , [[QQ , <<" 1" >>, <<" 0" >>, <<" 1" >>]]),
@@ -2562,8 +2563,8 @@ subscribe_redelivery_limit(Config) ->
25622563 # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
25632564 ? assertMatch ({DCHeader , _ , 1 }, rabbit_basic :header (DCHeader , H1 )),
25642565 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
2565- multiple = false ,
2566- requeue = true })
2566+ multiple = false ,
2567+ requeue = true })
25672568 end ,
25682569
25692570 wait_for_messages (Config , [[QQ , <<" 0" >>, <<" 0" >>, <<" 0" >>]]),
@@ -2574,6 +2575,51 @@ subscribe_redelivery_limit(Config) ->
25742575 ok
25752576 end .
25762577
2578+ subscribe_redelivery_limit_disable (Config ) ->
2579+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
2580+
2581+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
2582+ QQ = ? config (queue_name , Config ),
2583+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
2584+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
2585+ {<<" x-delivery-limit" >>, long , - 1 }])),
2586+ publish (Ch , QQ ),
2587+ wait_for_messages (Config , [[QQ , <<" 1" >>, <<" 1" >>, <<" 0" >>]]),
2588+ subscribe (Ch , QQ , false ),
2589+
2590+ DCHeader = <<" x-delivery-count" >>,
2591+ receive
2592+ {# 'basic.deliver' {delivery_tag = DeliveryTag ,
2593+ redelivered = false },
2594+ # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
2595+ ? assertMatch (undefined , rabbit_basic :header (DCHeader , H0 )),
2596+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2597+ multiple = false ,
2598+ requeue = true })
2599+ end ,
2600+
2601+ wait_for_messages (Config , [[QQ , <<" 1" >>, <<" 0" >>, <<" 1" >>]]),
2602+ % % set an operator policy, this should always win
2603+ ok = rabbit_ct_broker_helpers :set_operator_policy (
2604+ Config , 0 , <<" delivery-limit" >>, QQ , <<" queues" >>,
2605+ [{<<" delivery-limit" >>, 0 }]),
2606+
2607+ receive
2608+ {# 'basic.deliver' {delivery_tag = DeliveryTag2 ,
2609+ redelivered = true },
2610+ # amqp_msg {props = # 'P_basic' {}}} ->
2611+ % ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
2612+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag2 ,
2613+ multiple = false ,
2614+ requeue = true })
2615+ after 5000 ->
2616+ flush (1 ),
2617+ ct :fail (" message did not arrive as expected" )
2618+ end ,
2619+ wait_for_messages (Config , [[QQ , <<" 0" >>, <<" 0" >>, <<" 0" >>]]),
2620+ ok = rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" delivery-limit" >>),
2621+ ok .
2622+
25772623% % Test that consumer credit is increased correctly.
25782624subscribe_redelivery_limit_many (Config ) ->
25792625 [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
@@ -2637,8 +2683,8 @@ subscribe_redelivery_policy(Config) ->
26372683 # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
26382684 ? assertMatch (undefined , rabbit_basic :header (DCHeader , H0 )),
26392685 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2640- multiple = false ,
2641- requeue = true })
2686+ multiple = false ,
2687+ requeue = true })
26422688 end ,
26432689
26442690 wait_for_messages (Config , [[QQ , <<" 1" >>, <<" 0" >>, <<" 1" >>]]),
@@ -2648,8 +2694,8 @@ subscribe_redelivery_policy(Config) ->
26482694 # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
26492695 ? assertMatch ({DCHeader , _ , 1 }, rabbit_basic :header (DCHeader , H1 )),
26502696 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
2651- multiple = false ,
2652- requeue = true })
2697+ multiple = false ,
2698+ requeue = true })
26532699 end ,
26542700
26552701 wait_for_messages (Config , [[QQ , <<" 0" >>, <<" 0" >>, <<" 0" >>]]),
@@ -2687,8 +2733,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
26872733 # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
26882734 ? assertMatch (undefined , rabbit_basic :header (DCHeader , H0 )),
26892735 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2690- multiple = false ,
2691- requeue = true })
2736+ multiple = false ,
2737+ requeue = true })
26922738 end ,
26932739
26942740 wait_for_messages (Config , [[QQ , <<" 1" >>, <<" 0" >>, <<" 1" >>]]),
@@ -2698,8 +2744,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
26982744 # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
26992745 ? assertMatch ({DCHeader , _ , 1 }, rabbit_basic :header (DCHeader , H1 )),
27002746 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
2701- multiple = false ,
2702- requeue = true })
2747+ multiple = false ,
2748+ requeue = true })
27032749 end ,
27042750
27052751 wait_for_messages (Config , [[QQ , <<" 0" >>, <<" 0" >>, <<" 0" >>]]),
@@ -2726,8 +2772,8 @@ consume_redelivery_count(Config) ->
27262772 no_ack = false }),
27272773 ? assertMatch (undefined , rabbit_basic :header (DCHeader , H0 )),
27282774 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2729- multiple = false ,
2730- requeue = true }),
2775+ multiple = false ,
2776+ requeue = true }),
27312777 % % wait for requeuing
27322778 {# 'basic.get_ok' {delivery_tag = DeliveryTag1 ,
27332779 redelivered = true },
@@ -2736,8 +2782,8 @@ consume_redelivery_count(Config) ->
27362782
27372783 ? assertMatch ({DCHeader , _ , 1 }, rabbit_basic :header (DCHeader , H1 )),
27382784 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
2739- multiple = false ,
2740- requeue = true }),
2785+ multiple = false ,
2786+ requeue = true }),
27412787
27422788 {# 'basic.get_ok' {delivery_tag = DeliveryTag2 ,
27432789 redelivered = true },
@@ -2746,8 +2792,8 @@ consume_redelivery_count(Config) ->
27462792 no_ack = false }),
27472793 ? assertMatch ({DCHeader , _ , 2 }, rabbit_basic :header (DCHeader , H2 )),
27482794 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag2 ,
2749- multiple = false ,
2750- requeue = true }),
2795+ multiple = false ,
2796+ requeue = true }),
27512797 ok .
27522798
27532799message_bytes_metrics (Config ) ->
@@ -2784,8 +2830,8 @@ message_bytes_metrics(Config) ->
27842830 {# 'basic.deliver' {delivery_tag = DeliveryTag ,
27852831 redelivered = false }, _ } ->
27862832 amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
2787- multiple = false ,
2788- requeue = false }),
2833+ multiple = false ,
2834+ requeue = false }),
27892835 wait_for_messages_ready (Servers , RaName , 0 ),
27902836 wait_for_messages_pending_ack (Servers , RaName , 0 ),
27912837 rabbit_ct_helpers :await_condition (
0 commit comments