Skip to content

Commit 486ea75

Browse files
author
Doug Rohrer
committed
WIP
1 parent baf7653 commit 486ea75

File tree

2 files changed

+33
-45
lines changed

2 files changed

+33
-45
lines changed

intercepts/riak_core_vnode_master_intercepts.erl

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,6 @@
2828
forwardable :: boolean(),
2929
opts = [] :: list()}).
3030

31-
-record(fitting,
32-
{
33-
pid :: pid(),
34-
ref :: reference(),
35-
chashfun,
36-
nval
37-
}).
38-
39-
-record(cmd_enqueue, {fitting :: #fitting{},
40-
input :: term(),
41-
timeout,
42-
usedpreflist}).
43-
4431
-define(M, riak_core_vnode_master_orig).
4532

4633

@@ -67,31 +54,25 @@ stop_vnode_after_bloom_fold_request_succeeds(IndexNode, Req, Sender, VMaster) ->
6754
false -> ?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster)
6855
end.
6956

70-
stop_pipe_vnode_after_request_sent(IndexNode, Req, Sender, VMaster) ->
71-
case Req of
72-
#cmd_enqueue{} = _Req ->
73-
%% ?I_INFO("Intercepting riak_core_vnode_master:command_returning_vnode"),
74-
random:seed(os:timestamp()),
75-
case random:uniform(20) of
76-
5 ->
77-
%% Simulate what happens when a VNode completes handoff between command_returning_vnode
78-
%% and the fold attempting to start - other attempts to intercept and slow
79-
%% certain parts of Riak to invoke the particular race condition were unsuccessful
80-
?I_INFO("Replaced VNode with spawned function in command_returning_vnode"),
81-
Runner = self(),
82-
VNodePid = spawn(fun() ->
83-
Runner ! go,
84-
exit(normal)
85-
end),
86-
receive
87-
go -> ok
88-
end,
89-
%% Still need to send the work
90-
?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster),
91-
{ok, VNodePid};
92-
_ ->
93-
?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster)
94-
end;
57+
%% @doc Intercept for `get_vnode_pid/3'.
%%
%% When the requested vnode module is `riak_pipe_vnode', one call in a
%% hundred (on average) returns `{ok, Pid}' where `Pid' is a process that
%% has already exited; all other calls are delegated to the original
%% `get_vnode_pid_orig/3'. Requests for any other vnode module are always
%% delegated unchanged.
return_dead_process_pid_from_get_vnode_pid(Node, Index, VNodeMod = riak_pipe_vnode) ->
    random:seed(os:timestamp()),
    case random:uniform(100) of
        7 ->
            %% Simulate what happens when a VNode completes handoff between get_vnode_pid
            %% and the fold attempting to start - other attempts to intercept and slow
            %% certain parts of Riak to invoke the particular race condition were unsuccessful
            ?I_INFO("Replaced VNode with spawned function in get_vnode_pid"),
            %% spawn_monitor/1 creates the process and the monitor atomically.
            %% (erlang:monitor/1 does not exist; the BIF is monitor/2 and
            %% requires the `process' type as its first argument.)
            {VNodePid, MonRef} = spawn_monitor(fun() -> ok end),
            %% Block until the stand-in process is confirmed dead, so the
            %% caller is guaranteed to receive a pid with no live process.
            receive
                {'DOWN', MonRef, process, VNodePid, _Reason} -> ok
            end,
            {ok, VNodePid};
        _ ->
            ?M:get_vnode_pid_orig(Node, Index, VNodeMod)
    end;
return_dead_process_pid_from_get_vnode_pid(Node, Index, VNodeMod) ->
    ?M:get_vnode_pid_orig(Node, Index, VNodeMod).

tests/pipe_verify_handoff_blocking.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ confirm() ->
110110

111111
ok = rt:wait_until_transfers_complete(Nodes),
112112

113+
lager:info("Check input count"),
113114
FillerInputCount = stop_fillers(Fillers),
114115

115116
%% if we make it this far, then no filler ever saw the vnode_down
@@ -118,7 +119,7 @@ confirm() ->
118119
%% with `{error,[{vnode_down,noproc}]}` errors, as the `noproc` case
119120
%% should be handled similarly to the `normal` exit case in
120121
%% `riak_pipe_vnode:queue_work_wait`
121-
122+
lager:info("Check pipe status"),
122123
_Status2 = pipe_status(Primary, Pipe),
123124

124125
lager:info("Send eoi and collect results"),
@@ -138,8 +139,8 @@ confirm() ->
138139
%% @doc Register, on node `Primary', an intercept that routes
%% `riak_core_vnode_master:get_vnode_pid/3' to
%% `return_dead_process_pid_from_get_vnode_pid'.
set_up_vnode_crashing_intercept(Primary) ->
    lager:info("Add intercept to kill vnode before calling the wait function"),
    %% Mapping of {Function, Arity} to the intercept function that replaces it.
    Mapping = [{{get_vnode_pid, 3}, return_dead_process_pid_from_get_vnode_pid}],
    rt_intercept:add(Primary, {riak_core_vnode_master, Mapping}).
143144

144145
%% queue filling
145146

@@ -177,8 +178,14 @@ queue_filler(Node, Pipe, Inputs, Count) ->
177178
{stop, Owner} -> Owner ! {done, Count}
178179
after 0 ->
179180
{{value, I}, Q} = queue:out(Inputs),
180-
ok = rpc:call(Node, riak_pipe, queue_work, [Pipe, I]),
181-
queue_filler(Node, Pipe, queue:in(I, Q), Count+1)
181+
case rpc:call(Node, riak_pipe, queue_work, [Pipe, I], 40000) of
182+
ok ->
183+
lager:info("Received response from queue_work"),
184+
queue_filler(Node, Pipe, queue:in(I, Q), Count+1);
185+
_ ->
186+
lager:info("Timed out waiting for response from queue_work"),
187+
queue_filler(Node, Pipe, queue:in(I, Q), Count+1)
188+
end
182189
end.
183190

184191
%% @doc tell all fillers to stop and collect and sum their send counts

0 commit comments

Comments
 (0)