Skip to content

Commit bbbf3c4

Browse files
authored
Requests keys and hashes off loop (#818)
See #817. Looks to resolve the deadlock by handling the request in the sender loop, rather than blocking riak_repl_aae_sink while it waits for the result.
1 parent bae8702 commit bbbf3c4

File tree

3 files changed

+64
-22
lines changed

3 files changed

+64
-22
lines changed

src/riak_repl_aae_sink.erl

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,11 @@ process_msg(?MSG_GET_AAE_BUCKET, {Level,BucketNum,IndexN}, State=#state{tree_pid
138138
ResponseMsg = riak_kv_index_hashtree:exchange_bucket(IndexN, Level, BucketNum, TreePid),
139139
send_reply(ResponseMsg, State);
140140

141-
process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=TreePid}) ->
142-
ResponseMsg = riak_kv_index_hashtree:exchange_segment(IndexN, SegmentNum, TreePid),
143-
send_reply(ResponseMsg, State);
141+
process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State) ->
142+
riak_repl_stats:aae_segments_requested(),
143+
State#state.sender !
144+
{?MSG_GET_AAE_SEGMENT, {SegmentNum, IndexN, State#state.tree_pid}},
145+
{noreply, State};
144146

145147
%% no reply
146148
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) ->
@@ -207,7 +209,15 @@ sender_init(Transport, Socket) ->
207209
sender_loop({Transport, Socket}).
208210

209211
sender_loop(State={Transport, Socket}) ->
210-
receive Msg ->
212+
receive
213+
{?MSG_GET_AAE_SEGMENT, {SegmentNum, IndexN, TreePid}} ->
214+
KeysHashes =
215+
riak_kv_index_hashtree:exchange_segment(
216+
IndexN, SegmentNum, TreePid),
217+
riak_repl_stats:keys_hashes_returned(length(KeysHashes)),
218+
DataBin = term_to_binary(KeysHashes),
219+
ok = Transport:send(Socket, <<?MSG_REPLY:8, DataBin/binary>>);
220+
Msg ->
211221
ok = Transport:send(Socket, Msg)
212222
end,
213223
?MODULE:sender_loop(State).

src/riak_repl_aae_source.erl

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ cancel_fullsync(Pid) ->
9999
%%%===================================================================
100100

101101
init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) ->
102-
lager:debug("AAE fullsync source worker started for partition ~p",
103-
[Partition]),
102+
lager:info(
103+
"AAE fullsync source worker started for partition ~p", [Partition]),
104104

