@@ -30,7 +30,9 @@ groups() ->
3030 [
3131 {cluster_size_2 , [], [
3232 policy_ttl ,
33- operator_policy_ttl
33+ operator_policy_ttl ,
34+ operator_retroactive_policy_ttl ,
35+ operator_retroactive_policy_publish_ttl
3436 ]}
3537 ].
3638
@@ -112,6 +114,49 @@ operator_policy_ttl(Config) ->
112114 rabbit_ct_client_helpers :close_connection (Conn ),
113115 passed .
114116
117+ operator_retroactive_policy_ttl (Config ) ->
118+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
119+ Q = <<" policy_ttl-queue" >>,
120+ declare (Ch , Q ),
121+ publish (Ch , Q , lists :seq (1 , 50 )),
122+ % Operator policy will override
123+ rabbit_ct_broker_helpers :set_operator_policy (Config , 0 , <<" ttl-policy-op" >>,
124+ <<" policy_ttl-queue" >>, <<" all" >>, [{<<" message-ttl" >>, 1 }]),
125+
126+ % % Old messages are not expired
127+ timer :sleep (50 ),
128+ get_messages (50 , Ch , Q ),
129+ delete (Ch , Q ),
130+
131+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" ttl-policy-op" >>),
132+
133+ rabbit_ct_client_helpers :close_channel (Ch ),
134+ rabbit_ct_client_helpers :close_connection (Conn ),
135+ passed .
136+
137+ operator_retroactive_policy_publish_ttl (Config ) ->
138+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
139+ Q = <<" policy_ttl-queue" >>,
140+ declare (Ch , Q ),
141+ publish (Ch , Q , lists :seq (1 , 50 )),
142+ % Operator policy will override
143+ rabbit_ct_broker_helpers :set_operator_policy (Config , 0 , <<" ttl-policy-op" >>,
144+ <<" policy_ttl-queue" >>, <<" all" >>, [{<<" message-ttl" >>, 1 }]),
145+
146+ % % Old messages are not expired, new ones only expire when they get to the head of
147+ % % the queue
148+ publish (Ch , Q , lists :seq (1 , 25 )),
149+ timer :sleep (50 ),
150+ [[<<" policy_ttl-queue" >>, <<" 75" >>]] = rabbit_ct_broker_helpers :rabbitmqctl_list (Config , 0 , [" list_queues" ]),
151+ get_messages (50 , Ch , Q ),
152+ delete (Ch , Q ),
153+
154+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" ttl-policy-op" >>),
155+
156+ rabbit_ct_client_helpers :close_channel (Ch ),
157+ rabbit_ct_client_helpers :close_connection (Conn ),
158+ passed .
159+
115160% %----------------------------------------------------------------------------
116161
117162
@@ -154,4 +199,14 @@ consume(Ch, Q, Ack) ->
154199get_empty (Ch , Q ) ->
155200 # 'basic.get_empty' {} = amqp_channel :call (Ch , # 'basic.get' {queue = Q }).
156201
202+ get_messages (0 , Ch , Q ) ->
203+ get_empty (Ch , Q );
204+ get_messages (Number , Ch , Q ) ->
205+ case amqp_channel :call (Ch , # 'basic.get' {queue = Q }) of
206+ {# 'basic.get_ok' {}, _ } ->
207+ get_messages (Number - 1 , Ch , Q );
208+ # 'basic.get_empty' {} ->
209+ exit (failed )
210+ end .
211+
157212% %----------------------------------------------------------------------------
0 commit comments