Skip to content

Commit 4100450

Browse files
authored
Merge pull request #14698 from rabbitmq/mergify/bp/v4.2.x/pr-14672
Use tick-related timeout to repair leader record (backport #14672)
2 parents 0d826e7 + e8afe2b commit 4100450

File tree

3 files changed

+40
-12
lines changed

3 files changed

+40
-12
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
3535
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]).
3636
-export([on_node_up/1, on_node_down/1]).
37-
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
37+
-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]).
3838
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
3939
-export([is_match/2, is_in_virtual_host/2]).
4040
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
@@ -298,12 +298,18 @@ do_internal_declare(Q0, false) ->
298298
Queue = rabbit_queue_decorator:set(Q),
299299
rabbit_db_queue:create_or_get(Queue).
300300

301-
-spec update
302-
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
303-
'not_found' | amqqueue:amqqueue().
301+
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
302+
'not_found' | amqqueue:amqqueue().
304303

305304
update(Name, Fun) ->
306-
rabbit_db_queue:update(Name, Fun).
305+
update(Name, Fun, #{}).
306+
307+
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()),
308+
#{timeout => timeout()}) ->
309+
'not_found' | amqqueue:amqqueue().
310+
311+
update(Name, Fun, Options) ->
312+
rabbit_db_queue:update(Name, Fun, Options).
307313

308314
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
309315
Queue :: amqqueue:amqqueue(),

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
set/1,
3232
delete/2,
3333
update/2,
34+
update/3,
3435
update_decorators/2,
3536
exists/1,
3637
foreach/2
@@ -637,9 +638,23 @@ get_many_in_ets(Table, Names) ->
637638
%% @private
638639

639640
update(QName, Fun) ->
641+
update(QName, Fun, #{}).
642+
643+
-spec update(QName, UpdateFun, Options) -> Ret when
644+
QName :: rabbit_amqqueue:name(),
645+
Queue :: amqqueue:amqqueue(),
646+
UpdateFun :: fun((Queue) -> NewQueue),
647+
NewQueue :: amqqueue:amqqueue(),
648+
Options :: #{timeout => timeout()},
649+
Ret :: Queue | not_found.
650+
%% @doc Updates an existing queue record using `UpdateFun'.
651+
%%
652+
%% @private
653+
654+
update(QName, Fun, Options) ->
640655
rabbit_khepri:handle_fallback(
641656
#{mnesia => fun() -> update_in_mnesia(QName, Fun) end,
642-
khepri => fun() -> update_in_khepri(QName, Fun) end
657+
khepri => fun() -> update_in_khepri(QName, Fun, Options) end
643658
}).
644659

645660
update_in_mnesia(QName, Fun) ->
@@ -648,15 +663,19 @@ update_in_mnesia(QName, Fun) ->
648663
update_in_mnesia_tx(QName, Fun)
649664
end).
650665

666+
651667
update_in_khepri(QName, Fun) ->
668+
update_in_khepri(QName, Fun, #{}).
669+
670+
update_in_khepri(QName, Fun, Options) ->
652671
Path = khepri_queue_path(QName),
653-
Ret1 = rabbit_khepri:adv_get(Path),
672+
Ret1 = rabbit_khepri:adv_get(Path, Options),
654673
case Ret1 of
655674
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
656675
UpdatePath = khepri_path:combine_with_conditions(
657676
Path, [#if_payload_version{version = Vsn}]),
658677
Q1 = Fun(Q),
659-
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
678+
Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options),
660679
case Ret2 of
661680
ok -> Q1;
662681
{error, {khepri, mismatching_node, _}} ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,8 @@ become_leader0(QName, Name) ->
446446
amqqueue:set_pid(Q1, {Name, node()}),
447447
live)
448448
end,
449-
_ = rabbit_amqqueue:update(QName, Fun),
449+
Timeout = max(tick_interval() - 1000, 1000),
450+
_ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}),
450451
case rabbit_amqqueue:lookup(QName) of
451452
{ok, Q0} when ?is_amqqueue(Q0) ->
452453
Nodes = get_nodes(Q0),
@@ -656,7 +657,7 @@ handle_tick(QName,
656657
ok;
657658
repaired ->
658659
?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record",
659-
[rabbit_misc:rs(QName)])
660+
[rabbit_misc:rs(QName)])
660661
end,
661662
ExpectedNodes = rabbit_nodes:list_members(),
662663
case Nodes -- ExpectedNodes of
@@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) ->
19811982

19821983
make_ra_conf(Q, ServerId, Membership, MacVersion)
19831984
when is_integer(MacVersion) ->
1984-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1985-
?TICK_INTERVAL),
1985+
TickTimeout = tick_interval(),
19861986
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
19871987
?SNAPSHOT_INTERVAL),
19881988
CheckpointInterval = application:get_env(rabbit,
@@ -2408,3 +2408,6 @@ queue_vm_stats_sups() ->
24082408
queue_vm_ets() ->
24092409
{[quorum_ets],
24102410
[[ra_log_ets]]}.
2411+
2412+
tick_interval() ->
2413+
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).

0 commit comments

Comments
 (0)