Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/rtdev-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ echo " - Writing $RT_DEST_DIR/$RELEASE/VERSION"
echo -n $VERSION > $RT_DEST_DIR/$RELEASE/VERSION
cd $RT_DEST_DIR
echo " - Reinitializing git state"
git add .
git add --all --force .
git commit -a -m "riak_test init" --amend > /dev/null
2 changes: 1 addition & 1 deletion bin/rtdev-setup-releases.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ git init
git config user.name "Riak Test"
git config user.email "[email protected]"

git add .
git add --all --force .
git commit -a -m "riak_test init" > /dev/null
echo " - Successfully completed initial git commit of $RT_DEST_DIR"
3 changes: 2 additions & 1 deletion src/riak_test_escript.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ main(Args) ->
notice
end,

Formatter = {lager_default_formatter, [time," [",severity,"] ", pid, " ", message, "\n"]},
application:set_env(lager, error_logger_hwm, 250), %% helpful for debugging
application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel},
application:set_env(lager, handlers, [{lager_console_backend, [ConsoleLagerLevel, Formatter]},
{lager_file_backend, [{file, "log/test.log"},
{level, ConsoleLagerLevel}]}]),
lager:start(),
Expand Down
8 changes: 5 additions & 3 deletions src/rt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ staged_join(Node, PNode) ->

plan_and_commit(Node) ->
timer:sleep(500),
lager:info("planning and commiting cluster join"),
lager:info("planning cluster join"),
case rpc:call(Node, riak_core_claimant, plan, []) of
{error, ring_not_ready} ->
lager:info("plan: ring not ready"),
Expand All @@ -461,6 +461,7 @@ plan_and_commit(Node) ->
end.

do_commit(Node) ->
lager:info("planning cluster commit"),
case rpc:call(Node, riak_core_claimant, commit, []) of
{error, plan_changed} ->
lager:info("commit: plan changed"),
Expand All @@ -472,8 +473,9 @@ do_commit(Node) ->
timer:sleep(100),
maybe_wait_for_changes(Node),
do_commit(Node);
{error,nothing_planned} ->
{error, nothing_planned} ->
%% Assume plan actually committed somehow
lager:info("commit: nothing planned"),
ok;
ok ->
ok
Expand Down Expand Up @@ -662,7 +664,7 @@ wait_until(Fun) when is_function(Fun) ->

%% @doc Convenience wrapper for wait_until for the myriad functions that
%% take a node as single argument.
-spec wait_until([node()], fun((node()) -> boolean())) -> ok.
-spec wait_until(node(), fun(() -> boolean())) -> ok | {fail, Result :: term()}.
wait_until(Node, Fun) when is_atom(Node), is_function(Fun) ->
wait_until(fun() -> Fun(Node) end);

Expand Down
26 changes: 20 additions & 6 deletions src/yokozuna_rt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
-include("yokozuna_rt.hrl").

-export([check_exists/2,
clear_trees/1,
commit/2,
expire_trees/1,
gen_keys/1,
Expand Down Expand Up @@ -225,6 +226,15 @@ expire_trees(Cluster) ->
timer:sleep(100),
ok.

%% @doc Expire YZ trees
-spec clear_trees([node()]) -> ok.
clear_trees(Cluster) ->
lager:info("Expire all trees"),
_ = [ok = rpc:call(Node, yz_entropy_mgr, clear_trees, [])
|| Node <- Cluster],
ok.


%% @doc Remove index directories, removing the index.
-spec remove_index_dirs([node()], index_name()) -> ok.
remove_index_dirs(Nodes, IndexName) ->
Expand Down Expand Up @@ -364,20 +374,24 @@ create_and_set_index(Cluster, Pid, Bucket, Index) ->
ok = riakc_pb_socket:create_search_index(Pid, Index),
%% For possible legacy upgrade reasons, wrap create index in a wait
wait_for_index(Cluster, Index),
set_index(Pid, Bucket, Index).
set_index(Pid, hd(Cluster), Bucket, Index).
-spec create_and_set_index([node()], pid(), bucket(), index_name(),
schema_name()) -> ok.
create_and_set_index(Cluster, Pid, Bucket, Index, Schema) ->
%% Create a search index and associate with a bucket
lager:info("Create a search index ~s with a custom schema named ~s and " ++
"associate it with bucket ~s", [Index, Schema, Bucket]),
"associate it with bucket ~p", [Index, Schema, Bucket]),
ok = riakc_pb_socket:create_search_index(Pid, Index, Schema, []),
%% For possible legacy upgrade reasons, wrap create index in a wait
wait_for_index(Cluster, Index),
set_index(Pid, Bucket, Index).

-spec set_index(pid(), bucket(), index_name()) -> ok.
set_index(Pid, Bucket, Index) ->
set_index(Pid, hd(Cluster), Bucket, Index).

-spec set_index(pid(), node(), bucket(), index_name()) -> ok.
set_index(_Pid, Node, {BucketType, _Bucket}, Index) ->
lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s",
[BucketType, Index]),
rt:create_and_activate_bucket_type(Node, BucketType, [{search_index, Index}]);
set_index(Pid, _Node, Bucket, Index) ->
ok = riakc_pb_socket:set_search_index(Pid, Bucket, Index).

