-
Notifications
You must be signed in to change notification settings - Fork 4k
Periodically check for unapplied policies on QQs #12412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
f9179d1
b408351
fabe54d
ec87ef1
ccd8548
dc9ab1d
51abb5c
df14b4a
42b58c7
3b5069f
9dc9f97
2577b7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,7 +95,8 @@ groups() -> | |
| single_active_consumer_priority, | ||
| force_shrink_member_to_current_member, | ||
| force_all_queues_shrink_member_to_current_member, | ||
| force_vhost_queues_shrink_member_to_current_member | ||
| force_vhost_queues_shrink_member_to_current_member, | ||
| policy_repair | ||
| ] | ||
| ++ all_tests()}, | ||
| {cluster_size_5, [], [start_queue, | ||
|
|
@@ -1300,6 +1301,159 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> | |
| ?assertEqual(3, length(Nodes0)) | ||
| end || Q <- QQs, VHost <- VHosts]. | ||
|
|
||
| % Tests that, if the process of a QQ is dead in the moment of declaring a policy | ||
| % that affects such queue, when the process is made available again, the policy | ||
| % will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863) | ||
| policy_repair(Config) -> | ||
| [Server0, _Server1, _Server2] = Servers = | ||
| rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||
| Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), | ||
| #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), | ||
|
|
||
| QQ = ?config(queue_name, Config), | ||
| ?assertEqual({'queue.declare_ok', QQ, 0, 0}, | ||
| declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), | ||
| RaName = ra_name(QQ), | ||
| ExpectedMaxLength1 = 10, | ||
| Priority1 = 1, | ||
| ok = rabbit_ct_broker_helpers:rpc( | ||
| Config, | ||
| 0, | ||
| rabbit_policy, | ||
| set, | ||
| [ | ||
| <<"/">>, | ||
| <<QQ/binary, "_1">>, | ||
| QQ, | ||
| [{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}], | ||
| Priority1, | ||
| <<"quorum_queues">>, | ||
| <<"acting-user">> | ||
| ]), | ||
|
|
||
| % Wait for the policy to apply | ||
| QueryFun = fun rabbit_fifo:overview/1, | ||
| ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _}, | ||
| rpc:call(Server0, ra, local_query, [RaName, QueryFun]), | ||
| ?DEFAULT_AWAIT), | ||
|
|
||
| % Check the policy has been applied | ||
| % Insert MaxLength1 + some messages but after consuming all messages only | ||
| % MaxLength1 are retrieved. | ||
| % Checking twice to ensure consistency | ||
| publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1), | ||
| % +1 because QQs let one pass | ||
| wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1), | ||
| fail = publish_confirm(Ch, QQ), | ||
| fail = publish_confirm(Ch, QQ), | ||
| consume_all(Ch, QQ), | ||
|
|
||
| % Set higher priority policy, allowing more messages | ||
| ExpectedMaxLength2 = 20, | ||
| Priority2 = 2, | ||
| ok = rabbit_ct_broker_helpers:rpc( | ||
| Config, | ||
| 0, | ||
| rabbit_policy, | ||
| set, | ||
| [ | ||
| <<"/">>, | ||
| <<QQ/binary, "_2">>, | ||
| QQ, | ||
| [{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}], | ||
| Priority2, | ||
| <<"quorum_queues">>, | ||
| <<"acting-user">> | ||
| ]), | ||
|
|
||
| % Wait for the policy to apply | ||
| ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _}, | ||
| rpc:call(Server0, ra, local_query, [RaName, QueryFun]), | ||
| ?DEFAULT_AWAIT), | ||
|
|
||
| % Check the policy has been applied | ||
| % Insert MaxLength2 + some messages but after consuming all messages only | ||
| % MaxLength2 are retrieved. | ||
| % Checking twice to ensure consistency. | ||
| % + 1 because QQs let one pass | ||
| publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1), | ||
| wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1), | ||
| fail = publish_confirm(Ch, QQ), | ||
| fail = publish_confirm(Ch, QQ), | ||
| consume_all(Ch, QQ), | ||
|
|
||
| % Ensure the queue process is unavailable | ||
| lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers), | ||
|
|
||
| % Add policy with higher priority, allowing even more messages. | ||
| ExpectedMaxLength3 = 30, | ||
| Priority3 = 3, | ||
| ok = rabbit_ct_broker_helpers:rpc( | ||
| Config, | ||
| 0, | ||
| rabbit_policy, | ||
| set, | ||
| [ | ||
| <<"/">>, | ||
| <<QQ/binary, "_3">>, | ||
| QQ, | ||
| [{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}], | ||
| Priority3, | ||
| <<"quorum_queues">>, | ||
| <<"acting-user">> | ||
| ]), | ||
|
|
||
| % Restart the queue process. | ||
| {ok, Queue} = | ||
| rabbit_ct_broker_helpers:rpc( | ||
| Config, | ||
| 0, | ||
| rabbit_amqqueue, | ||
| lookup, | ||
| [{resource, <<"/">>, queue, QQ}]), | ||
| lists:foreach( | ||
| fun(Srv) -> | ||
| rabbit_ct_broker_helpers:rpc( | ||
| Config, | ||
| Srv, | ||
| rabbit_quorum_queue, | ||
| recover, | ||
| [foo, [Queue]] | ||
| ) | ||
| end, | ||
| Servers), | ||
|
|
||
| % Wait for the queue to be available again. | ||
| lists:foreach(fun(Srv) -> | ||
| rabbit_ct_helpers:await_condition( | ||
| fun () -> | ||
| is_pid( | ||
| rabbit_ct_broker_helpers:rpc( | ||
| Config, | ||
| Srv, | ||
| erlang, | ||
| whereis, | ||
| [RaName])) | ||
| end) | ||
| end, | ||
| Servers), | ||
|
|
||
| % Wait for the policy to apply | ||
| ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _}, | ||
| rpc:call(Server0, ra, local_query, [RaName, QueryFun]), | ||
| ?DEFAULT_AWAIT), | ||
|
|
||
| % Check the policy has been applied | ||
| % Insert MaxLength3 + some messages but after consuming all messages only | ||
| % MaxLength3 are retrieved. | ||
| % Checking twice to ensure consistency. | ||
| % + 1 because QQs let one pass | ||
| publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1), | ||
| wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1), | ||
| fail = publish_confirm(Ch, QQ), | ||
| fail = publish_confirm(Ch, QQ), | ||
| consume_all(Ch, QQ). | ||
|
|
||
| priority_queue_fifo(Config) -> | ||
| %% testing: if hi priority messages are published before lo priority | ||
| %% messages they are always consumed first (fifo) | ||
|
|
@@ -4333,3 +4487,28 @@ lists_interleave([Item | Items], List) | |
| {Left, Right} = lists:split(2, List), | ||
| Left ++ [Item | lists_interleave(Items, Right)]. | ||
|
|
||
| publish_confirm_many(Ch, Queue, Count) -> | ||
| lists:foreach(fun(_) -> publish_confirm(Ch, Queue) end, lists:seq(1, Count)). | ||
|
|
||
| consume_all(Ch, QQ) -> | ||
| Consume = fun C(Acc) -> | ||
| case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of | ||
| {#'basic.get_ok'{}, Msg} -> | ||
| C([Msg | Acc]); | ||
| _ -> | ||
| Acc | ||
| end | ||
| end, | ||
| Consume([]). | ||
|
|
||
| ensure_qq_proc_dead(Config, Server, RaName) -> | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the target process recovers in fewer than 500ms, this function will loop forever.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ra supervisor has a max restart intensity of 2 restarts per 5 seconds https://github.com/rabbitmq/ra/blob/main/src/ra_server_sup.erl#L36-L37. So supervisor will give up eventually. |
||
| case rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [RaName]) of | ||
| undefined -> | ||
| ok; | ||
| Pid -> | ||
| rabbit_ct_broker_helpers:rpc(Config, Server, erlang, exit, [Pid, kill]), | ||
| %% Give some time for the supervisor to restart the process | ||
| timer:sleep(500), | ||
| ensure_qq_proc_dead(Config, Server, RaName) | ||
| end. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.