Skip to content

Commit 341e5f1

Browse files
committed
test: test message deduplication with no duplication header
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent a8810ad commit 341e5f1

File tree

2 files changed

+37
-7
lines changed

2 files changed

+37
-7
lines changed

test/exchange_SUITE.erl

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,30 @@ declare_exchanges(Config) ->
9090
?assertExit(_, amqp_channel:call(Channel, DeclareErr)).
9191

9292
deduplicate_message(Config) ->
93+
Get = #'basic.get'{queue = <<"test">>},
9394
Channel = rabbit_ct_client_helpers:open_channel(Config),
9495

9596
#'exchange.declare_ok'{} = amqp_channel:call(
9697
Channel, make_exchange(<<"test">>, 10, 10000)),
9798
bind_new_queue(Channel, <<"test">>, <<"test">>),
9899

100+
%% Deduplication header present
99101
publish_message(Channel, <<"test">>, "deduplicate-this"),
100102
publish_message(Channel, <<"test">>, "deduplicate-this"),
101103

102-
timer:sleep(2000),
104+
timer:sleep(1000),
105+
106+
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get),
107+
#'basic.get_empty'{} = amqp_channel:call(Channel, Get),
108+
109+
%% Deduplication header absent
110+
publish_message(Channel, <<"test">>),
111+
publish_message(Channel, <<"test">>),
112+
113+
timer:sleep(1000),
103114

104-
Get = #'basic.get'{queue = <<"test">>},
105115
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get),
106-
#'basic.get_empty'{} = amqp_channel:call(Channel, Get).
116+
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get).
107117

108118
deduplicate_message_ttl(Config) ->
109119
Channel = rabbit_ct_client_helpers:open_channel(Config),
@@ -131,7 +141,7 @@ deduplicate_message_cache_overflow(Config) ->
131141
publish_message(Channel, <<"test">>, "deduplicate-that"),
132142
publish_message(Channel, <<"test">>, "deduplicate-this"),
133143

134-
timer:sleep(2000),
144+
timer:sleep(1000),
135145

136146
Get = #'basic.get'{queue = <<"test">>},
137147
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get),
@@ -156,6 +166,11 @@ bind_new_queue(Ch, Ex, Q) ->
156166
Binding = #'queue.bind'{queue = Q, exchange = Ex, routing_key = <<"#">>},
157167
#'queue.bind_ok'{} = amqp_channel:call(Ch, Binding).
158168

169+
publish_message(Ch, Ex) ->
170+
Publish = #'basic.publish'{exchange = Ex, routing_key = <<"#">>},
171+
Msg = #amqp_msg{payload = <<"payload">>},
172+
amqp_channel:cast(Ch, Publish, Msg).
173+
159174
publish_message(Ch, Ex, D) ->
160175
Publish = #'basic.publish'{exchange = Ex, routing_key = <<"#">>},
161176
Props = #'P_basic'{headers = [{<<"x-deduplication-header">>, longstr, D}]},

test/queue_SUITE.erl

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,29 @@ end_per_testcase(Testcase, Config) ->
6565
%% -------------------------------------------------------------------
6666

6767
deduplicate_message(Config) ->
68+
Get = #'basic.get'{queue = <<"test">>},
6869
Channel = rabbit_ct_client_helpers:open_channel(Config),
6970

7071
#'queue.declare_ok'{} = amqp_channel:call(Channel, make_queue(<<"test">>)),
7172
bind_new_exchange(Channel, <<"test">>, <<"test">>),
7273

74+
%% Deduplication header present
7375
publish_message(Channel, <<"test">>, "deduplicate-this"),
7476
publish_message(Channel, <<"test">>, "deduplicate-this"),
7577

76-
timer:sleep(2000),
78+
timer:sleep(1000),
7779

78-
Get = #'basic.get'{queue = <<"test">>},
7980
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get),
80-
#'basic.get_empty'{} = amqp_channel:call(Channel, Get).
81+
#'basic.get_empty'{} = amqp_channel:call(Channel, Get),
82+
83+
%% Deduplication header absent
84+
publish_message(Channel, <<"test">>),
85+
publish_message(Channel, <<"test">>),
86+
87+
timer:sleep(1000),
88+
89+
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get),
90+
{#'basic.get_ok'{}, _} = amqp_channel:call(Channel, Get).
8191

8292
deduplicate_message_ttl(Config) ->
8393
Channel = rabbit_ct_client_helpers:open_channel(Config),
@@ -161,6 +171,11 @@ bind_new_exchange(Ch, Ex, Q) ->
161171
Binding = #'queue.bind'{queue = Q, exchange = Ex, routing_key = <<"#">>},
162172
#'queue.bind_ok'{} = amqp_channel:call(Ch, Binding).
163173

174+
publish_message(Ch, Ex) ->
175+
Publish = #'basic.publish'{exchange = Ex, routing_key = <<"#">>},
176+
Msg = #amqp_msg{payload = <<"payload">>},
177+
amqp_channel:cast(Ch, Publish, Msg).
178+
164179
publish_message(Ch, Ex, D) ->
165180
Publish = #'basic.publish'{exchange = Ex, routing_key = <<"#">>},
166181
Props = #'P_basic'{headers = [{<<"x-deduplication-header">>, longstr, D}]},

0 commit comments

Comments
 (0)