1212
1313-include_lib (" kernel/include/logger.hrl" ).
1414-include (" wa_raft.hrl" ).
15+ -include (" wa_raft_rpc.hrl" ).
1516
1617% erlint-ignore dialyzer_override: improper list expected by gen interface
1718-dialyzer ({no_improper_lists , [handle_cast / 2 ]}).
2930
3031% % Internal API
3132-export ([
32- catchup /6
33+ full_replica /5 ,
34+ witness_replica /5
3335]).
3436
3537% % Snapshot catchup server implementation
5153-define (PENDING_KEY (Name , Node ), {pending , Name , Node }).
5254
5355-define (WHICH_TRANSPORTS , which_transports ).
54- -define (CATCHUP (App , Name , Node , Table , Partition , Witness ), {catchup , App , Name , Node , Table , Partition , Witness }).
56+ -define (FULL_REPLICA (App , Name , Node , Table , Partition ), {full_replica , App , Name , Node , Table , Partition }).
57+ -define (WITNESS_REPLICA (App , Name , Node , Table , Partition ), {witness_relica , App , Name , Node , Table , Partition }).
5558
5659-type key () :: {Name :: atom (), Node :: node ()}.
5760-type snapshot_key () :: {Table :: wa_raft :table (), Partition :: wa_raft :partition (), Position :: wa_raft_log :log_pos ()}.
5861
5962-type which_transports () :: ? WHICH_TRANSPORTS .
6063-type call () :: which_transports ().
6164
62- -type catchup () :: ? CATCHUP (atom (), atom (), node (), wa_raft :table (), wa_raft :partition (), boolean ()).
63- -type cast () :: catchup ().
65+ -type full_replica () :: ? FULL_REPLICA (atom (), atom (), node (), wa_raft :table (), wa_raft :partition ()).
66+ -type witness_replica () :: ? WITNESS_REPLICA (atom (), atom (), node (), wa_raft :table (), wa_raft :partition ()).
67+ -type cast () :: full_replica () | witness_replica ().
6468
6569-record (transport , {
6670 app :: atom (),
@@ -98,19 +102,24 @@ start_link() ->
98102which_transports () ->
99103 gen_server :call (? MODULE , ? WHICH_TRANSPORTS ).
100104
101- -spec catchup (
102- App :: atom (),
103- Name :: atom (),
104- Node :: node (),
105- Table :: wa_raft :table (),
106- Partition :: wa_raft :partition (),
107- Witness :: boolean ()
108- ) -> ok .
109- catchup (App , Name , Node , Table , Partition , Witness ) ->
105+ -spec full_replica (App :: atom (), Name :: atom (), Node :: node (), Table :: wa_raft :table (), Partition :: wa_raft :partition ()) -> ok .
106+ full_replica (App , Name , Node , Table , Partition ) ->
110107 try
111108 % Check ETS to avoid putting duplicate requests into the message queue.
112109 ets :insert_new (? MODULE , {? PENDING_KEY (Name , Node )}) andalso
113- gen_server :cast (? MODULE , ? CATCHUP (App , Name , Node , Table , Partition , Witness )),
110+ gen_server :cast (? MODULE , ? FULL_REPLICA (App , Name , Node , Table , Partition )),
111+ ok
112+ catch
113+ error :badarg ->
114+ ok
115+ end .
116+
117+ -spec witness_replica (App :: atom (), Name :: atom (), Node :: node (), Table :: wa_raft :table (), Partition :: wa_raft :partition ()) -> ok .
118+ witness_replica (App , Name , Node , Table , Partition ) ->
119+ try
120+ % Check ETS to avoid putting duplicate requests into the message queue.
121+ ets :insert_new (? MODULE , {? PENDING_KEY (Name , Node )}) andalso
122+ gen_server :cast (? MODULE , ? WITNESS_REPLICA (App , Name , Node , Table , Partition )),
114123 ok
115124 catch
116125 error :badarg ->
@@ -137,15 +146,17 @@ handle_call(Request, From, #state{} = State) ->
137146 {noreply , State }.
138147
139148-spec handle_cast (Request :: cast (), State :: # state {}) -> {noreply , # state {}}.
140- handle_cast (? CATCHUP (App , Name , Node , Table , Partition , Witness ), State0 ) ->
149+ handle_cast (? FULL_REPLICA (App , Name , Node , Table , Partition ), State0 ) ->
141150 % Just immediately remove the pending key from the ETS. Doing this here is simpler
142151 % but permits a bounded number of extra requests to remain in the queue.
143152 ets :delete (? MODULE , ? PENDING_KEY (Name , Node )),
144153 Now = erlang :monotonic_time (millisecond ),
145154 case allowed (Now , Name , Node , State0 ) of
146155 {true , # state {transports = Transports , snapshots = Snapshots , overload_backoffs = OverloadBackoffs } = State1 } ->
147156 try
148- {# raft_log_pos {index = Index , term = Term } = LogPos , Path } = create_snapshot (Table , Partition , Witness ),
157+ StorageRef = wa_raft_storage :registered_name (Table , Partition ),
158+ {ok , # raft_log_pos {index = Index , term = Term } = LogPos } = wa_raft_storage :create_snapshot (StorageRef ),
159+ Path = ? RAFT_SNAPSHOT_PATH (Table , Partition , Index , Term ),
149160 case wa_raft_transport :start_snapshot_transfer (Node , Table , Partition , LogPos , Path , infinity ) of
150161 {error , receiver_overloaded } ->
151162 ? LOG_NOTICE (" destination node ~0p is overloaded, abort new transport for ~0p :~0p and try again later" ,
@@ -176,6 +187,41 @@ handle_cast(?CATCHUP(App, Name, Node, Table, Partition, Witness), State0) ->
176187 {false , State1 } ->
177188 {noreply , State1 }
178189 end ;
190+ handle_cast (? WITNESS_REPLICA (App , Name , Node , Table , Partition ), State0 ) ->
191+ % Just immediately remove the pending key from the ETS. Doing this here is simpler
192+ % but permits a bounded number of extra requests to remain in the queue.
193+ ets :delete (? MODULE , ? PENDING_KEY (Name , Node )),
194+ Now = erlang :monotonic_time (millisecond ),
195+ case allowed (Now , Name , Node , State0 ) of
196+ {true , # state {retry_backoffs = RetryBackoffs } = State1 } ->
197+ try
198+ % If node is a witness, we can avoid creating a full snapshot as witness replicas
199+ % do not maintain the storage state. Instead, issue a SnapshotAvailableCommand
200+ % directly to the witness replica.
201+ StorageRef = wa_raft_storage :registered_name (Table , Partition ),
202+ Position = wa_raft_storage :position (StorageRef ),
203+ ? LOG_NOTICE (" sending abridged snapshot at ~0p to witness ~0p on ~0p " ,
204+ [Position , Name , Node ], #{domain => [whatsapp , wa_raft ]}),
205+
206+ % Until RAFT server supports casted SnapshotAvailableCommand, create an bogus
207+ % reference that we pass off as an alias to absorb the response from the peer
208+ % RAFT server.
209+ % eqwalizer:ignore - improper list expected by gen interface
210+ Tag = [alias | make_ref ()],
211+ Request = ? SNAPSHOT_AVAILABLE_COMMAND (undefined , Position ),
212+ erlang :send ({Name , Node }, {'$gen_call' , {self (), Tag }, Request }, [noconnect , nosuspend ]),
213+ NewRetryBackoff = Now + ? RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS (App ),
214+ NewRetryBackoffs = RetryBackoffs #{{Name , Node } => NewRetryBackoff },
215+ {noreply , State1 # state {retry_backoffs = NewRetryBackoffs }}
216+ catch
217+ _ :_ :S ->
218+ ? LOG_ERROR (" failed to send abridged snapshot for ~0p :~0p to ~0p :~n~p " ,
219+ [Table , Partition , Node , S ], #{domain => [whatsapp , wa_raft ]}),
220+ {noreply , State1 }
221+ end ;
222+ {false , State1 } ->
223+ {noreply , State1 }
224+ end ;
179225handle_cast (Request , # state {} = State ) ->
180226 ? LOG_NOTICE (" received unrecognized cast ~P " , [Request , 25 ], #{domain => [whatsapp , wa_raft ]}),
181227 {noreply , State }.
@@ -264,19 +310,3 @@ delete_snapshot(Table, Partition, #raft_log_pos{index = Index, term = Term}) ->
264310-spec schedule_scan () -> reference ().
265311schedule_scan () ->
266312 erlang :send_after (? SCAN_EVERY_MS , self (), scan ).
267-
268- -spec create_snapshot (
269- Table :: wa_raft :table (),
270- Partition :: wa_raft :partition (),
271- Witness :: boolean ()
272- ) -> {LogPos :: wa_raft_log :log_pos (), string ()}.
273- create_snapshot (Table , Partition , false ) ->
274- StorageRef = wa_raft_storage :registered_name (Table , Partition ),
275- {ok , # raft_log_pos {index = Index , term = Term } = LogPos } = wa_raft_storage :create_snapshot (StorageRef ),
276- Path = ? RAFT_SNAPSHOT_PATH (Table , Partition , Index , Term ),
277- {LogPos , Path };
278- create_snapshot (Table , Partition , true ) ->
279- StorageRef = wa_raft_storage :registered_name (Table , Partition ),
280- {ok , # raft_log_pos {index = Index , term = Term } = LogPos } = wa_raft_storage :create_witness_snapshot (StorageRef ),
281- Path = ? RAFT_SNAPSHOT_PATH (Table , Partition , ? WITNESS_SNAPSHOT_NAME (Index , Term )),
282- {LogPos , Path }.
0 commit comments