@@ -757,36 +757,33 @@ stalled(_Type, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _PrevLogTerm, _Entr
757757 {keep_state , NewState };
758758
759759stalled ({call , From }, ? BOOTSTRAP_COMMAND (# raft_log_pos {index = Index , term = Term } = Position , Config , Data ),
760- # raft_state {name = Name , self = Self , data_dir = PartitionPath , log_view = View , storage = Storage , current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
760+ # raft_state {name = Name , self = Self , data_dir = PartitionPath , storage = Storage , current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
761761 case Index > LastApplied andalso Term > 0 of
762762 true ->
763763 ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] attempting bootstrap at ~0p :~0p with config ~0p and data ~0P ." ,
764764 [Name , CurrentTerm , Index , Term , Config , Data , 30 ], #{domain => [whatsapp , wa_raft ]}),
765765 Path = filename :join (PartitionPath , io_lib :format (" snapshot.~0p .~0p .bootstrap.tmp" , [Index , Term ])),
766766 try
767767 ok = wa_raft_storage :make_empty_snapshot (Storage , Path , Position , Config , Data ),
768- ok = wa_raft_storage :open_snapshot (Storage , Path , Position ),
769- {ok , NewView } = wa_raft_log :reset (View , Position ),
770- State1 = State0 # raft_state {log_view = NewView , last_applied = Index , commit_index = Index },
771- State2 = load_config (State1 ),
768+ State1 = open_snapshot (Path , Position , State0 ),
772769 case Term > CurrentTerm of
773770 true ->
774- case is_single_member (Self , config (State2 )) of
771+ case is_single_member (Self , config (State1 )) of
775772 true ->
776- State3 = advance_term (? FUNCTION_NAME , Term , node (), State2 ),
773+ State2 = advance_term (? FUNCTION_NAME , Term , node (), State1 ),
777774 ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] switching to leader as sole member after successful bootstrap." ,
778- [Name , State3 # raft_state .current_term ], #{domain => [whatsapp , wa_raft ]}),
779- {next_state , leader , State3 , {reply , From , ok }};
775+ [Name , State2 # raft_state .current_term ], #{domain => [whatsapp , wa_raft ]}),
776+ {next_state , leader , State2 , {reply , From , ok }};
780777 false ->
781- State3 = advance_term (? FUNCTION_NAME , Term , undefined , State2 ),
778+ State2 = advance_term (? FUNCTION_NAME , Term , undefined , State1 ),
782779 ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] switching to follower after successful bootstrap." ,
783- [Name , State3 # raft_state .current_term ], #{domain => [whatsapp , wa_raft ]}),
784- {next_state , follower , State3 , {reply , From , ok }}
780+ [Name , State2 # raft_state .current_term ], #{domain => [whatsapp , wa_raft ]}),
781+ {next_state , follower , State2 , {reply , From , ok }}
785782 end ;
786783 false ->
787784 ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] switching to follower after successful bootstrap." ,
788785 [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
789- {next_state , follower , State2 , {reply , From , ok }}
786+ {next_state , follower , State1 , {reply , From , ok }}
790787 end
791788 catch
792789 _ :Reason ->
@@ -803,27 +800,21 @@ stalled({call, From}, ?BOOTSTRAP_COMMAND(#raft_log_pos{index = Index, term = Ter
803800 end ;
804801
805802stalled (Type , ? SNAPSHOT_AVAILABLE_COMMAND (Root , # raft_log_pos {index = SnapshotIndex , term = SnapshotTerm } = SnapshotPos ),
806- # raft_state {name = Name , log_view = View0 , storage = Storage ,
807- current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
803+ # raft_state {name = Name , current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
808804 case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of
809805 true ->
810806 try
811- ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] applying snapshot ~p : ~p " ,
807+ ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] applying snapshot at ~0p : ~0p . " ,
812808 [Name , CurrentTerm , SnapshotIndex , SnapshotTerm ], #{domain => [whatsapp , wa_raft ]}),
813- ok = wa_raft_storage :open_snapshot (Storage , Root , SnapshotPos ),
814- {ok , View1 } = wa_raft_log :reset (View0 , SnapshotPos ),
815- State1 = State0 # raft_state {log_view = View1 , last_applied = SnapshotIndex , commit_index = SnapshotIndex },
816- State2 = load_config (State1 ),
817- ? LOG_NOTICE (" Server[~0p , term ~0p , stalled] switching to follower after installing snapshot at ~p :~p ." ,
818- [Name , CurrentTerm , SnapshotIndex , SnapshotTerm ], #{domain => [whatsapp , wa_raft ]}),
819- State3 = case SnapshotTerm > CurrentTerm of
820- true -> advance_term (? FUNCTION_NAME , SnapshotTerm , undefined , State2 );
821- false -> State2
809+ State1 = open_snapshot (Root , SnapshotPos , State0 ),
810+ State2 = case SnapshotTerm > CurrentTerm of
811+ true -> advance_term (? FUNCTION_NAME , SnapshotTerm , undefined , State1 );
812+ false -> State1
822813 end ,
823814 % At this point, we assume that we received some cluster membership configuration from
824815 % our peer so it is safe to transition to an operational state.
825816 reply (Type , ok ),
826- {next_state , follower , State3 }
817+ {next_state , follower , State2 }
827818 catch
828819 _ :Reason ->
829820 ? LOG_WARNING (" Server[~0p , term ~0p , stalled] failed to load available snapshot ~p due to ~p " ,
@@ -1633,16 +1624,14 @@ witness(_Type, ?REMOTE(_, ?HANDOVER_FAILED(_)), _State) ->
16331624 keep_state_and_data ;
16341625
16351626witness (Type , ? SNAPSHOT_AVAILABLE_COMMAND (undefined , # raft_log_pos {index = SnapshotIndex , term = SnapshotTerm } = SnapshotPos ),
1636- # raft_state {log_view = View0 , name = Name , current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
1627+ # raft_state {name = Name , current_term = CurrentTerm , last_applied = LastApplied } = State ) ->
16371628 case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of
16381629 true ->
16391630 ? LOG_NOTICE (" Server[~0p , term ~0p , witness] accepting snapshot ~p :~p but not loading" ,
16401631 [Name , CurrentTerm , SnapshotIndex , SnapshotTerm ], #{domain => [whatsapp , wa_raft ]}),
1641- {ok , View1 } = wa_raft_log :reset (View0 , SnapshotPos ),
1642- State1 = State0 # raft_state {log_view = View1 , last_applied = SnapshotIndex , commit_index = SnapshotIndex },
1643- State2 = load_config (State1 ),
1632+ NewState = reset_log (SnapshotPos , State ),
16441633 reply (Type , ok ),
1645- {next_state , witness , State2 };
1634+ {next_state , witness , NewState };
16461635 false ->
16471636 ? LOG_NOTICE (" Server[~0p , term ~0p , witness] ignoring available snapshot ~p :~p with index not past ours (~p )" ,
16481637 [Name , CurrentTerm , SnapshotIndex , SnapshotTerm , LastApplied ], #{domain => [whatsapp , wa_raft ]}),
@@ -2410,6 +2399,28 @@ adjust_config(Action, Config, #raft_state{self = Self}) ->
24102399 end
24112400 end .
24122401
2402+ % %------------------------------------------------------------------------------
2403+ % % RAFT Server - State Machine Implementation - Log State
2404+ % %------------------------------------------------------------------------------
2405+
2406+ -spec reset_log (Position :: wa_raft_log :log_pos (), Data :: # raft_state {}) -> # raft_state {}.
2407+ reset_log (# raft_log_pos {index = Index } = Position , # raft_state {log_view = View } = Data ) ->
2408+ {ok , NewView } = wa_raft_log :reset (View , Position ),
2409+ NewData = Data # raft_state {
2410+ log_view = NewView ,
2411+ last_applied = Index ,
2412+ commit_index = Index
2413+ },
2414+ load_config (NewData ).
2415+
2416+ % %------------------------------------------------------------------------------
2417+ % % RAFT Server - State Machine Implementation - Snapshots
2418+ % %------------------------------------------------------------------------------
2419+
2420+ -spec open_snapshot (Root :: string (), Position :: wa_raft_log :log_pos (), Data :: # raft_state {}) -> # raft_state {}.
2421+ open_snapshot (Root , Position , # raft_state {storage = Storage } = Data ) ->
2422+ ok = wa_raft_storage :open_snapshot (Storage , Root , Position ),
2423+ reset_log (Position , Data ).
24132424
24142425% %------------------------------------------------------------------------------
24152426% % RAFT Server - State Machine Implementation - Heartbeat
0 commit comments