internal_solr_url(Host, Port, Index) ->
Expand Down
22 changes: 19 additions & 3 deletions tests/bucket_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ confirm() ->
true ->
Obj01 = riakc_obj:new(<<"test">>, <<"JRD">>, <<"John Robert Doe, 25">>),
Obj02 = riakc_obj:new({Type, <<"test">>}, <<"JRD">>, <<"Jane Rachel Doe, 21">>),
Obj03 = riakc_obj:new({Type, <<"test">>}, <<"JRD2">>, <<"Jane2 Rachel2 Doe2, 22">>),

Obj1 = riakc_obj:update_metadata(Obj01,
riakc_obj:set_secondary_index(
Expand All @@ -302,8 +303,16 @@ confirm() ->
[<<"Jane">>, <<"Rachel">>
,<<"Doe">>]}])),

riakc_pb_socket:put(PB, Obj1),
riakc_pb_socket:put(PB, Obj2),
Obj3 = riakc_obj:update_metadata(Obj03,
riakc_obj:set_secondary_index(
riakc_obj:get_update_metadata(Obj03),
[{{integer_index, "age"},
[22]},{{binary_index, "name"},
[<<"Jane2">>, <<"Rachel2">>
,<<"Doe2">>]}])),
ok = riakc_pb_socket:put(PB, Obj1),
ok = riakc_pb_socket:put(PB, Obj2),
ok = riakc_pb_socket:put(PB, Obj3),

?assertMatch({ok, {index_results_v1, [<<"JRD">>], _, _}}, riakc_pb_socket:get_index(PB, <<"test">>,
{binary_index,
Expand All @@ -322,7 +331,14 @@ confirm() ->
"name"},
<<"Jane">>)),

%% wild stab at the undocumented cs_bucket_fold
?assertMatch({ok, {index_results_v1, [<<"JRD2">>], _, _}}, riakc_pb_socket:get_index(PB,
{Type,
<<"test">>},
{binary_index,
"name"},
<<"Jane2">>)),

%% wild stab at the undocumented cs_bucket_fold
{ok, ReqID} = riakc_pb_socket:cs_bucket_fold(PB, <<"test">>, []),
accumulate(ReqID),

