Skip to content

Commit 16772f3

Browse files
committed
QQ: implement snapshot_installed/4 callback
1 parent 9225ce1 commit 16772f3

File tree

5 files changed

+130
-25
lines changed

5 files changed

+130
-25
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
init/1,
4444
apply/3,
4545
live_indexes/1,
46+
snapshot_installed/4,
4647
state_enter/2,
4748
tick/2,
4849
overview/1,
@@ -487,7 +488,8 @@ apply(#{index := Idx} = Meta,
487488
apply(#{index := Index}, #purge{},
488489
#?STATE{messages_total = Total,
489490
returns = Returns,
490-
ra_indexes = Indexes0
491+
ra_indexes = Indexes0,
492+
msg_bytes_enqueue = MsgBytesEnqueue
491493
} = State0) ->
492494
NumReady = messages_ready(State0),
493495
Indexes = case Total of
@@ -514,7 +516,9 @@ apply(#{index := Index}, #purge{},
514516
returns = lqueue:new(),
515517
msg_bytes_enqueue = 0
516518
},
517-
Effects0 = [{aux, force_checkpoint}, garbage_collection],
519+
Effects0 = [{aux, {bytes_out, MsgBytesEnqueue}},
520+
{aux, force_checkpoint},
521+
garbage_collection],
518522
Reply = {purge, NumReady},
519523
{State, _, Effects} = evaluate_limit(Index, false, State0,
520524
State1, Effects0),
@@ -681,14 +685,56 @@ apply(_Meta, Cmd, State) ->
681685
?LOG_DEBUG("rabbit_fifo: unhandled command ~W", [Cmd, 10]),
682686
{State, ok, []}.
683687

684-
-spec live_indexes(state()) ->
685-
[ra:index()].
686-
live_indexes(#?STATE{returns = Returns,
688+
-spec live_indexes(state()) -> [ra:index()].
689+
live_indexes(#?STATE{cfg = #cfg{},
690+
returns = Returns,
687691
messages = Messages,
692+
ra_indexes = Indexes,
688693
dlx = Dlx}) ->
689694
DlxIndexes = rabbit_fifo_dlx:live_indexes(Dlx),
690695
RtnIndexes = [I || ?MSG(I, _) <- lqueue:to_list(Returns)],
691-
DlxIndexes ++ RtnIndexes ++ rabbit_fifo_q:indexes(Messages).
696+
CheckedIdxs = rabbit_fifo_index:indexes(Indexes),
697+
CheckedIdxs ++
698+
DlxIndexes ++
699+
RtnIndexes ++
700+
rabbit_fifo_q:indexes(Messages).
701+
702+
703+
-spec snapshot_installed(Meta, State, OldMeta, OldState) ->
704+
ra_machine:effects() when
705+
Meta :: ra_snapshot:meta(),
706+
State :: state(),
707+
OldMeta :: ra_snapshot:meta(),
708+
OldState :: state().
709+
snapshot_installed(_Meta, #?MODULE{cfg = #cfg{resource = QR},
710+
consumers = Consumers} = State,
711+
_OldMeta, _OldState) ->
712+
%% here we need to redliver all pending consumer messages
713+
%% to local consumers
714+
%% TODO: with some additional state (raft indexes assigned to consumer)
715+
%% we could reduce the number of resends but it is questionable if this
716+
%% complexity is worth the effort. rabbit_fifo_index will de-duplicate
717+
%% deliveries anyway
718+
SendAcc = maps:fold(
719+
fun (_ConsumerKey, #consumer{cfg = #consumer_cfg{tag = Tag,
720+
pid = Pid},
721+
checked_out = Checked},
722+
Acc) ->
723+
case node(Pid) == node() of
724+
true ->
725+
Acc#{{Tag, Pid} => maps:to_list(Checked)};
726+
false ->
727+
Acc
728+
end
729+
end, #{}, Consumers),
730+
?LOG_DEBUG("~ts: rabbit_fifo: install snapshot sending ~p",
731+
[rabbit_misc:rs(QR), SendAcc]),
732+
Effs = add_delivery_effects([], SendAcc, State),
733+
?LOG_DEBUG("~ts: rabbit_fifo: effs ~p",
734+
[rabbit_misc:rs(QR), Effs]),
735+
Effs.
736+
737+
692738

