@@ -167,6 +167,7 @@ all_tests() ->
167167 subscribe_redelivery_count ,
168168 message_bytes_metrics ,
169169 queue_length_limit_drop_head ,
170+ queue_length_bytes_limit_drop_head ,
170171 queue_length_limit_reject_publish ,
171172 queue_length_limit_policy_cleared ,
172173 subscribe_redelivery_limit ,
@@ -3677,6 +3678,50 @@ queue_length_limit_drop_head(Config) ->
36773678 amqp_channel :call (Ch , # 'basic.get' {queue = QQ ,
36783679 no_ack = true })).
36793680
3681+ queue_length_bytes_limit_drop_head (Config ) ->
3682+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
3683+
3684+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
3685+ QQ = ? config (queue_name , Config ),
3686+ DLQ = <<" dead letter queue" >>,
3687+
3688+ ? assertEqual ({'queue.declare_ok' , DLQ , 0 , 0 },
3689+ declare (Ch , DLQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
3690+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
3691+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
3692+ {<<" x-overflow" >>, longstr , <<" drop-head" >>},
3693+ {<<" x-max-length-bytes" >>, long , 1000 },
3694+ {<<" x-dead-letter-exchange" >>, longstr , <<>>},
3695+ {<<" x-dead-letter-routing-key" >>, longstr , DLQ }])),
3696+
3697+ LargePayload = binary :copy (<<" x" >>, 1500 ),
3698+ ok = amqp_channel :cast (Ch ,
3699+ # 'basic.publish' {routing_key = QQ },
3700+ # amqp_msg {payload = <<" m1" >>}),
3701+ ok = amqp_channel :cast (Ch ,
3702+ # 'basic.publish' {routing_key = QQ },
3703+ # amqp_msg {payload = <<" m2" >>}),
3704+ ok = amqp_channel :cast (Ch ,
3705+ # 'basic.publish' {routing_key = QQ },
3706+ # amqp_msg {payload = LargePayload }),
3707+ wait_for_consensus (QQ , Config ),
3708+ wait_for_consensus (DLQ , Config ),
3709+ RaName = ra_name (DLQ ),
3710+ wait_for_messages_ready (Servers , RaName , 3 ),
3711+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m1" >>}},
3712+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
3713+ no_ack = true })),
3714+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
3715+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
3716+ no_ack = true })),
3717+ ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = LargePayload }},
3718+ amqp_channel :call (Ch , # 'basic.get' {queue = DLQ ,
3719+ no_ack = true })),
3720+
3721+ [? assertEqual (# 'queue.delete_ok' {message_count = 0 },
3722+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
3723+ || Q <- [QQ , DLQ ]].
3724+
36803725queue_length_limit_reject_publish (Config ) ->
36813726 [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
36823727
0 commit comments