Expand Down
74 changes: 52 additions & 22 deletions tests/proxy_overload_recovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,14 @@ prepare(ThresholdSeed) ->
{ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
sys:resume(VPid0),
ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
false = is_process_alive(VPid0),

%% Reset the proxy pid to make sure it resets state and picks up the new
%% environment variables
ok = supervisor:terminate_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
undefined = whereis(RegName),
VPid1 = wait_for_vnode_change(VPid0, Index),

{ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),

%% Find the proxy pid and check it's alive and matches the supervisor
Expand All @@ -225,6 +226,7 @@ prepare(ThresholdSeed) ->
%% and return the Pid so we know we have the same Pid.
{ok, VPid} = riak_core_vnode_proxy:command_return_vnode(
{riak_kv_vnode,Index,node()}, timeout),
?assertEqual(VPid, VPid1),

true = is_process_alive(PPid),
true = is_process_alive(VPid),
Expand Down Expand Up @@ -264,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
resume(#rt{ppid = PPid, vpid = VPid}) ->
sys:resume(VPid),
%% Use the sys:get_status call to force a synchronous call
%% against the vnode proxy to ensure all messages sent by
%% against the vnode & the proxy to ensure all messages sent by
%% this process have been serviced and there are no pending
%% 'ping's in the vnode before we continue.
%% Then drain the vnode to make sure any pending pongs have
%% been sent.
ok = drain(VPid),
%% been sent, and ensure the proxy has
_ = sys:get_status(PPid),
_ = sys:get_status(VPid),
_ = sys:get_status(PPid).
ok = drain([VPid, PPid]).

resume_next(S, _V, _A) ->
S#tstate{vnode_running = true, proxy_msgs = 0, direct_msgs = 0}.
Expand Down Expand Up @@ -324,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
case Running of
true ->
ok = drain(PPid), % make sure all proxy msgs processed/dropped
ok = drain(VPid); % make sure any pending ping/pongs are processed
ok = drain([PPid, VPid]);
_ ->
ok
end,
{riak_core_vnode_proxy:overloaded(PPid),
msgq_len(VPid), % just for debug so we can review in log output
sys:get_status(PPid)}. % ditto
{messages, PMsgs} = process_info(PPid, messages),
{messages, VMsgs} = process_info(VPid, messages),
Overloaded = riak_core_vnode_proxy:overloaded(PPid),
{Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.

overloaded_post(#tstate{threshold = undefined}, _A,
{R, _VnodeQ, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If there are no thresholds there should never be an overload
eq(R, false);
overloaded_post(#tstate{vnode_running = true}, _A,
{R, _VnodeQ = 0, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If the vnode is running, we have cleared queues so
%% should not be in overload.
eq(R, false);
overloaded_post(#tstate{vnode_running = false,
proxy_msgs = ProxyMsgs,
threshold = Threshold}, _A,
{ResultOverload, _VnodeQ, _ProxyStatus}) ->
{ResultOverload, _Messages, _ProxyStatus}) ->
%% Either
%% mailbox is completely an estimate based on proxy msgs
%% or mailbox is a check + estimate since
Expand Down Expand Up @@ -392,16 +394,33 @@ prep_env(Var, Val) ->
%% Wait until all messages are drained by the Pid. No guarantees
%% about future messages being sent, or that responses for the
%% last message consumed have been transmitted.
%%
drain(Pid) ->
case erlang:process_info(Pid, message_queue_len) of
{message_queue_len, 0} ->
%% NOTE: The "drain 3 times in a row" was determined empirically,
%% and may not be sufficient (2 was not). Given time constraints,
%% living with it for now. If this fails, we should really add some
%% tracing code around the send of messages to Vnode and Proxy to
%% determine where extra messages are coming from rather than just
%% make this "try 4 times"
%%
drain(Pid) when is_pid(Pid) ->
drain([Pid], {-1, -1});

drain(Pids) when is_list(Pids) ->
drain(Pids, {-1, -1}).
drain(Pids, {PrevPrev, Prev}) ->
_ = [sys:suspend(Pid) || Pid <- Pids],
Len = lists:foldl(fun(Pid, Acc0) ->
{message_queue_len, Len} = erlang:process_info(Pid, message_queue_len),
Acc0 + Len
end, 0, Pids),
_ = [sys:resume(Pid) || Pid <- Pids],
case {PrevPrev, Prev, Len} of
{0, 0, 0} ->
ok;
{message_queue_len, L} when L > 0 ->
timer:sleep(1), % give it a millisecond to drain
drain(Pid);
ER ->
ER
_ ->
%% Attempt to ensure something else is scheduled before we try to drain again
erlang:yield(),
timer:sleep(1),
drain(Pids, {Prev, Len})
end.

%% Return the length of the message queue (or crash if proc dead)
Expand Down Expand Up @@ -457,3 +476,14 @@ confirm() ->
pass.

-endif.


wait_for_vnode_change(VPid0, Index) ->
{ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
case VPid1 of
VPid0 ->
timer:sleep(1),
wait_for_vnode_change(VPid0, Index);
_ ->
VPid1
end.
11 changes: 7 additions & 4 deletions tests/repl_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,17 @@ wait_until_fullsync_stopped(SourceLeader) ->
end).

wait_for_reads(Node, Start, End, Bucket, R) ->
rt:wait_until(Node,
ok = rt:wait_until(Node,
fun(_) ->
Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
Reads == []
end),
Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
lager:info("Reads: ~p", [Reads]),
length(Reads).
%% rt:systest_read/6 returns a list of errors encountered while performing
%% the requested reads. Since we are asserting this list is empty above,
%% we already know that if we reached here, that the list of reads has
%% no errors. Therefore, we simply return 0 and do not execute another
%% systest_read call.
0.

get_fs_coord_status_item(Node, SinkName, ItemName) ->
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
Expand Down
9 changes: 8 additions & 1 deletion tests/verify_2i_returnterms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
stream_pb/3, http_query/3]).
-define(BUCKET, <<"2ibucket">>).
-define(FOO, <<"foo">>).
-define(BAZ, <<"baz">>).
-define(BAT, <<"bat">>).
-define(Q_OPTS, [{return_terms, true}]).

confirm() ->
Expand All @@ -38,15 +40,20 @@ confirm() ->

[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
[put_an_object(PBPid, int_to_key(N), N, ?FOO) || N <- lists:seq(101, 200)],
put_an_object(PBPid, int_to_key(201), 201, ?BAZ),
put_an_object(PBPid, int_to_key(202), 202, ?BAT),

%% Bucket, key, and index_eq queries should ignore `return_terms'
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 202)]),
assertEqual(RiakHttp, PBPid, ExpectedKeys, {<<"$key">>, int_to_key(0), int_to_key(999)}, ?Q_OPTS, keys),
assertEqual(RiakHttp, PBPid, ExpectedKeys, { <<"$bucket">>, ?BUCKET}, ?Q_OPTS, keys),

ExpectedFooKeys = lists:sort([int_to_key(N) || N <- lists:seq(101, 200)]),
assertEqual(RiakHttp, PBPid, ExpectedFooKeys, {<<"field1_bin">>, ?FOO}, ?Q_OPTS, keys),

assertEqual(RiakHttp, PBPid, [int_to_key(201)], {<<"field1_bin">>, ?BAZ}, ?Q_OPTS, keys),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure to put some comments around/before the newly added expectations so we know why they were added and what they are testing.

assertEqual(RiakHttp, PBPid, [int_to_key(201)], {<<"field2_int">>, 201}, ?Q_OPTS, keys),

ExpectedRangeResults = lists:sort([{list_to_binary(integer_to_list(N)), int_to_key(N)} || N <- lists:seq(1, 100)]),
assertEqual(RiakHttp, PBPid, ExpectedRangeResults, {<<"field2_int">>, "1", "100"}, ?Q_OPTS, results),

Expand Down
Loading