@@ -208,13 +208,14 @@ prepare(ThresholdSeed) ->
208208 {ok , VPid0 } = riak_core_vnode_manager :get_vnode_pid (Id , riak_kv_vnode ),
209209 sys :resume (VPid0 ),
210210 ok = supervisor :terminate_child (riak_core_vnode_sup , VPid0 ),
211- false = is_process_alive (VPid0 ),
212211
213212 % % Reset the proxy pid to make sure it resets state and picks up the new
214213 % % environment variables
215214 ok = supervisor :terminate_child (riak_core_vnode_proxy_sup , {riak_kv_vnode , Id }),
216215 RegName = riak_core_vnode_proxy :reg_name (riak_kv_vnode , Index ),
217216 undefined = whereis (RegName ),
217+ VPid1 = wait_for_vnode_change (VPid0 , Index ),
218+
218219 {ok , PPid } = supervisor :restart_child (riak_core_vnode_proxy_sup , {riak_kv_vnode , Id }),
219220
220221 % % Find the proxy pid and check it's alive and matches the supervisor
@@ -225,6 +226,7 @@ prepare(ThresholdSeed) ->
225226 % % and return the Pid so we know we have the same Pid.
226227 {ok , VPid } = riak_core_vnode_proxy :command_return_vnode (
227228 {riak_kv_vnode ,Index ,node ()}, timeout ),
229+ ? assertEqual (VPid , VPid1 ),
228230
229231 true = is_process_alive (PPid ),
230232 true = is_process_alive (VPid ),
@@ -264,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
264266resume (# rt {ppid = PPid , vpid = VPid }) ->
265267 sys :resume (VPid ),
266268 % % Use the sys:get_status call to force a synchronous call
267- % % against the vnode proxy to ensure all messages sent by
269+ % % against the vnode & the proxy to ensure all messages sent by
268270 % % this process have been serviced and there are no pending
269271 % % 'ping's in the vnode before we continue.
270272 % % Then drain the vnode to make sure any pending pongs have
271- % % been sent.
272- ok = drain ( VPid ),
273+ % % been sent, and ensure the proxy has processed them.
274+ _ = sys : get_status ( PPid ),
273275 _ = sys :get_status (VPid ),
274- _ = sys : get_status ( PPid ).
276+ ok = drain ([ VPid , PPid ] ).
275277
276278resume_next (S , _V , _A ) ->
277279 S # tstate {vnode_running = true , proxy_msgs = 0 , direct_msgs = 0 }.
@@ -324,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
324326overloaded (Running , # rt {ppid = PPid , vpid = VPid }) ->
325327 case Running of
326328 true ->
327- ok = drain (PPid ), % make sure all proxy msgs processed/dropped
328- ok = drain (VPid ); % make sure any pending ping/pongs are processed
329+ ok = drain ([PPid , VPid ]);
329330 _ ->
330331 ok
331332 end ,
332- {riak_core_vnode_proxy :overloaded (PPid ),
333- msgq_len (VPid ), % just for debug so we can review in log output
334- sys :get_status (PPid )}. % ditto
333+ {messages , PMsgs } = process_info (PPid , messages ),
334+ {messages , VMsgs } = process_info (VPid , messages ),
335+ Overloaded = riak_core_vnode_proxy :overloaded (PPid ),
336+ {Overloaded , {VMsgs , PMsgs }, sys :get_status (PPid )}.
335337
336338overloaded_post (# tstate {threshold = undefined }, _A ,
337- {R , _VnodeQ , _ProxyStatus }) ->
339+ {R , _Messages , _ProxyStatus }) ->
338340 % % If there are no thresholds there should never be an overload
339341 eq (R , false );
340342overloaded_post (# tstate {vnode_running = true }, _A ,
341- {R , _VnodeQ = 0 , _ProxyStatus }) ->
343+ {R , _Messages , _ProxyStatus }) ->
342344 % % If the vnode is running, we have cleared queues so
343345 % % should not be in overload.
344346 eq (R , false );
345347overloaded_post (# tstate {vnode_running = false ,
346348 proxy_msgs = ProxyMsgs ,
347349 threshold = Threshold }, _A ,
348- {ResultOverload , _VnodeQ , _ProxyStatus }) ->
350+ {ResultOverload , _Messages , _ProxyStatus }) ->
349351 % % Either
350352 % % mailbox is completely an estimate based on proxy msgs
351353 % % or mailbox is a check + estimate since
@@ -392,16 +394,33 @@ prep_env(Var, Val) ->
392394% % Wait until all messages are drained by the Pid. No guarantees
393395% % about future messages being sent, or that responses for the
394396% % last message consumed have been transmitted.
395- % %
396- drain (Pid ) ->
397- case erlang :process_info (Pid , message_queue_len ) of
398- {message_queue_len , 0 } ->
397+ % % NOTE: Requiring three consecutive empty readings ("drain 3 times in
398+ % % a row") was determined empirically and may not be sufficient (2 was
399+ % % not). Given time constraints, we are living with it for now. If this
400+ % % fails, we should add tracing around the sends to the Vnode and Proxy
401+ % % to determine where the extra messages come from, rather than just
402+ % % making this "try 4 times".
403+ % %
404+ drain (Pid ) when is_pid (Pid ) ->
405+ drain ([Pid ], {- 1 , - 1 });
406+
407+ drain (Pids ) when is_list (Pids ) ->
408+ drain (Pids , {- 1 , - 1 }).
409+ drain (Pids , {PrevPrev , Prev }) ->
410+ _ = [sys :suspend (Pid ) || Pid <- Pids ],
411+ Len = lists :foldl (fun (Pid , Acc0 ) ->
412+ {message_queue_len , Len } = erlang :process_info (Pid , message_queue_len ),
413+ Acc0 + Len
414+ end , 0 , Pids ),
415+ _ = [sys :resume (Pid ) || Pid <- Pids ],
416+ case {PrevPrev , Prev , Len } of
417+ {0 , 0 , 0 } ->
399418 ok ;
400- { message_queue_len , L } when L > 0 ->
401- timer : sleep ( 1 ), % give it a millisecond to drain
402- drain ( Pid );
403- ER ->
404- ER
419+ _ ->
420+ % % Attempt to ensure something else is scheduled before we try to drain again
421+ erlang : yield (),
422+ timer : sleep ( 1 ),
423+ drain ( Pids , { Prev , Len })
405424 end .
406425
407426% % Return the length of the message queue (or crash if proc dead)
@@ -457,3 +476,14 @@ confirm() ->
457476 pass .
458477
459478-endif .
479+
480+
481+ wait_for_vnode_change (VPid0 , Index ) ->
482+ {ok , VPid1 } = riak_core_vnode_manager :get_vnode_pid (Index , riak_kv_vnode ),
483+ case VPid1 of
484+ VPid0 ->
485+ timer :sleep (1 ),
486+ wait_for_vnode_change (VPid0 , Index );
487+ _ ->
488+ VPid1
489+ end .
0 commit comments