Skip to content

Commit 463c479

Browse files
Merge pull request #2831 from rabbitmq/rabbitmq-server-2756-3.8.x
Add federation support for quorum queues
2 parents d428d12 + 315d162 commit 463c479

File tree

8 files changed

+233
-91
lines changed

8 files changed

+233
-91
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1853,6 +1853,8 @@ basic_cancel(Q, ChPid,
18531853
notify_decorators(Q) when ?amqqueue_is_classic(Q) ->
18541854
QPid = amqqueue:get_pid(Q),
18551855
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]});
1856+
notify_decorators(Q) when ?amqqueue_is_quorum(Q) ->
1857+
rabbit_quorum_queue:notify_decorators(Q);
18561858
notify_decorators(_Q) ->
18571859
%% Not supported by any other queue type
18581860
ok.

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
query_single_active_consumer/1,
4242
query_in_memory_usage/1,
4343
query_peek/2,
44+
query_notify_decorators_info/1,
4445
usage/1,
4546

4647
zero/1,
@@ -241,7 +242,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
241242
{State1, ok, Effects} =
242243
checkout(Meta, State0,
243244
State0#?MODULE{service_queue = ServiceQueue,
244-
consumers = Cons}, []),
245+
consumers = Cons}, [], false),
245246
Response = {send_credit_reply, messages_ready(State1)},
246247
%% by this point all checkouts for the updated credit value
247248
%% should be processed so we can evaluate the drain
@@ -299,7 +300,8 @@ apply(#{index := Index,
299300
Exists = maps:is_key(ConsumerId, Consumers),
300301
case messages_ready(State0) of
301302
0 ->
302-
update_smallest_raft_index(Index, {dequeue, empty}, State0, []);
303+
update_smallest_raft_index(Index, {dequeue, empty}, State0,
304+
[notify_decorators_effect(State0)]);
303305
_ when Exists ->
304306
%% a dequeue using the same consumer_id isn't possible at this point
305307
{State0, {dequeue, empty}};
@@ -330,8 +332,8 @@ apply(#{index := Index,
330332
{{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
331333

332334
end,
333-
334-
case evaluate_limit(Index, false, State0, State4, Effects2) of
335+
NotifyEffect = notify_decorators_effect(State4),
336+
case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of
335337
{State, true, Effects} ->
336338
update_smallest_raft_index(Index, Reply, State, Effects);
337339
{State, false, Effects} ->
@@ -456,6 +458,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
456458
% Monitor the node so that we can "unsuspect" these processes when the node
457459
% comes back, then re-issue all monitors and discover the final fate of
458460
% these processes
461+
459462
Effects = case maps:size(State#?MODULE.consumers) of
460463
0 ->
461464
[{aux, inactive}, {monitor, node, Node}];
@@ -959,6 +962,21 @@ query_peek(Pos, State0) when Pos > 0 ->
959962
query_peek(Pos-1, State)
960963
end.
961964

965+
query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
966+
MaxActivePriority = maps:fold(fun(_, #consumer{credit = C,
967+
status = up,
968+
priority = P0}, MaxP) when C > 0 ->
969+
P = -P0,
970+
case MaxP of
971+
empty -> P;
972+
MaxP when MaxP > P -> MaxP;
973+
_ -> P
974+
end;
975+
(_, _, MaxP) ->
976+
MaxP
977+
end, empty, Consumers),
978+
IsEmpty = (messages_ready(State) == 0),
979+
{MaxActivePriority, IsEmpty}.
962980

963981
-spec usage(atom()) -> float().
964982
usage(Name) when is_atom(Name) ->
@@ -1065,11 +1083,13 @@ cancel_consumer0(Meta, ConsumerId,
10651083
#{ConsumerId := Consumer} ->
10661084
{S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
10671085
S0, Effects0, Reason),
1086+
10681087
%% The effects are emitted before the consumer is actually removed
10691088
%% if the consumer has unacked messages. This is a bit weird but
10701089
%% in line with what classic queues do (from an external point of
10711090
%% view)
10721091
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
1092+
10731093
case maps:size(S#?MODULE.consumers) of
10741094
0 ->
10751095
{S, [{aux, inactive} | Effects]};
@@ -1132,7 +1152,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
11321152
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
11331153
{ok, State1, Effects1} ->
11341154
State2 = append_to_master_index(RaftIdx, State1),
1135-
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
1155+
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
11361156
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
11371157
{duplicate, State, Effects} ->
11381158
{State, ok, Effects}
@@ -1290,7 +1310,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
12901310
_ ->
12911311
State1
12921312
end,
1293-
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
1313+
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
12941314
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
12951315

12961316
% used to processes messages that are finished
@@ -1334,7 +1354,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
13341354
Discarded = maps:with(MsgIds, Checked0),
13351355
{State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
13361356
Effects0, State0),
1337-
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
1357+
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
13381358
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
13391359

13401360
dead_letter_effects(_Reason, _Discarded,
@@ -1366,9 +1386,10 @@ dead_letter_effects(Reason, Discarded,
13661386
end} | Effects].
13671387

13681388
cancel_consumer_effects(ConsumerId,
1369-
#?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
1389+
#?MODULE{cfg = #cfg{resource = QName}} = State, Effects) ->
13701390
[{mod_call, rabbit_quorum_queue,
1371-
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
1391+
cancel_consumer_handler, [QName, ConsumerId]},
1392+
notify_decorators_effect(State) | Effects].
13721393

13731394
update_smallest_raft_index(Idx, State, Effects) ->
13741395
update_smallest_raft_index(Idx, ok, State, Effects).
@@ -1503,14 +1524,30 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
15031524
end, {State, Effects0}, Checked).
15041525

15051526
%% checkout new messages to consumers
1506-
checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
1527+
checkout(Meta, OldState, State, Effects) ->
1528+
checkout(Meta, OldState, State, Effects, true).
1529+
1530+
checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
1531+
Effects0, HandleConsumerChanges) ->
15071532
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
15081533
Effects0, {#{}, #{}}),
15091534
case evaluate_limit(Index, false, OldState, State1, Effects1) of
15101535
{State, true, Effects} ->
1511-
update_smallest_raft_index(Index, State, Effects);
1536+
case maybe_notify_decorators(State, HandleConsumerChanges) of
1537+
{true, {MaxActivePriority, IsEmpty}} ->
1538+
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
1539+
update_smallest_raft_index(Index, State, [NotifyEffect | Effects]);
1540+
false ->
1541+
update_smallest_raft_index(Index, State, Effects)
1542+
end;
15121543
{State, false, Effects} ->
1513-
{State, ok, Effects}
1544+
case maybe_notify_decorators(State, HandleConsumerChanges) of
1545+
{true, {MaxActivePriority, IsEmpty}} ->
1546+
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
1547+
{State, ok, [NotifyEffect | Effects]};
1548+
false ->
1549+
{State, ok, Effects}
1550+
end
15141551
end.
15151552

15161553
checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
@@ -2129,3 +2166,16 @@ get_priority_from_args(#{args := Args}) ->
21292166
end;
21302167
get_priority_from_args(_) ->
21312168
0.
2169+
2170+
maybe_notify_decorators(_, false) ->
2171+
false;
2172+
maybe_notify_decorators(State, _) ->
2173+
{true, query_notify_decorators_info(State)}.
2174+
2175+
notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) ->
2176+
{MaxActivePriority, IsEmpty} = query_notify_decorators_info(State),
2177+
notify_decorators_effect(QName, MaxActivePriority, IsEmpty).
2178+
2179+
notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
2180+
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
2181+
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
repair_amqqueue_nodes/2
4343
]).
4444
-export([reclaim_memory/2]).
45+
-export([notify_decorators/1,
46+
notify_decorators/3,
47+
spawn_notify_decorators/3]).
4548

4649
-include_lib("stdlib/include/qlc.hrl").
4750
-include("rabbit.hrl").
@@ -136,6 +139,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
136139
ra_machine_config(NewQ)),
137140
%% force a policy change to ensure the latest config is
138141
%% updated even when running the machine version from 0
142+
notify_decorators(QName, startup),
139143
rabbit_event:notify(queue_created,
140144
[{name, QName},
141145
{durable, Durable},
@@ -311,17 +315,13 @@ filter_quorum_critical(Queues, ReplicaStates) ->
311315

312316
-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
313317
is_policy_applicable(_Q, Policy) ->
314-
Applicable = [<<"max-length">>,
315-
<<"max-length-bytes">>,
316-
<<"overflow">>,
317-
<<"expires">>,
318-
<<"max-in-memory-length">>,
319-
<<"max-in-memory-bytes">>,
320-
<<"delivery-limit">>,
321-
<<"dead-letter-exchange">>,
322-
<<"dead-letter-routing-key">>],
318+
NotApplicable = [ %% Classic policies
319+
<<"message-ttl">>, <<"max-priority">>, <<"queue-mode">>,
320+
<<"single-active-consumer">>, <<"ha-mode">>, <<"ha-params">>,
321+
<<"ha-sync-mode">>, <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
322+
<<"queue-master-locator">>],
323323
lists:all(fun({P, _}) ->
324-
lists:member(P, Applicable)
324+
not lists:member(P, NotApplicable)
325325
end, Policy).
326326

327327
rpc_delete_metrics(QName) ->
@@ -335,6 +335,11 @@ spawn_deleter(QName) ->
335335
delete(Q, false, false, <<"expired">>)
336336
end).
337337

338+
spawn_notify_decorators(QName, Fun, Args) ->
339+
spawn(fun () ->
340+
notify_decorators(QName, Fun, Args)
341+
end).
342+
338343
handle_tick(QName,
339344
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
340345
Nodes) ->
@@ -525,6 +530,7 @@ delete(Q,
525530
after Timeout ->
526531
ok = force_delete_queue(Servers)
527532
end,
533+
notify_decorators(QName, shutdown),
528534
ok = delete_queue_data(QName, ActingUser),
529535
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
530536
?RPC_TIMEOUT),
@@ -546,6 +552,7 @@ delete(Q,
546552
" Attempting force delete.",
547553
[rabbit_misc:rs(QName), Errs]),
548554
ok = force_delete_queue(Servers),
555+
notify_decorators(QName, shutdown),
549556
delete_queue_data(QName, ActingUser),
550557
{ok, ReadyMsgs}
551558
end
@@ -1525,3 +1532,26 @@ update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
15251532
overflow(undefined, Def) -> Def;
15261533
overflow(<<"reject-publish">>, _Def) -> reject_publish;
15271534
overflow(<<"drop-head">>, _Def) -> drop_head.
1535+
1536+
-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
1537+
notify_decorators(Q) when ?is_amqqueue(Q) ->
1538+
QName = amqqueue:get_name(Q),
1539+
QPid = amqqueue:get_pid(Q),
1540+
case ra:local_query(QPid, fun rabbit_fifo:query_notify_decorators_info/1) of
1541+
{ok, {_, {MaxActivePriority, IsEmpty}}, _} ->
1542+
notify_decorators(QName, consumer_state_changed, [MaxActivePriority, IsEmpty]);
1543+
_ -> ok
1544+
end.
1545+
1546+
notify_decorators(QName, Event) ->
1547+
notify_decorators(QName, Event, []).
1548+
1549+
notify_decorators(QName, F, A) ->
1550+
%% Look up again in case policy and hence decorators have changed
1551+
case rabbit_amqqueue:lookup(QName) of
1552+
{ok, Q} ->
1553+
Ds = amqqueue:get_decorators(Q),
1554+
[ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
1555+
{error, not_found} ->
1556+
ok
1557+
end.

0 commit comments

Comments
 (0)