Skip to content

Commit d89abfc

Browse files
committed
Support read_entries calls in more states
1 parent e793688 commit d89abfc

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed

src/ra_log_wal.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -757,8 +757,8 @@ dump_records(<<_:1/unsigned, 0:1/unsigned, _:22/unsigned,
757757
_EntryData:0/binary,
758758
_Rest/binary>>, Entries) ->
759759
Entries;
760-
dump_records(<<_:1/unsigned, 0:1/unsigned, _:22/unsigned,
761-
IdDataLen:16/unsigned, _:IdDataLen/binary,
760+
dump_records(<<_:1/unsigned, 0:1/unsigned, Id2:22/unsigned,
761+
IdDataLen:16/unsigned, Id:IdDataLen/binary,
762762
Crc:32/integer,
763763
EntryDataLen:32/unsigned,
764764
Idx:64/unsigned, Term:64/unsigned,
@@ -767,19 +767,19 @@ dump_records(<<_:1/unsigned, 0:1/unsigned, _:22/unsigned,
767767
% TODO: recover writers info, i.e. last index seen
768768
case erlang:adler32(<<Idx:64/unsigned, Term:64/unsigned, EntryData/binary>>) of
769769
Crc ->
770-
dump_records(Rest, [{Idx, Term, binary_to_term(EntryData)} | Entries]);
770+
dump_records(Rest, [{{Id, Id2}, {Idx, Term, binary_to_term(EntryData)}} | Entries]);
771771
_ ->
772772
exit({crc_failed_for, Idx, EntryData})
773773
end;
774-
dump_records(<<_:1/unsigned, 1:1/unsigned, _:22/unsigned,
774+
dump_records(<<_:1/unsigned, 1:1/unsigned, Id:22/unsigned,
775775
Crc:32/integer,
776776
EntryDataLen:32/unsigned,
777777
Idx:64/unsigned, Term:64/unsigned,
778778
EntryData:EntryDataLen/binary,
779779
Rest/binary>>, Entries) ->
780780
case erlang:adler32(<<Idx:64/unsigned, Term:64/unsigned, EntryData/binary>>) of
781781
Crc ->
782-
dump_records(Rest, [{Idx, Term, binary_to_term(EntryData)} | Entries]);
782+
dump_records(Rest, [{Id, {Idx, Term, binary_to_term(EntryData)}} | Entries]);
783783
_ ->
784784
exit({crc_failed_for, Idx, EntryData})
785785
end;

src/ra_server_proc.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ leader(_, tick_timeout, State0) ->
609609
{State2, Actions} = ?HANDLE_EFFECTS(RpcEffs ++ Effects ++ [{aux, tick}],
610610
cast, State1#state{server_state = ServerState}),
611611
%% try sending any pending applied notifications again
612-
State = send_applied_notifications(State2, #{}),
612+
State = send_applied_notifications(#{}, State2),
613613
{keep_state, State,
614614
set_tick_timer(State, Actions)};
615615
leader({timeout, Name}, machine_timeout, State0) ->
@@ -689,6 +689,8 @@ candidate(_, tick_timeout, State0) ->
689689
{keep_state, State, set_tick_timer(State, [])};
690690
candidate({call, From}, trigger_election, State) ->
691691
{keep_state, State, [{reply, From, ok}]};
692+
candidate({call, From}, {read_entries, Indexes}, State) ->
693+
read_entries0(From, Indexes, State);
692694
candidate(EventType, Msg, State0) ->
693695
case handle_candidate(Msg, State0) of
694696
{candidate, State1, Effects} ->
@@ -749,6 +751,8 @@ pre_vote(_, tick_timeout, State0) ->
749751
{keep_state, State, set_tick_timer(State, [])};
750752
pre_vote({call, From}, trigger_election, State) ->
751753
{keep_state, State, [{reply, From, ok}]};
754+
pre_vote({call, From}, {read_entries, Indexes}, State) ->
755+
read_entries0(From, Indexes, State);
752756
pre_vote(EventType, Msg, State0) ->
753757
case handle_pre_vote(Msg, State0) of
754758
{pre_vote, State1, Effects} ->
@@ -1087,6 +1091,8 @@ await_condition(info, {Status, Node, InfoList}, State0)
10871091
await_condition(_, tick_timeout, State0) ->
10881092
{State, Actions} = ?HANDLE_EFFECTS([{aux, tick}], cast, State0),
10891093
{keep_state, State, set_tick_timer(State, Actions)};
1094+
await_condition({call, From}, {read_entries, Indexes}, State) ->
1095+
read_entries0(From, Indexes, State);
10901096
await_condition(EventType, Msg, State0) ->
10911097
case handle_await_condition(Msg, State0) of
10921098
{follower, State1, Effects} ->
@@ -1513,7 +1519,7 @@ handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) ->
15131519
{State, Actions0 ++ Actions};
15141520
handle_effect(leader, {notify, Nots}, _, #state{} = State0, Actions) ->
15151521
%% should only be done by leader
1516-
State = send_applied_notifications(State0, Nots),
1522+
State = send_applied_notifications(Nots, State0),
15171523
{State, Actions};
15181524
handle_effect(_AnyState, {cast, To, Msg}, _, State, Actions) ->
15191525
%% TODO: handle send failure
@@ -2223,13 +2229,13 @@ update_peer(PeerId, Update,
22232229
State0#state{server_state =
22242230
ra_server:update_peer(PeerId, Update, ServerState)}.
22252231

2226-
send_applied_notifications(#state{pending_notifys = PendingNots} = State,
2227-
Nots0) when map_size(PendingNots) > 0 ->
2232+
send_applied_notifications(Nots0, #state{pending_notifys = PendingNots} = State)
2233+
when map_size(PendingNots) > 0 ->
22282234
Nots = maps:merge_with(fun(_K, V1, V2) ->
22292235
V1 ++ V2
22302236
end, PendingNots, Nots0),
2231-
send_applied_notifications(State#state{pending_notifys = #{}}, Nots);
2232-
send_applied_notifications(#state{} = State, Nots) ->
2237+
send_applied_notifications(Nots, State#state{pending_notifys = #{}});
2238+
send_applied_notifications(Nots, #state{} = State) ->
22332239
Id = id(State),
22342240
%% any notifications that could not be sent
22352241
%% will be kept and retried

0 commit comments

Comments
 (0)