Skip to content

Commit 6507c1d

Browse files
author
Doug Rohrer
committed
WIP
1 parent 5f6a47a commit 6507c1d

File tree

2 files changed

+33
-45
lines changed

2 files changed

+33
-45
lines changed

intercepts/riak_core_vnode_master_intercepts.erl

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,6 @@
88
forwardable :: boolean(),
99
opts = [] :: list()}).
1010

11-
-record(fitting,
12-
{
13-
pid :: pid(),
14-
ref :: reference(),
15-
chashfun,
16-
nval
17-
}).
18-
19-
-record(cmd_enqueue, {fitting :: #fitting{},
20-
input :: term(),
21-
timeout,
22-
usedpreflist}).
23-
2411
-define(M, riak_core_vnode_master_orig).
2512

2613

@@ -47,31 +34,25 @@ stop_vnode_after_bloom_fold_request_succeeds(IndexNode, Req, Sender, VMaster) ->
4734
false -> ?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster)
4835
end.
4936

50-
stop_pipe_vnode_after_request_sent(IndexNode, Req, Sender, VMaster) ->
51-
case Req of
52-
#cmd_enqueue{} = _Req ->
53-
%% ?I_INFO("Intercepting riak_core_vnode_master:command_returning_vnode"),
54-
random:seed(os:timestamp()),
55-
case random:uniform(20) of
56-
5 ->
57-
%% Simulate what happens when a VNode completes handoff between command_returning_vnode
58-
%% and the fold attempting to start - other attempts to intercept and slow
59-
%% certain parts of Riak to invoke the particular race condition were unsuccessful
60-
?I_INFO("Replaced VNode with spawned function in command_returning_vnode"),
61-
Runner = self(),
62-
VNodePid = spawn(fun() ->
63-
Runner ! go,
64-
exit(normal)
65-
end),
66-
receive
67-
go -> ok
68-
end,
69-
%% Still need to send the work
70-
?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster),
71-
{ok, VNodePid};
72-
_ ->
73-
?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster)
74-
end;
37+
return_dead_process_pid_from_get_vnode_pid(Node, Index, VNodeMod = riak_pipe_vnode) ->
38+
%% ?I_INFO("Intercepting riak_core_vnode_master:get_vnode_pid"),
39+
random:seed(os:timestamp()),
40+
case random:uniform(100) of
41+
7 ->
42+
%% Simulate what happens when a VNode completes handoff between get_vnode_pid
43+
%% and the fold attempting to start - other attempts to intercept and slow
44+
%% certain parts of Riak to invoke the particular race condition were unsuccessful
45+
?I_INFO("Replaced VNode with spawned function in get_vnode_pid"),
46+
VNodePid = spawn(fun() ->
47+
ok
48+
end),
49+
MonRef = erlang:monitor(VNodePid),
50+
receive
51+
{'DOWN', MonRef, process, VNodePid, _Reason} -> ok
52+
end,
53+
{ok, VNodePid};
7554
_ ->
76-
?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster)
77-
end.
55+
?M:get_vnode_pid_orig(Node, Index, VNodeMod)
56+
end;
57+
return_dead_process_pid_from_get_vnode_pid(Node, Index, VNodeMod) ->
58+
?M:get_vnode_pid_orig(Node, Index, VNodeMod).

tests/pipe_verify_handoff_blocking.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ confirm() ->
110110

111111
ok = rt:wait_until_transfers_complete(Nodes),
112112

113+
lager:info("Check input count"),
113114
FillerInputCount = stop_fillers(Fillers),
114115

115116
%% if we make it this far, then no filler ever saw the vnode_down
@@ -118,7 +119,7 @@ confirm() ->
118119
%% with `{error,[{vnode_down,noproc}]}}` errors, as the `noproc` case
119120
%% should be handled similarly to the `normal` exit case in
120121
%% `riak_pipe_vnode:queue_work_wait`
121-
122+
lager:info("Check pipe status"),
122123
_Status2 = pipe_status(Primary, Pipe),
123124

124125
lager:info("Send eoi and collect results"),
@@ -138,8 +139,8 @@ confirm() ->
138139
set_up_vnode_crashing_intercept(Primary) ->
139140
lager:info("Add intercept to kill vnode before calling the wait function"),
140141
rt_intercept:add(Primary, {riak_core_vnode_master,
141-
[{{command_return_vnode, 4},
142-
stop_pipe_vnode_after_request_sent}]}).
142+
[{{get_vnode_pid, 3},
143+
return_dead_process_pid_from_get_vnode_pid}]}).
143144

144145
%% queue filling
145146

@@ -177,8 +178,14 @@ queue_filler(Node, Pipe, Inputs, Count) ->
177178
{stop, Owner} -> Owner ! {done, Count}
178179
after 0 ->
179180
{{value, I}, Q} = queue:out(Inputs),
180-
ok = rpc:call(Node, riak_pipe, queue_work, [Pipe, I]),
181-
queue_filler(Node, Pipe, queue:in(I, Q), Count+1)
181+
case rpc:call(Node, riak_pipe, queue_work, [Pipe, I], 40000) of
182+
ok ->
183+
lager:info("Received response from queue_work"),
184+
queue_filler(Node, Pipe, queue:in(I, Q), Count+1);
185+
Other ->
186+
lager:info("queue_work did not return ok (timeout or error): ~p", [Other]),
187+
queue_filler(Node, Pipe, queue:in(I, Q), Count+1)
188+
end
182189
end.
183190

184191
%% @doc tell all fillers to stop and collect and sum their send counts

0 commit comments

Comments
 (0)