Skip to content

Commit 34d0356

Browse files
Merge pull request #8282 from rabbitmq/qq-queue-federation-fixes
Fix quorum queue queue federation bugs
2 parents 0d85941 + 509ec9a commit 34d0356

File tree

5 files changed

+41
-23
lines changed

5 files changed

+41
-23
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -861,8 +861,10 @@ state_enter0(leader, #?MODULE{consumers = Cons,
861861
Mons = [{monitor, process, P} || P <- Pids],
862862
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
863863
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
864-
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
865-
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation,
864+
FHReservation = [{mod_call, rabbit_quorum_queue,
865+
file_handle_leader_reservation, [Resource]}],
866+
NotifyDecs = notify_decorators_startup(Resource),
867+
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
866868
case BLH of
867869
undefined ->
868870
Effects;
@@ -1299,9 +1301,8 @@ query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
12991301
MaxActivePriority = maps:fold(
13001302
fun(_, #consumer{credit = C,
13011303
status = up,
1302-
cfg = #consumer_cfg{priority = P0}},
1304+
cfg = #consumer_cfg{priority = P}},
13031305
MaxP) when C > 0 ->
1304-
P = -P0,
13051306
case MaxP of
13061307
empty -> P;
13071308
MaxP when MaxP > P -> MaxP;
@@ -2477,6 +2478,10 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
24772478
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
24782479
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.
24792480

2481+
notify_decorators_startup(QName) ->
2482+
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
2483+
[QName, startup, []]}.
2484+
24802485
convert(To, To, State) ->
24812486
State;
24822487
convert(0, To, State) ->

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -655,11 +655,10 @@ state_enter_file_handle_leader_reservation_test(_) ->
655655

656656
Resource = {resource, <<"/">>, queue, <<"test">>},
657657
Effects = rabbit_fifo:state_enter(leader, S0),
658-
?assertMatch([
659-
{mod_call, m, f, [a, the_name]},
660-
_Timer,
661-
{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
662-
], Effects),
658+
?assertMatch([{mod_call, m, f, [a, the_name]},
659+
_Timer,
660+
{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
661+
| _], Effects),
663662
ok.
664663

665664
state_enter_file_handle_other_reservation_test(_) ->
@@ -1161,20 +1160,22 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(C) ->
11611160
Meta = meta(C, 1),
11621161
% adding some consumers
11631162
AddConsumer = fun({CTag, ChannelId}, State) ->
1164-
{NewState, _, _} = apply(
1165-
Meta,
1166-
make_checkout({CTag, ChannelId},
1167-
{once, 1, simple_prefetch}, #{}),
1168-
State),
1169-
NewState
1163+
{NewState, _, _} = apply(
1164+
Meta,
1165+
make_checkout({CTag, ChannelId},
1166+
{once, 1, simple_prefetch}, #{}),
1167+
State),
1168+
NewState
11701169
end,
1171-
State1 = lists:foldl(AddConsumer, State0,
1172-
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
1170+
State1 = lists:foldl(AddConsumer, State0, [{<<"ctag1">>, Pid1},
1171+
{<<"ctag2">>, Pid2},
1172+
{<<"ctag3">>, Pid2},
1173+
{<<"ctag4">>, Pid3}]),
11731174

11741175
Effects = rabbit_fifo:state_enter(leader, State1),
11751176
%% 2 effects for each consumer process (channel process), 1 effect for the node,
11761177
%% 1 effect for file handle reservation
1177-
?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)).
1178+
?assertEqual(2 * 3 + 1 + 1 + 1 + 1, length(Effects)).
11781179

11791180
single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) ->
11801181
Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),

deps/rabbitmq_federation/src/rabbit_federation_link_util.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,13 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
140140
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
141141
rabbit_federation_status:report(
142142
Upstream, UParams, XorQName, clean_reason(E)),
143+
Reason = case E of
144+
{error, Value} -> Value;
145+
Other -> Other
146+
end,
143147
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
144148
[rabbit_federation_upstream:params_to_string(UParams),
145-
E]),
149+
Reason]),
146150
{stop, {shutdown, restart}, State};
147151

148152
connection_error(remote, E, Upstream, UParams, XorQName, State) ->

deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,11 @@ handle_cast(pause, State = #state{run = false}) ->
104104
handle_cast(pause, State = #not_started{}) ->
105105
{noreply, State#not_started{run = false}};
106106

107-
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
107+
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{
108+
name = UpName, queue_name = QName
109+
}}) ->
110+
rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause",
111+
[QName, UpName]),
108112
cancel(Ch, Upstream),
109113
{noreply, State#state{run = false}};
110114

@@ -305,18 +309,22 @@ visit_match(_ ,_) ->
305309
consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
306310
ConsumerTag.
307311

308-
consume(Ch, Upstream, UQueue) ->
312+
consume(Ch, Upstream = #upstream{name = UpName}, UQueue) ->
309313
ConsumerTag = consumer_tag(Upstream),
310314
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
315+
rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'",
316+
[rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]),
311317
amqp_channel:cast(
312318
Ch, #'basic.consume'{queue = name(UQueue),
313319
no_ack = NoAck,
314320
nowait = true,
315321
consumer_tag = ConsumerTag,
316322
arguments = [{<<"x-priority">>, long, -1}]}).
317323

318-
cancel(Ch, Upstream) ->
324+
cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) ->
319325
ConsumerTag = consumer_tag(Upstream),
326+
rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'",
327+
[QName, ConsumerTag, UpName]),
320328
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
321329
consumer_tag = ConsumerTag}).
322330

deps/rabbitmq_federation/src/rabbit_federation_upstream.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ params_table(SafeURI, XorQ) ->
6161

6262
params_to_string(#upstream_params{safe_uri = SafeURI,
6363
x_or_q = XorQ}) ->
64-
print("~ts on ~ts", [rabbit_misc:rs(r(XorQ)), SafeURI]).
64+
print("~ts on '~ts'", [rabbit_misc:rs(r(XorQ)), SafeURI]).
6565

6666
remove_credentials(URI) ->
6767
list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).

0 commit comments

Comments
 (0)