9999 queries_waiting_heartbeats := queue :queue ({non_neg_integer (),
100100 consistent_query_ref ()}),
101101 pending_consistent_queries := [consistent_query_ref ()],
102- commit_latency => option (non_neg_integer ())
102+ commit_latency => option (non_neg_integer ()),
103+ snapshot_phase => chunk_flag ()
103104 }.
104105
105106-type state () :: ra_server_state ().
@@ -1447,25 +1448,27 @@ handle_follower(#install_snapshot_rpc{term = Term,
14471448 SnapIdx > LastApplied andalso
14481449 % % only install snapshot if the machine version is understood
14491450 MacVer >= SnapMacVer andalso
1450- Num =< 1 ->
1451+ Num =< 1 andalso
1452+ ChunkFlag /= pre ->
14511453 % % only begin snapshot procedure if Idx is higher than the last_applied
14521454 % % index.
1453- ? DEBUG (" ~ts : begin_accept snapshot at index ~b in term ~b " ,
1454- [LogId , SnapIdx , Term ]),
1455+ ? DEBUG (" ~ts : begin_accept snapshot at index ~b in term ~b , phase ~s " ,
1456+ [LogId , SnapIdx , Term , ChunkFlag ]),
14551457 SnapState0 = ra_log :snapshot_state (Log0 ),
14561458 {ok , SS } = ra_snapshot :begin_accept (Meta , SnapState0 ),
14571459 Log1 = ra_log :set_snapshot_state (SS , Log0 ),
14581460
14591461 % % if the snaphost includes pre entries (live entries) then we need
14601462 % % to reset the log to the last applied index to avoid issues
14611463 Log = case ChunkFlag of
1462- pre ->
1464+ init ->
14631465 {ok , L } = ra_log :set_last_index (LastApplied , Log1 ),
14641466 L ;
14651467 _ ->
14661468 Log1
14671469 end ,
14681470 {receive_snapshot , update_term (Term , State0 #{log => Log ,
1471+ snapshot_phase => ChunkFlag ,
14691472 leader_id => LeaderId }),
14701473 [{next_event , Rpc }, {record_leader_msg , LeaderId }]};
14711474handle_follower (# install_snapshot_rpc {term = Term ,
@@ -1540,7 +1543,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15401543 cluster := ClusterIds ,
15411544 term := SnapTerm } = SnapMeta ,
15421545 chunk_state = {Num , ChunkFlag },
1543- data = ChunkOrEntries },
1546+ data = ChunkOrEntries } = Rpc ,
15441547 #{cfg := # cfg {id = Id ,
15451548 log_id = LogId ,
15461549 effective_machine_version = CurEffMacVer ,
@@ -1550,15 +1553,33 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15501553 cluster := Cluster ,
15511554 current_term := CurTerm ,
15521555 last_applied := LastApplied ,
1553- machine_state := OldMacState } = State0 )
1556+ machine_state := OldMacState ,
1557+ snapshot_phase := SnapPhase } = State0 )
15541558 when Term >= CurTerm ->
1555- ? DEBUG (" ~ts : receiving snapshot chunk: ~b / ~w , index ~b , term ~b " ,
1556- [LogId , Num , ChunkFlag , SnapIndex , SnapTerm ]),
15571559 Reply = # install_snapshot_result {term = CurTerm ,
15581560 last_term = SnapTerm ,
15591561 last_index = SnapIndex },
15601562 case ChunkFlag of
1563+ init when SnapPhase == init ->
1564+ % % this is ok, just reply
1565+ {receive_snapshot , State0 , [{reply , Reply }]};
1566+ init ->
1567+ ? DEBUG (" ~ts : receiving snapshot saw unexpected init phase at snapshot"
1568+ " index term {~b , ~b }, current phase ~s restarting
1569+ snapshot receive process" ,
1570+ [LogId , SnapIndex , SnapTerm , SnapPhase ]),
1571+ % % the snapshot sending must have been interrupted and restarted
1572+ % % during the init or pre-phase
1573+ % % abort the snapshot, and revert to follower
1574+ SnapState0 = ra_log :snapshot_state (Log00 ),
1575+ SnapState = ra_snapshot :abort_accept (SnapState0 ),
1576+ Log = ra_log :set_snapshot_state (SnapState , Log00 ),
1577+ {follower , maps :remove (snapshot_phase , State0 #{log => Log }),
1578+ [{next_event , Rpc }]};
15611579 pre when is_list (ChunkOrEntries ) ->
1580+ [{_FstIdx , _ , _ } | _ ] = ChunkOrEntries ,
1581+ % ?DEBUG("~ts: receiving snapshot chunk pre first index ~b snap index ~b, term ~b",
1582+ % [LogId, FstIdx, SnapIndex, SnapTerm]),
15621583 % % reset last index to last applied
15631584 % % as we dont know for sure indexes after last applied
15641585 % % are of the right term
@@ -1568,15 +1589,21 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15681589 {ok , L } = ra_log :write_sparse (E , LstIdx , L0 ),
15691590 {L , I }
15701591 end , {Log00 , LastIdx }, ChunkOrEntries ),
1571- State = update_term (Term , State0 #{log => Log }),
1592+ State = update_term (Term , State0 #{log => Log ,
1593+ snapshot_phase => pre }),
15721594 {receive_snapshot , State , [{reply , Reply }]};
15731595 next ->
1596+ ? DEBUG (" ~ts : receiving snapshot chunk: ~b / ~w , index ~b , term ~b " ,
1597+ [LogId , Num , ChunkFlag , SnapIndex , SnapTerm ]),
15741598 SnapState0 = ra_log :snapshot_state (Log00 ),
15751599 SnapState = ra_snapshot :accept_chunk (ChunkOrEntries , Num , SnapState0 ),
15761600 Log0 = ra_log :set_snapshot_state (SnapState , Log00 ),
1577- State = update_term (Term , State0 #{log => Log0 }),
1601+ State = update_term (Term , State0 #{log => Log0 ,
1602+ snapshot_phase => next }),
15781603 {receive_snapshot , State , [{reply , Reply }]};
15791604 last ->
1605+ ? DEBUG (" ~ts : receiving snapshot chunk: ~b / ~w , index ~b , term ~b " ,
1606+ [LogId , Num , ChunkFlag , SnapIndex , SnapTerm ]),
15801607 SnapState0 = ra_log :snapshot_state (Log00 ),
15811608 {SnapState , MacState , LiveIndexes , Effs0 } =
15821609 ra_snapshot :complete_accept (ChunkOrEntries , Num , Machine ,
@@ -1614,7 +1641,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16141641 MacState ,
16151642 OldMeta ,
16161643 OldMacState ),
1617- State = update_term (Term ,
1644+ State1 = update_term (Term ,
16181645 State0 #{cfg => Cfg ,
16191646 log => Log ,
16201647 commit_index => SnapIndex ,
@@ -1626,6 +1653,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
16261653 membership =>
16271654 get_membership (ClusterIds , State0 ),
16281655 machine_state => MacState }),
1656+ State = maps :remove (snapshot_phase , State1 ),
16291657 put_counter (Cfg , ? C_RA_SVR_METRIC_LAST_APPLIED , SnapIndex ),
16301658 % % it was the last snapshot chunk so we can revert back to
16311659 % % follower status
@@ -1645,13 +1673,15 @@ handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
16451673 SnapState0 = ra_log :snapshot_state (Log0 ),
16461674 SnapState = ra_snapshot :abort_accept (SnapState0 ),
16471675 Log = ra_log :set_snapshot_state (SnapState , Log0 ),
1648- {follower , update_term (Term , clear_leader_id (State #{log => Log })),
1676+ {follower , maps :remove (snapshot_phase ,
1677+ update_term (Term ,
1678+ clear_leader_id (State #{log => Log }))),
16491679 [{next_event , Msg }]};
16501680handle_receive_snapshot ({ra_log_event , Evt },
1651- #{cfg := # cfg {log_id = LogId },
1681+ #{cfg := # cfg {log_id = _LogId },
16521682 log := Log0 } = State ) ->
1653- ? DEBUG (" ~ts : ~s ra_log_event received: ~w " ,
1654- [LogId , ? FUNCTION_NAME , Evt ]),
1683+ % ?DEBUG("~ts: ~s ra_log_event received: ~w",
1684+ % [LogId, ?FUNCTION_NAME, Evt]),
16551685 % simply forward all other events to ra_log
16561686 % whilst the snapshot is being received
16571687 {Log , Effects } = ra_log :handle_event (Evt , Log0 ),
@@ -1664,7 +1694,7 @@ handle_receive_snapshot(receive_snapshot_timeout,
16641694 SnapState0 = ra_log :snapshot_state (Log0 ),
16651695 SnapState = ra_snapshot :abort_accept (SnapState0 ),
16661696 Log = ra_log :set_snapshot_state (SnapState , Log0 ),
1667- {follower , State #{log => Log }, []};
1697+ {follower , maps : remove ( snapshot_phase , State #{log => Log }) , []};
16681698handle_receive_snapshot (# info_rpc {term = Term } = Msg ,
16691699 #{current_term := CurTerm ,
16701700 cfg := # cfg {log_id = LogId },
@@ -1677,7 +1707,8 @@ handle_receive_snapshot(#info_rpc{term = Term} = Msg,
16771707 SnapState0 = ra_log :snapshot_state (Log0 ),
16781708 SnapState = ra_snapshot :abort_accept (SnapState0 ),
16791709 Log = ra_log :set_snapshot_state (SnapState , Log0 ),
1680- {follower , update_term (Term , clear_leader_id (State #{log => Log })),
1710+ {follower , maps :remove (snapshot_phase ,
1711+ update_term (Term , clear_leader_id (State #{log => Log }))),
16811712 [{next_event , Msg }]};
16821713handle_receive_snapshot (# info_rpc {} = InfoRpc , State ) ->
16831714 InfoReplyEffect = empty_info_reply_effect (State , InfoRpc ),
@@ -1694,7 +1725,8 @@ handle_receive_snapshot(#info_reply{term = Term} = Msg,
16941725 SnapState0 = ra_log :snapshot_state (Log0 ),
16951726 SnapState = ra_snapshot :abort_accept (SnapState0 ),
16961727 Log = ra_log :set_snapshot_state (SnapState , Log0 ),
1697- {follower , update_term (Term , clear_leader_id (State #{log => Log })),
1728+ {follower , maps :remove (snapshot_phase ,
1729+ update_term (Term , clear_leader_id (State #{log => Log }))),
16981730 [{next_event , Msg }]};
16991731handle_receive_snapshot (# info_reply {}, State ) ->
17001732 {receive_snapshot , State , []};
0 commit comments