105105
Ver = riak_repl_util:deduce_wire_version_from_proto(Proto),
106106
{_, ClientVer, _} = Proto,
@@ -285,7 +285,9 @@ update_trees(tree_built, State = #state{indexns=IndexNs}) ->
285285
NeededBuilts ->
286286
%% The trees are now built, so we can estimate the number of keys
287287
{ok, EstimatedNrKeys} = riak_kv_index_hashtree:estimate_keys(State#state.tree_pid),
288-
lager:debug("EstimatedNrKeys ~p for partition ~p", [EstimatedNrKeys, State#state.index]),
288+
lager:info(
289+
"EstimatedNrKeys ~p for partition ~p",
290+
[EstimatedNrKeys, State#state.index]),
289291

290292
lager:debug("Moving to key exchange state"),
291293
key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys});
@@ -320,7 +322,7 @@ key_exchange(cancel_fullsync, State) ->
320322
{stop, normal, State};
321323
key_exchange(finish_fullsync, State=#state{owner=Owner}) ->
322324
send_complete(State),
323-
lager:debug("AAE fullsync source completed partition ~p",
325+
lager:info("AAE fullsync source completed partition ~p",
324326
[State#state.index]),
325327
riak_repl2_fssource:fullsync_complete(Owner),
326328
%% TODO: Why stay in key_exchange? Should we stop instead?
@@ -341,8 +343,9 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster,
341343
tree_pid=TreePid,
342344
exchange=Exchange,
343345
indexns=[IndexN|_IndexNs]}) ->
344-
lager:debug("Starting fullsync key exchange with ~p for ~p/~p",
345-
[Cluster, Partition, IndexN]),
346+
lager:info(
347+
"Starting fullsync key exchange with ~p for ~p/~p",
348+
[Cluster, Partition, IndexN]),
346349

347350
SourcePid = self(),
348351

@@ -396,14 +399,20 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster,
396399
end,
397400

398401
%% TODO: Add stats for AAE
399-
lager:debug("Starting compare for partition ~p", [Partition]),
400-
spawn_link(fun() ->
401-
StageStart=os:timestamp(),
402-
Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid),
403-
lager:debug("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)",
404-
[State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]),
405-
gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2})
406-
end),
402+
lager:info("Starting compare for partition ~p", [Partition]),
403+
spawn_link(
404+
fun() ->
405+
StageStart = os:timestamp(),
406+
Exchange2 =
407+
riak_kv_index_hashtree:compare(
408+
IndexN, Remote, AccFun, Exchange, TreePid),
409+
lager:info(
410+
"Full-sync with site ~p; fullsync difference generator for ~p completion_time=~p secs",
411+
[State#state.cluster,
412+
Partition,
413+
riak_repl_util:elapsed_secs(StageStart)]),
414+
gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2})
415+
end),
407416

408417
%% wait for differences from bloom_folder or to be done
409418
{next_state, compute_differences, State}.

src/riak_repl_stats.erl

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
clear_rt_dirty/0,
4242
touch_rt_dirty_file/0,
4343
remove_rt_dirty_file/0,
44-
is_rt_dirty/0]).
44+
is_rt_dirty/0,
45+
aae_segments_requested/0,
46+
keys_hashes_returned/1]).
4547

4648
-define(APP, riak_repl).
4749

@@ -107,6 +109,12 @@ elections_elected() ->
107109
elections_leader_changed() ->
108110
increment_counter(elections_leader_changed).
109111

112+
aae_segments_requested() ->
113+
increment_counter(aae_segments_requested).
114+
115+
keys_hashes_returned(Length) ->
116+
increment_counter(keys_hashes_returned, Length).
117+
110118
%% If any source errors are detected, write a file out to persist this status
111119
%% across restarts
112120
rt_source_errors() ->
@@ -212,7 +220,9 @@ stats() ->
212220
{last_server_bytes_recv, gauge},
213221
{rt_source_errors, counter},
214222
{rt_sink_errors, counter},
215-
{rt_dirty, counter}].
223+
{rt_dirty, counter},
224+
{aae_segments_requested, counter},
225+
{keys_hashes_returned, counter}].
216226

217227
increment_counter(Name) ->
218228
increment_counter(Name, 1).
@@ -427,11 +437,24 @@ test_check_stats() ->
427437
{server_tx_kbps,[]},
428438
{rt_source_errors,1},
429439
{rt_sink_errors, 1},
430-
{rt_dirty, 2}],
440+
{rt_dirty, 2},
441+
{aae_segments_requested, 0},
442+
{keys_hashes_returned, 0}],
431443
Result = get_stats(),
432444

433445
?assertEqual(Expected,
434-
[{K1, V} || {K1, V} <- Result, {K2, _} <- Expected, K1 == K2]).
446+
[{K1, V} || {K1, V} <- Result, {K2, _} <- Expected, K1 == K2]),
447+
448+
riak_repl_stats:aae_segments_requested(),
449+
riak_repl_stats:keys_hashes_returned(100),
450+
451+
UpdResult = get_stats(),
452+
?assertMatch(
453+
{aae_segments_requested, 1},
454+
lists:keyfind(aae_segments_requested, 1, UpdResult)),
455+
?assertMatch(
456+
{keys_hashes_returned, 100},
457+
lists:keyfind(keys_hashes_returned, 1, UpdResult)).
435458

436459

437460
test_report() ->

0 commit comments

Comments
 (0)