693739
convert_v3_to_v4(#{} = _Meta, StateV3) ->
694740
%% TODO: consider emitting release cursors as checkpoints
@@ -3026,28 +3072,29 @@ priority_tag(Msg) ->
30263072
end.
30273073

30283074

3029-
do_checkpoints(MacVer, Ts, #checkpoint{index = _ChIdx,
3030-
timestamp = _SnapTime},
3075+
do_checkpoints(MacVer, Ts, #checkpoint{},
30313076
RaAux, BytesIn, BytesOut, Force) when MacVer >= 8 ->
30323077
do_checkpoints(MacVer, Ts, #snapshot{}, RaAux, BytesIn, BytesOut, Force);
30333078
do_checkpoints(MacVer, Ts, #snapshot{index = _ChIdx,
3034-
timestamp = SnapTime,
3035-
bytes_out = LastBytesOut} = Snap0,
3036-
RaAux, _BytesIn, BytesOut, _Force) when MacVer >= 8 ->
3079+
timestamp = SnapTime,
3080+
bytes_out = LastBytesOut} = Snap0,
3081+
RaAux, _BytesIn, BytesOut, Force) when MacVer >= 8 ->
30373082
LastAppliedIdx = ra_aux:last_applied(RaAux),
30383083
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
30393084
TimeSince = Ts - SnapTime,
30403085
MsgsTot = messages_total(MacState),
3041-
ra_aux:overview(RaAux),
3086+
% ra_aux:overview(RaAux),
30423087
% MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes),
3088+
% TODO: snapshots also need to be triggered by non settled commands
3089+
% that aren't enqueues
30433090
EnoughDataRemoved = BytesOut - LastBytesOut > ?SNAP_OUT_BYTES,
30443091
{CheckMinInterval, _CheckMinIndexes, _CheckMaxIndexes} =
30453092
persistent_term:get(quorum_queue_checkpoint_config,
30463093
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
30473094
?CHECK_MAX_INDEXES}),
30483095
EnoughTimeHasPassed = TimeSince > CheckMinInterval,
3049-
case (EnoughTimeHasPassed andalso
3050-
EnoughDataRemoved) of
3096+
case (EnoughTimeHasPassed andalso EnoughDataRemoved) orelse
3097+
Force of
30513098
true ->
30523099
{#snapshot{index = LastAppliedIdx,
30533100
timestamp = Ts,

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,8 @@ handle_ra_event(QName, Leader, {applied, Seqs},
687687
end;
688688
handle_ra_event(QName, From, {machine, Del}, State0)
689689
when element(1, Del) == delivery ->
690+
?LOG_DEBUG("~ts: handle delivery , ",
691+
[rabbit_misc:rs(QName)]),
690692
handle_delivery(QName, From, Del, State0);
691693
handle_ra_event(_QName, _From, {machine, Action}, State)
692694
when element(1, Action) =:= credit_reply orelse

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,7 +1998,7 @@ make_ra_conf(Q, ServerId, Membership, MacVersion)
19981998
Membership, MacVersion).
19991999

20002000
make_ra_conf(Q, ServerId, TickTimeout,
2001-
SnapshotInterval, CheckpointInterval,
2001+
_SnapshotInterval, CheckpointInterval,
20022002
Membership, MacVersion) ->
20032003
QName = amqqueue:get_name(Q),
20042004
#resource{name = QNameBin} = QName,
@@ -2009,7 +2009,6 @@ make_ra_conf(Q, ServerId, TickTimeout,
20092009
Formatter = {?MODULE, format_ra_event, [QName]},
20102010
LogCfg = #{uid => UId,
20112011
min_snapshot_interval => 0,
2012-
snapshot_interval => SnapshotInterval,
20132012
min_checkpoint_interval => CheckpointInterval,
20142013
max_checkpoints => 3},
20152014
rabbit_misc:maps_put_truthy(membership, Membership,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ groups() ->
105105
force_checkpoint,
106106
policy_repair,
107107
gh_12635,
108-
replica_states
108+
replica_states,
109+
consumer_message_is_delevered_after_snapshot
109110
]
110111
++ all_tests()},
111112
{cluster_size_5, [], [start_queue,
@@ -1461,19 +1462,19 @@ force_checkpoint_on_queue(Config) ->
14611462
rabbit_ct_helpers:await_condition(
14621463
fun() ->
14631464
{ok, State, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1464-
#{log := #{latest_checkpoint_index := LCI}} = State,
1465+
#{log := #{snapshot_index := LCI}} = State,
14651466
LCI =:= undefined
14661467
end),
14671468
rabbit_ct_helpers:await_condition(
14681469
fun() ->
14691470
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
1470-
#{log := #{latest_checkpoint_index := LCI}} = State,
1471+
#{log := #{snapshot_index := LCI}} = State,
14711472
LCI =:= undefined
14721473
end),
14731474
rabbit_ct_helpers:await_condition(
14741475
fun() ->
14751476
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
1476-
#{log := #{latest_checkpoint_index := LCI}} = State,
1477+
#{log := #{snapshot_index := LCI}} = State,
14771478
LCI =:= undefined
14781479
end),
14791480

@@ -1490,21 +1491,21 @@ force_checkpoint_on_queue(Config) ->
14901491
fun() ->
14911492
{ok, State, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
14921493
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1493-
#{log := #{latest_checkpoint_index := LCI}} = State,
1494+
#{log := #{snapshot_index := LCI}} = State,
14941495
(LCI =/= undefined) andalso (LCI >= N)
14951496
end),
14961497
rabbit_ct_helpers:await_condition(
14971498
fun() ->
14981499
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
14991500
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1500-
#{log := #{latest_checkpoint_index := LCI}} = State,
1501+
#{log := #{snapshot_index := LCI}} = State,
15011502
(LCI =/= undefined) andalso (LCI >= N)
15021503
end),
15031504
rabbit_ct_helpers:await_condition(
15041505
fun() ->
15051506
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
15061507
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1507-
#{log := #{latest_checkpoint_index := LCI}} = State,
1508+
#{log := #{snapshot_index := LCI}} = State,
15081509
(LCI =/= undefined) andalso (LCI >= N)
15091510
end).
15101511

@@ -1780,6 +1781,60 @@ dont_leak_file_handles(Config) ->
17801781
rabbit_ct_client_helpers:close_channel(C),
17811782
ok.
17821783

1784+
consumer_message_is_delevered_after_snapshot(Config) ->
1785+
%% a consumer on a node that received a snapshot should have it's messages
1786+
%% delivered
1787+
[Server0, _Server1, Server2] =
1788+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1789+
1790+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
1791+
[rabbit, quorum_min_checkpoint_interval, 1]),
1792+
1793+
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
1794+
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
1795+
QQ = ?config(queue_name, Config),
1796+
RaName = ra_name(QQ),
1797+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1798+
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1799+
1800+
%% stop server on a follower node
1801+
ok = rpc:call(Server2, ra, stop_server, [quorum_queues, {RaName, Server2}]),
1802+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
1803+
%% create a consumer
1804+
qos(Ch2, 2, false),
1805+
subscribe(Ch2, QQ, false),
1806+
1807+
%% publish some messages and make sure a snapshot has been taken
1808+
Msg = crypto:strong_rand_bytes(13_000),
1809+
1810+
[publish(Ch0, QQ, Msg) || _ <- lists:seq(1, 5000)],
1811+
amqp_channel:wait_for_confirms(Ch0, 5),
1812+
%% need to sleep here a bit as QQs wont take
1813+
%% snapshots more often than once every second
1814+
timer:sleep(1100),
1815+
1816+
%% then purge
1817+
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),
1818+
1819+
rabbit_ct_helpers:await_condition_ignoring_exceptions(
1820+
fun () ->
1821+
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview,
1822+
[{RaName, Server0}]),
1823+
undefined =/= maps:get(snapshot_index, Log)
1824+
end),
1825+
%% restart stopped member
1826+
ok = rpc:call(Server2, ra, restart_server, [quorum_queues, {RaName, Server2}]),
1827+
1828+
%% messages should be delivered
1829+
receive
1830+
{#'basic.deliver'{delivery_tag = _DeliveryTag}, _} ->
1831+
ok
1832+
after 30000 ->
1833+
flush(1),
1834+
ct:fail("expected messages were not delivered")
1835+
end,
1836+
ok.
1837+
17831838
gh_12635(Config) ->
17841839
check_quorum_queues_v4_compat(Config),
17851840

@@ -1788,7 +1843,7 @@ gh_12635(Config) ->
17881843
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
17891844

17901845
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
1791-
[rabbit, quorum_min_checkpoint_interval, 1]),
1846+
[rabbit, quorum_snapshot_interval, 1]),
17921847

17931848
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
17941849
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
@@ -1810,7 +1865,7 @@ gh_12635(Config) ->
18101865
rabbit_ct_helpers:await_condition(
18111866
fun () ->
18121867
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1813-
undefined =/= maps:get(latest_checkpoint_index, Log)
1868+
undefined =/= maps:get(snapshot_index, Log)
18141869
end),
18151870

18161871
%% publish 1 more message

deps/rabbit/test/rabbit_fifo_q_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ basics(_Config) ->
7777
{no, ?MSG(4)},
7878
{hi, ?MSG(5)}
7979
]),
80+
81+
?assertEqual([1,2,3,4,5], lists:sort(rabbit_fifo_q:indexes(Q1))),
8082
{?MSG(1), Q2} = rabbit_fifo_q:out(Q1),
8183
{?MSG(3), Q3} = rabbit_fifo_q:out(Q2),
8284
{?MSG(2), Q4} = rabbit_fifo_q:out(Q3),

0 commit comments

Comments
 (0)