@@ -75,59 +75,58 @@ deduplicate_message(Config) ->
7575 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
7676 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
7777
78- timer :sleep (1000 ),
79-
8078 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
8179 # 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
8280
8381 % % Deduplication header absent
8482 publish_message (Channel , <<" test" >>),
8583 publish_message (Channel , <<" test" >>),
8684
87- timer :sleep (1000 ),
88-
8985 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
9086 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
9187
9288deduplicate_message_ttl (Config ) ->
89+ Get = # 'basic.get' {queue = <<" test" >>},
9390 Channel = rabbit_ct_client_helpers :open_channel (Config ),
9491
9592 Args = [{<<" x-message-ttl" >>, long , 1000 }],
9693 # 'queue.declare_ok' {} = amqp_channel :call (Channel ,
9794 make_queue (<<" test" >>, Args )),
9895 bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
9996
97+ % % Queue default TTL
10098 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
10199 timer :sleep (2000 ),
102100 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
103- timer :sleep (500 ),
104101
105- Get = # 'basic.get' {queue = <<" test" >>},
102+ {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
103+
104+ % % Message TTL override
105+ publish_message (Channel , <<" test" >>, " deduplicate-that" , <<" 500" >>),
106+ timer :sleep (800 ),
107+ publish_message (Channel , <<" test" >>, " deduplicate-that" , <<" 500" >>),
108+
106109 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
107110
108111message_acknowledged (Config ) ->
112+ Get = # 'basic.get' {queue = <<" test" >>},
109113 Channel = rabbit_ct_client_helpers :open_channel (Config ),
110114
111115 # 'queue.declare_ok' {} = amqp_channel :call (Channel , make_queue (<<" test" >>)),
112116 bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
113117
114118 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
115119
116- timer :sleep (2000 ),
117-
118- Get = # 'basic.get' {queue = <<" test" >>},
119120 {# 'basic.get_ok' {delivery_tag = Tag }, _ } = amqp_channel :call (Channel , Get ),
120121
121122 amqp_channel :cast (Channel , # 'basic.ack' {delivery_tag = Tag }),
122123
123124 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
124125
125- timer :sleep (2000 ),
126-
127- Get = # 'basic.get' {queue = <<" test" >>},
128126 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
129127
130128queue_overflow (Config ) ->
129+ Get = # 'basic.get' {queue = <<" test" >>},
131130 Channel = rabbit_ct_client_helpers :open_channel (Config ),
132131
133132 Args = [{<<" x-max-length" >>, long , 1 }],
@@ -139,9 +138,6 @@ queue_overflow(Config) ->
139138 publish_message (Channel , <<" test" >>, " deduplicate-that" ),
140139 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
141140
142- timer :sleep (1000 ),
143-
144- Get = # 'basic.get' {queue = <<" test" >>},
145141 {# 'basic.get_ok' {},
146142 # amqp_msg {props = # 'P_basic' {headers = [
147143 {<<" x-deduplication-header" >>,
@@ -177,7 +173,14 @@ publish_message(Ch, Ex) ->
177173 amqp_channel :cast (Ch , Publish , Msg ).
178174
179175publish_message (Ch , Ex , D ) ->
180- Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
181176 Props = # 'P_basic' {headers = [{<<" x-deduplication-header" >>, longstr , D }]},
177+ Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
178+ Msg = # amqp_msg {props = Props , payload = <<" payload" >>},
179+ amqp_channel :cast (Ch , Publish , Msg ).
180+
181+ publish_message (Ch , Ex , D , E ) ->
182+ Props = # 'P_basic' {headers = [{<<" x-deduplication-header" >>, longstr , D }],
183+ expiration = E },
184+ Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
182185 Msg = # amqp_msg {props = Props , payload = <<" payload" >>},
183186 amqp_channel :cast (Ch , Publish , Msg ).
0 commit comments