Skip to content

Commit af74ad6

Browse files
committed
fixes
1 parent f81afc1 commit af74ad6

File tree

4 files changed

+57
-9
lines changed

4 files changed

+57
-9
lines changed

src/ra_server.erl

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
promote_checkpoint/2,
5252
checkpoint/3,
5353
persist_last_applied/1,
54+
peers/1,
5455
update_peer/3,
5556
update_disconnected_peers/3,
5657
handle_down/5,
@@ -168,7 +169,10 @@
168169
{cast, ra_server_id(), term()} |
169170
{send_vote_requests, [{ra_server_id(),
170171
#request_vote_rpc{} | #pre_vote_rpc{}}]} |
171-
{send_rpc, ra_server_id(), #append_entries_rpc{}} |
172+
{send_rpc, ra_server_id(),
173+
#append_entries_rpc{} |
174+
#heartbeat_rpc{} |
175+
#info_rpc{}} |
172176
{send_snapshot, To :: ra_server_id(),
173177
{Module :: module(), Ref :: term(),
174178
LeaderId :: ra_server_id(), Term :: ra_term()}} |
@@ -477,7 +481,7 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
477481
?WARN("~ts: saw append_entries_reply from unknown peer ~w",
478482
[LogId, PeerId]),
479483
{leader, State0, []};
480-
Peer0 = #{match_index := MI, next_index := NI} ->
484+
#{match_index := MI, next_index := NI} = Peer0 ->
481485
Peer = Peer0#{match_index => max(MI, LastIdx),
482486
next_index => max(NI, NextIdx)},
483487
State1 = put_peer(PeerId, Peer, State0),
@@ -1639,15 +1643,19 @@ handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
16391643
{follower, update_term(Term, clear_leader_id(State#{log => Log})),
16401644
[{next_event, Msg}]};
16411645
handle_receive_snapshot({ra_log_event, Evt},
1642-
State = #{cfg := #cfg{id = _Id, log_id = LogId},
1643-
log := Log0}) ->
1646+
#{cfg := #cfg{log_id = LogId},
1647+
log := Log0} = State) ->
16441648
?DEBUG("~ts: ~s ra_log_event received: ~w",
16451649
[LogId, ?FUNCTION_NAME, Evt]),
16461650
% simply forward all other events to ra_log
16471651
% whilst the snapshot is being received
16481652
{Log, Effects} = ra_log:handle_event(Evt, Log0),
16491653
{receive_snapshot, State#{log => Log}, Effects};
1650-
handle_receive_snapshot(receive_snapshot_timeout, #{log := Log0} = State) ->
1654+
handle_receive_snapshot(receive_snapshot_timeout,
1655+
#{cfg := #cfg{log_id = LogId},
1656+
log := Log0} = State) ->
1657+
?INFO("~ts: ~s receive snapshot timed out.",
1658+
[LogId, ?FUNCTION_NAME]),
16511659
SnapState0 = ra_log:snapshot_state(Log0),
16521660
SnapState = ra_snapshot:abort_accept(SnapState0),
16531661
Log = ra_log:set_snapshot_state(SnapState, Log0),
@@ -3312,7 +3320,8 @@ heartbeat_rpc_effects(Peers, Id, Term, QueryIndex) ->
33123320
end,
33133321
maps:to_list(Peers)).
33143322

3315-
heartbeat_rpc_effect_for_peer(PeerId, Peer, Id, Term, QueryIndex) ->
3323+
heartbeat_rpc_effect_for_peer(PeerId, #{status := normal} = Peer,
3324+
Id, Term, QueryIndex) ->
33163325
case maps:get(query_index, Peer, 0) < QueryIndex of
33173326
true ->
33183327
{true,
@@ -3322,7 +3331,9 @@ heartbeat_rpc_effect_for_peer(PeerId, Peer, Id, Term, QueryIndex) ->
33223331
leader_id = Id}}};
33233332
false ->
33243333
false
3325-
end.
3334+
end;
3335+
heartbeat_rpc_effect_for_peer(_PeerId, _Peer, _Id, _Term, _QueryIndex) ->
3336+
false.
33263337

33273338
heartbeat_rpc_quorum(NewQueryIndex, PeerId,
33283339
#{queries_waiting_heartbeats := Waiting0} = State) ->

src/ra_server_proc.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,8 @@ handle_effect(_RaftState, {send_rpc, To, Rpc}, _,
14121412
incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1),
14131413
Self ! {update_peer, To, #{status => normal}}
14141414
end),
1415+
?DEBUG("~ts: temporarily suspending peer ~w due to full distribution buffer",
1416+
[log_id(State0), To]),
14151417
{update_peer(To, #{status => suspended}, State0), Actions};
14161418
noconnect ->
14171419
%% for noconnects just allow it to pipeline and catch up later
@@ -1544,6 +1546,8 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, _Id, Term}},
15441546
%% node is connected
15451547
%% leader effect only
15461548
Machine = ra_server:machine(SS0),
1549+
%% temporary assertion
1550+
#{To := #{status := normal}} = ra_server:peers(SS0),
15471551
Id = ra_server:id(SS0),
15481552
Pid = spawn(fun () ->
15491553
try send_snapshots(Id, Term, To,

test/ra_log_2_SUITE.erl

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1353,6 +1353,34 @@ oldcheckpoints_deleted_after_snapshot_install(Config) ->
13531353

13541354
ok.
13551355

1356+
snapshot_installation_with_live_indexes(Config) ->
1357+
Log0 = ra_log_init(Config),
1358+
{0, 0} = ra_log:last_index_term(Log0),
1359+
Log1 = assert_log_events(write_n(1, 10, 2, Log0),
1360+
fun (L) ->
1361+
LW = ra_log:last_written(L),
1362+
{9, 2} == LW
1363+
end),
1364+
1365+
Log2 = Log1,
1366+
1367+
%% create snapshot chunk
1368+
Meta = meta(15, 2, [?N1]),
1369+
Chunk = create_snapshot_chunk(Config, Meta, #{}),
1370+
SnapState0 = ra_log:snapshot_state(Log2),
1371+
{ok, SnapState1} = ra_snapshot:begin_accept(Meta, SnapState0),
1372+
Machine = {machine, ?MODULE, #{}},
1373+
{SnapState, _, LiveIndexes, AEffs} = ra_snapshot:complete_accept(Chunk, 1, Machine,
1374+
SnapState1),
1375+
run_effs(AEffs),
1376+
{ok, Log3, Effs4} = ra_log:install_snapshot({15, 2}, ?MODULE, LiveIndexes,
1377+
ra_log:set_snapshot_state(SnapState, Log2)),
1378+
1379+
run_effs(Effs4),
1380+
{15, _} = ra_log:last_index_term(Log3),
1381+
{15, _} = ra_log:last_written(Log3),
1382+
ok.
1383+
13561384
snapshot_installation(Config) ->
13571385
Log0 = ra_log_init(Config),
13581386
{0, 0} = ra_log:last_index_term(Log0),
@@ -1794,7 +1822,12 @@ sparse_write(Config) ->
17941822
SnapState1),
17951823
run_effs(AEffs),
17961824
Log3 = ra_log:set_snapshot_state(SnapState, Log2),
1797-
{ok, Log4, _} = ra_log:install_snapshot({15, 2}, ?MODULE, LiveIndexes, Log3),
1825+
{ok, Log4, _} = ra_log:install_snapshot({15, 2}, ?MODULE,
1826+
LiveIndexes, Log3),
1827+
1828+
ct:pal("overview Log4 ~p", [ra_log:overview(Log4)]),
1829+
?assertEqual(16, ra_log:next_index(Log4)),
1830+
17981831
{ok, Log} = ra_log:write([{16, 1, <<>>}], Log4),
17991832
{ResFinal, _} = ra_log:sparse_read(LiveIndexes, Log),
18001833
?assertMatch([{3, _, _},

test/unit_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ mods() ->
2121
ra_flru,
2222
ra_lib,
2323
ra_log,
24-
ra_log_reader,
24+
ra_log_segments,
2525
ra_log_segment,
2626
ra_monitors,
2727
ra_server,

0 commit comments

Comments
 (0)