@@ -101,36 +101,43 @@ deduplicate_message(Config) ->
101101 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
102102 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
103103
104- timer :sleep (1000 ),
105-
106104 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
107105 # 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
108106
109107 % % Deduplication header absent
110108 publish_message (Channel , <<" test" >>),
111109 publish_message (Channel , <<" test" >>),
112110
113- timer :sleep (1000 ),
114-
115111 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
116112 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
117113
118114deduplicate_message_ttl (Config ) ->
115+ Get = # 'basic.get' {queue = <<" test" >>},
119116 Channel = rabbit_ct_client_helpers :open_channel (Config ),
120117
121118 # 'exchange.declare_ok' {} = amqp_channel :call (
122119 Channel , make_exchange (<<" test" >>, 10 , 1000 )),
123120 bind_new_queue (Channel , <<" test" >>, <<" test" >>),
124121
122+ % % Exchange default TTL
125123 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
126124 timer :sleep (2000 ),
127125 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
128126
129- Get = # 'basic.get' {queue = <<" test" >>},
127+ {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
128+ {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
129+
130+ % % Message TTL override
131+ Headers = [{<<" x-cache-ttl" >>, long , 500 }],
132+ publish_message (Channel , <<" test" >>, " deduplicate-that" , Headers ),
133+ timer :sleep (800 ),
134+ publish_message (Channel , <<" test" >>, " deduplicate-that" , Headers ),
135+
130136 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
131137 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
132138
133139deduplicate_message_cache_overflow (Config ) ->
140+ Get = # 'basic.get' {queue = <<" test" >>},
134141 Channel = rabbit_ct_client_helpers :open_channel (Config ),
135142
136143 # 'exchange.declare_ok' {} = amqp_channel :call (
@@ -141,9 +148,6 @@ deduplicate_message_cache_overflow(Config) ->
141148 publish_message (Channel , <<" test" >>, " deduplicate-that" ),
142149 publish_message (Channel , <<" test" >>, " deduplicate-this" ),
143150
144- timer :sleep (1000 ),
145-
146- Get = # 'basic.get' {queue = <<" test" >>},
147151 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
148152 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
149153 {# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
@@ -172,7 +176,11 @@ publish_message(Ch, Ex) ->
172176 amqp_channel :cast (Ch , Publish , Msg ).
173177
174178publish_message (Ch , Ex , D ) ->
179+ publish_message (Ch , Ex , D , []).
180+
181+ publish_message (Ch , Ex , D , H ) ->
182+ Headers = [{<<" x-deduplication-header" >>, longstr , D }] ++ H ,
175183 Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
176- Props = # 'P_basic' {headers = [{<< " x-deduplication-header " >>, longstr , D }] },
184+ Props = # 'P_basic' {headers = Headers },
177185 Msg = # amqp_msg {props = Props , payload = <<" payload" >>},
178186 amqp_channel :cast (Ch , Publish , Msg ).
0 commit comments