Skip to content

Commit 27641a7

Browse files
committed
Merge pull request #1994 from rabbitmq/rabbitmq-server-1980
Reset overflow queue property to drop-head if policy is unset. (cherry picked from commit 80df800)
1 parent 415a6a5 commit 27641a7

File tree

2 files changed

+109
-3
lines changed

2 files changed

+109
-3
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,12 @@ init_max_bytes(MaxBytes, State) ->
446446
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
447447
State1.
448448

449-
init_overflow(undefined, State) ->
449+
%% Reset overflow to default 'drop-head' value if it's undefined.
450+
init_overflow(undefined, #q{overflow = 'drop-head'} = State) ->
450451
State;
452+
init_overflow(undefined, State) ->
453+
{_Dropped, State1} = maybe_drop_head(State#q{overflow = 'drop-head'}),
454+
State1;
451455
init_overflow(Overflow, State) ->
452456
OverflowVal = binary_to_existing_atom(Overflow, utf8),
453457
case OverflowVal of

test/confirms_rejects_SUITE.erl

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ all() ->
1313
groups() ->
1414
[
1515
{parallel_tests, [parallel], [
16-
confirms_rejects_conflict
16+
confirms_rejects_conflict,
17+
policy_resets_to_default
1718
]}
1819
].
1920

@@ -40,14 +41,28 @@ end_per_group(Group, Config) ->
4041
rabbit_ct_client_helpers:teardown_steps() ++
4142
rabbit_ct_broker_helpers:teardown_steps()).
4243

43-
init_per_testcase(Testcase, Config) ->
44+
init_per_testcase(policy_resets_to_default = Testcase, Config) ->
45+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
46+
rabbit_ct_helpers:testcase_started(
47+
rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase);
48+
init_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
4449
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
4550
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
4651

4752
rabbit_ct_helpers:testcase_started(
4853
rabbit_ct_helpers:set_config(Config, [{conn, Conn}, {conn1, Conn1}]),
4954
Testcase).
5055

56+
end_per_testcase(policy_resets_to_default = Testcase, Config) ->
57+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
58+
amqp_channel:call(Ch, #'queue.delete'{queue = <<"policy_resets_to_default">>}),
59+
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
60+
61+
Conn = ?config(conn, Config),
62+
63+
rabbit_ct_client_helpers:close_connection(Conn),
64+
65+
rabbit_ct_helpers:testcase_finished(Config, Testcase);
5166
end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
5267
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
5368
amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}),
@@ -61,6 +76,8 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
6176

6277
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6378

79+
80+
6481
confirms_rejects_conflict(Config) ->
6582
Conn = ?config(conn, Config),
6683
Conn1 = ?config(conn1, Config),
@@ -120,6 +137,91 @@ confirms_rejects_conflict(Config) ->
120137
{error, E} -> error(E)
121138
end.
122139

140+
policy_resets_to_default(Config) ->
141+
Conn = ?config(conn, Config),
142+
{ok, Ch} = amqp_connection:open_channel(Conn),
143+
QueueName = <<"policy_resets_to_default">>,
144+
145+
amqp_channel:call(Ch, #'confirm.select'{}),
146+
amqp_channel:register_confirm_handler(Ch, self()),
147+
148+
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
149+
durable = true
150+
}),
151+
MaxLength = 2,
152+
rabbit_ct_broker_helpers:set_policy(
153+
Config, 0,
154+
QueueName, QueueName, <<"queues">>,
155+
[{<<"max-length">>, MaxLength}, {<<"overflow">>, <<"reject-publish">>}]),
156+
157+
timer:sleep(1000),
158+
159+
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
160+
#amqp_msg{payload = <<"HI">>})
161+
|| _ <- lists:seq(1, MaxLength)],
162+
163+
assert_acks(MaxLength),
164+
165+
#'queue.declare_ok'{message_count = MaxLength} =
166+
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
167+
durable = true}),
168+
169+
RejectedMessage = <<"HI-rejected">>,
170+
amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
171+
#amqp_msg{payload = RejectedMessage}),
172+
173+
assert_nack(),
174+
175+
rabbit_ct_broker_helpers:set_policy(
176+
Config, 0,
177+
QueueName, QueueName, <<"queues">>,
178+
[{<<"max-length">>, MaxLength}]),
179+
180+
NotRejectedMessage = <<"HI-not-rejected">>,
181+
amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
182+
#amqp_msg{payload = NotRejectedMessage}),
183+
184+
assert_ack(),
185+
186+
#'queue.declare_ok'{message_count = MaxLength} =
187+
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
188+
durable = true}),
189+
190+
Msgs = consume_all_messages(Ch, QueueName),
191+
case {lists:member(RejectedMessage, Msgs), lists:member(NotRejectedMessage, Msgs)} of
192+
{true, _} -> error({message_should_be_rejected, RejectedMessage});
193+
{_, false} -> error({message_should_be_enqueued, NotRejectedMessage});
194+
_ -> ok
195+
end.
196+
197+
consume_all_messages(Ch, QueueName) ->
198+
consume_all_messages(Ch, QueueName, []).
199+
200+
consume_all_messages(Ch, QueueName, Msgs) ->
201+
case amqp_channel:call(Ch, #'basic.get'{queue = QueueName, no_ack = true}) of
202+
{#'basic.get_ok'{}, #amqp_msg{payload = Msg}} ->
203+
consume_all_messages(Ch, QueueName, [Msg | Msgs]);
204+
#'basic.get_empty'{} -> Msgs
205+
end.
206+
207+
assert_ack() ->
208+
receive {'basic.ack', _, _} -> ok
209+
after 10000 -> error(timeout_waiting_for_ack)
210+
end,
211+
clean_acks_mailbox().
212+
213+
assert_nack() ->
214+
receive {'basic.nack', _, _, _} -> ok
215+
after 10000 -> error(timeout_waiting_for_nack)
216+
end,
217+
clean_acks_mailbox().
218+
219+
assert_acks(N) ->
220+
receive {'basic.ack', N, _} -> ok
221+
after 10000 -> error({timeout_waiting_for_ack, N})
222+
end,
223+
clean_acks_mailbox().
224+
123225
validate_acks_mailbox() ->
124226
Result = validate_acks_mailbox({0, ok}),
125227
clean_acks_mailbox(),

0 commit comments

Comments
 (0)