diff --git a/bin/rtdev-install.sh b/bin/rtdev-install.sh
index 00ed12001..920ac9ecc 100755
--- a/bin/rtdev-install.sh
+++ b/bin/rtdev-install.sh
@@ -37,5 +37,5 @@ echo " - Writing $RT_DEST_DIR/$RELEASE/VERSION"
 echo -n $VERSION > $RT_DEST_DIR/$RELEASE/VERSION
 cd $RT_DEST_DIR
 echo " - Reinitializing git state"
-git add .
+git add --all --force .
 git commit -a -m "riak_test init" --amend > /dev/null
diff --git a/bin/rtdev-setup-releases.sh b/bin/rtdev-setup-releases.sh
index a692e5a21..cb2756ee7 100755
--- a/bin/rtdev-setup-releases.sh
+++ b/bin/rtdev-setup-releases.sh
@@ -40,6 +40,6 @@
 git init
 git config user.name "Riak Test"
 git config user.email "dev@basho.com"
-git add .
+git add --all --force .
 git commit -a -m "riak_test init" > /dev/null
 echo " - Successfully completed initial git commit of $RT_DEST_DIR"
diff --git a/src/riak_test_escript.erl b/src/riak_test_escript.erl
index b883364cc..e5c7449ef 100644
--- a/src/riak_test_escript.erl
+++ b/src/riak_test_escript.erl
@@ -110,8 +110,9 @@ main(Args) ->
             notice
     end,
 
+    Formatter = {lager_default_formatter, [time," [",severity,"] ", pid, " ", message, "\n"]},
     application:set_env(lager, error_logger_hwm, 250), %% helpful for debugging
-    application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel},
+    application:set_env(lager, handlers, [{lager_console_backend, [ConsoleLagerLevel, Formatter]},
                                           {lager_file_backend, [{file, "log/test.log"},
                                                                 {level, ConsoleLagerLevel}]}]),
     lager:start(),
diff --git a/src/rt.erl b/src/rt.erl
index 96f31e2c1..8dfc385da 100644
--- a/src/rt.erl
+++ b/src/rt.erl
@@ -449,7 +449,7 @@ staged_join(Node, PNode) ->
 
 plan_and_commit(Node) ->
     timer:sleep(500),
-    lager:info("planning and commiting cluster join"),
+    lager:info("planning cluster join"),
     case rpc:call(Node, riak_core_claimant, plan, []) of
         {error, ring_not_ready} ->
             lager:info("plan: ring not ready"),
@@ -461,6 +461,7 @@ plan_and_commit(Node) ->
     end.
 
 do_commit(Node) ->
+    lager:info("committing cluster join"),
     case rpc:call(Node, riak_core_claimant, commit, []) of
         {error, plan_changed} ->
             lager:info("commit: plan changed"),
@@ -472,8 +473,9 @@
             timer:sleep(100),
             maybe_wait_for_changes(Node),
             do_commit(Node);
-        {error,nothing_planned} ->
+        {error, nothing_planned} ->
             %% Assume plan actually committed somehow
+            lager:info("commit: nothing planned"),
             ok;
         ok ->
             ok
@@ -662,7 +664,7 @@ wait_until(Fun) when is_function(Fun) ->
 
 %% @doc Convenience wrapper for wait_until for the myriad functions that
 %% take a node as single argument.
--spec wait_until([node()], fun((node()) -> boolean())) -> ok.
+-spec wait_until(node(), fun((node()) -> boolean())) -> ok | {fail, Result :: term()}.
 wait_until(Node, Fun) when is_atom(Node), is_function(Fun) ->
     wait_until(fun() -> Fun(Node) end);
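[Reviewer note] The rt.erl spec fix deserves a second look: the patch as
submitted dropped the predicate's node() argument, but the body wraps the
node-taking fun into a zero-arity closure before polling, so the argument type
belongs in the spec and is restored above. A minimal sketch of that shape,
not rt's actual implementation; the retry count and delay are made-up
placeholders:

    -module(wait_until_sketch).
    -export([wait_until/2]).

    %% Poll a node-taking predicate until it holds or the retry budget runs out.
    -spec wait_until(node(), fun((node()) -> boolean())) -> ok | {fail, timeout}.
    wait_until(Node, Fun) when is_atom(Node), is_function(Fun, 1) ->
        poll(fun() -> Fun(Node) end, 10).

    poll(_Pred, 0) ->
        {fail, timeout};
    poll(Pred, Retries) ->
        case Pred() of
            true ->
                ok;
            false ->
                timer:sleep(500),
                poll(Pred, Retries - 1)
        end.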
diff --git a/src/yokozuna_rt.erl b/src/yokozuna_rt.erl
index 7723d3e5f..3ddaee5ab 100644
--- a/src/yokozuna_rt.erl
+++ b/src/yokozuna_rt.erl
@@ -23,6 +23,7 @@
 -include("yokozuna_rt.hrl").
 
 -export([check_exists/2,
+         clear_trees/1,
          commit/2,
          expire_trees/1,
         gen_keys/1,
@@ -225,6 +226,15 @@ expire_trees(Cluster) ->
     timer:sleep(100),
     ok.
 
+%% @doc Clear YZ trees
+-spec clear_trees([node()]) -> ok.
+clear_trees(Cluster) ->
+    lager:info("Clear all trees"),
+    _ = [ok = rpc:call(Node, yz_entropy_mgr, clear_trees, [])
+         || Node <- Cluster],
+    ok.
+
+
 %% @doc Remove index directories, removing the index.
 -spec remove_index_dirs([node()], index_name()) -> ok.
 remove_index_dirs(Nodes, IndexName) ->
@@ -364,20 +374,24 @@ create_and_set_index(Cluster, Pid, Bucket, Index) ->
     ok = riakc_pb_socket:create_search_index(Pid, Index),
     %% For possible legacy upgrade reasons, wrap create index in a wait
     wait_for_index(Cluster, Index),
-    set_index(Pid, Bucket, Index).
+    set_index(Pid, hd(Cluster), Bucket, Index).
 
 -spec create_and_set_index([node()], pid(), bucket(), index_name(), schema_name()) -> ok.
 create_and_set_index(Cluster, Pid, Bucket, Index, Schema) ->
     %% Create a search index and associate with a bucket
     lager:info("Create a search index ~s with a custom schema named ~s and " ++
-               "associate it with bucket ~s", [Index, Schema, Bucket]),
+               "associate it with bucket ~p", [Index, Schema, Bucket]),
     ok = riakc_pb_socket:create_search_index(Pid, Index, Schema, []),
     %% For possible legacy upgrade reasons, wrap create index in a wait
     wait_for_index(Cluster, Index),
-    set_index(Pid, Bucket, Index).
-
--spec set_index(pid(), bucket(), index_name()) -> ok.
-set_index(Pid, Bucket, Index) ->
+    set_index(Pid, hd(Cluster), Bucket, Index).
+
+-spec set_index(pid(), node(), bucket(), index_name()) -> ok.
+set_index(_Pid, Node, {BucketType, _Bucket}, Index) ->
+    lager:info("Create and activate bucket type ~s and tie it to search_index ~s",
+               [BucketType, Index]),
+    rt:create_and_activate_bucket_type(Node, BucketType, [{search_index, Index}]);
+set_index(Pid, _Node, Bucket, Index) ->
     ok = riakc_pb_socket:set_search_index(Pid, Bucket, Index).
 
 internal_solr_url(Host, Port, Index) ->
diff --git a/tests/bucket_types.erl b/tests/bucket_types.erl
index 1715cee82..92f29a762 100644
--- a/tests/bucket_types.erl
+++ b/tests/bucket_types.erl
@@ -285,6 +285,7 @@ confirm() ->
         true ->
             Obj01 = riakc_obj:new(<<"test">>, <<"JRD">>, <<"John Robert Doe, 25">>),
             Obj02 = riakc_obj:new({Type, <<"test">>}, <<"JRD">>, <<"Jane Rachel Doe, 21">>),
+            Obj03 = riakc_obj:new({Type, <<"test">>}, <<"JRD2">>, <<"Jane2 Rachel2 Doe2, 22">>),
 
             Obj1 = riakc_obj:update_metadata(Obj01,
                                              riakc_obj:set_secondary_index(
@@ -302,8 +303,16 @@
                                                  [<<"Jane">>, <<"Rachel">>
                                                  ,<<"Doe">>]}])),
 
-            riakc_pb_socket:put(PB, Obj1),
-            riakc_pb_socket:put(PB, Obj2),
+            Obj3 = riakc_obj:update_metadata(Obj03,
+                                             riakc_obj:set_secondary_index(
+                                               riakc_obj:get_update_metadata(Obj03),
+                                               [{{integer_index, "age"},
+                                                 [22]},{{binary_index, "name"},
+                                                 [<<"Jane2">>, <<"Rachel2">>
+                                                 ,<<"Doe2">>]}])),
+            ok = riakc_pb_socket:put(PB, Obj1),
+            ok = riakc_pb_socket:put(PB, Obj2),
+            ok = riakc_pb_socket:put(PB, Obj3),
 
             ?assertMatch({ok, {index_results_v1, [<<"JRD">>], _, _}},
                          riakc_pb_socket:get_index(PB, <<"test">>,
                                                    {binary_index,
@@ -322,7 +331,14 @@
                                                     "name"},
                                                    <<"Jane">>)),
 
-            %% wild stab at the undocumented cs_bucket_fold
+            ?assertMatch({ok, {index_results_v1, [<<"JRD2">>], _, _}}, riakc_pb_socket:get_index(PB,
+                                                   {Type,
+                                                    <<"test">>},
+                                                   {binary_index,
+                                                    "name"},
+                                                   <<"Jane2">>)),
+
+            %% wild stab at the undocumented cs_bucket_fold
             {ok, ReqID} = riakc_pb_socket:cs_bucket_fold(PB, <<"test">>, []),
             accumulate(ReqID),
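[Reviewer note] The new Obj03/JRD2 assertion exercises secondary indexes on a
typed bucket, which the client addresses as a {Type, Bucket} tuple throughout.
The round trip in isolation, as a sketch assuming a local node on
127.0.0.1:8087 and an already-activated bucket type <<"mytype">>:

    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    Obj0 = riakc_obj:new({<<"mytype">>, <<"test">>}, <<"JRD2">>,
                         <<"Jane2 Rachel2 Doe2, 22">>),
    %% Attach a binary secondary index to the object's update metadata.
    MD = riakc_obj:set_secondary_index(riakc_obj:get_update_metadata(Obj0),
                                       [{{binary_index, "name"}, [<<"Jane2">>]}]),
    ok = riakc_pb_socket:put(Pid, riakc_obj:update_metadata(Obj0, MD)),
    %% Query with the same {Type, Bucket} tuple; a bare <<"test">> would
    %% consult the default bucket type and miss the object.
    {ok, {index_results_v1, [<<"JRD2">>], _, _}} =
        riakc_pb_socket:get_index(Pid, {<<"mytype">>, <<"test">>},
                                  {binary_index, "name"}, <<"Jane2">>).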
diff --git a/tests/proxy_overload_recovery.erl b/tests/proxy_overload_recovery.erl
index 111320422..03248d008 100644
--- a/tests/proxy_overload_recovery.erl
+++ b/tests/proxy_overload_recovery.erl
@@ -208,13 +208,14 @@ prepare(ThresholdSeed) ->
     {ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
     sys:resume(VPid0),
     ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
-    false = is_process_alive(VPid0),
     %% Reset the proxy pid to make sure it resets state and picks up the new
     %% environment variables
     ok = supervisor:terminate_child(riak_core_vnode_proxy_sup,
                                     {riak_kv_vnode, Id}),
     RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
     undefined = whereis(RegName),
+    VPid1 = wait_for_vnode_change(VPid0, Index),
+
     {ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup,
                                           {riak_kv_vnode, Id}),
     %% Find the proxy pid and check it's alive and matches the supervisor
@@ -225,6 +226,7 @@ prepare(ThresholdSeed) ->
     %% and return the Pid so we know we have the same Pid.
     {ok, VPid} = riak_core_vnode_proxy:command_return_vnode(
                    {riak_kv_vnode,Index,node()}, timeout),
+    ?assertEqual(VPid, VPid1),
     true = is_process_alive(PPid),
     true = is_process_alive(VPid),
@@ -264,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
 resume(#rt{ppid = PPid, vpid = VPid}) ->
     sys:resume(VPid),
     %% Use the sys:get_status call to force a synchronous call
-    %% against the vnode proxy to ensure all messages sent by
+    %% against the vnode & the proxy to ensure all messages sent by
     %% this process have been serviced and there are no pending
     %% 'ping's in the vnode before we continue.
     %% Then drain the vnode to make sure any pending pongs have
-    %% been sent.
-    ok = drain(VPid),
+    %% been sent, and ensure the proxy has processed them.
+    _ = sys:get_status(PPid),
     _ = sys:get_status(VPid),
-    _ = sys:get_status(PPid).
+    ok = drain([VPid, PPid]).
 
 resume_next(S, _V, _A) ->
     S#tstate{vnode_running = true, proxy_msgs = 0, direct_msgs = 0}.
@@ -324,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
 overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
     case Running of
         true ->
-            ok = drain(PPid), % make sure all proxy msgs processed/dropped
-            ok = drain(VPid); % make sure any pending ping/pongs are processed
+            ok = drain([PPid, VPid]);
         _ ->
             ok
     end,
-    {riak_core_vnode_proxy:overloaded(PPid),
-     msgq_len(VPid), % just for debug so we can review in log output
-     sys:get_status(PPid)}. % ditto
+    {messages, PMsgs} = process_info(PPid, messages),
+    {messages, VMsgs} = process_info(VPid, messages),
+    Overloaded = riak_core_vnode_proxy:overloaded(PPid),
+    {Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.
 
 overloaded_post(#tstate{threshold = undefined}, _A,
-                {R, _VnodeQ, _ProxyStatus}) ->
+                {R, _Messages, _ProxyStatus}) ->
     %% If there are no thresholds there should never be an overload
     eq(R, false);
 overloaded_post(#tstate{vnode_running = true}, _A,
-                {R, _VnodeQ = 0, _ProxyStatus}) ->
+                {R, _Messages, _ProxyStatus}) ->
     %% If the vnode is running, we have cleared queues so
     %% should not be in overload.
     eq(R, false);
 overloaded_post(#tstate{vnode_running = false, proxy_msgs = ProxyMsgs,
                         threshold = Threshold}, _A,
-                {ResultOverload, _VnodeQ, _ProxyStatus}) ->
+                {ResultOverload, _Messages, _ProxyStatus}) ->
     %% Either
     %% mailbox is completely an estimate based on proxy msgs
     %% or mailbox is a check + estimate since
@@ -392,16 +394,33 @@ prep_env(Var, Val) ->
 %% Wait until all messages are drained by the Pid. No guarantees
 %% about future messages being sent, or that responses for the
 %% last message consumed have been transmitted.
-%%
-drain(Pid) ->
-    case erlang:process_info(Pid, message_queue_len) of
-        {message_queue_len, 0} ->
+%% NOTE: The "drain 3 times in a row" was determined empirically,
+%% and may not be sufficient (2 was not). Given time constraints,
+%% living with it for now. If this fails, we should really add some
+%% tracing code around the send of messages to Vnode and Proxy to
+%% determine where extra messages are coming from rather than just
+%% make this "try 4 times".
+%%
+drain(Pid) when is_pid(Pid) ->
+    drain([Pid], {-1, -1});
+
+drain(Pids) when is_list(Pids) ->
+    drain(Pids, {-1, -1}).
+drain(Pids, {PrevPrev, Prev}) ->
+    _ = [sys:suspend(Pid) || Pid <- Pids],
+    Len = lists:foldl(fun(Pid, Acc0) ->
+                              {message_queue_len, QLen} = erlang:process_info(Pid, message_queue_len),
+                              Acc0 + QLen
+                      end, 0, Pids),
+    _ = [sys:resume(Pid) || Pid <- Pids],
+    case {PrevPrev, Prev, Len} of
+        {0, 0, 0} ->
             ok;
-        {message_queue_len, L} when L > 0 ->
-            timer:sleep(1), % give it a millisecond to drain
-            drain(Pid);
-        ER ->
-            ER
+        _ ->
+            %% Attempt to ensure something else is scheduled before we try to drain again
+            erlang:yield(),
+            timer:sleep(1),
+            drain(Pids, {Prev, Len})
     end.
 
 %% Return the length of the message queue (or crash if proc dead)
@@ -457,3 +476,14 @@ confirm() ->
     pass.
 
 -endif.
+
+
+wait_for_vnode_change(VPid0, Index) ->
+    {ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
+    case VPid1 of
+        VPid0 ->
+            timer:sleep(1),
+            wait_for_vnode_change(VPid0, Index);
+        _ ->
+            VPid1
+    end.
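[Reviewer note] The removed `false = is_process_alive(VPid0)` check was
apparently unreliable here; the new wait_for_vnode_change/2 instead polls the
vnode manager until it hands out a pid different from the old one. The shape
is a generic change-polling loop; a sketch with a hypothetical
poll_until_changed/2 name:

    %% Poll ReadFun until it stops returning Old, sleeping ~1ms between tries.
    poll_until_changed(ReadFun, Old) ->
        case ReadFun() of
            Old ->
                timer:sleep(1),
                poll_until_changed(ReadFun, Old);
            New ->
                New
        end.

    %% Usage mirroring the test:
    %% NewVPid = poll_until_changed(
    %%             fun() ->
    %%                     {ok, P} = riak_core_vnode_manager:get_vnode_pid(
    %%                                 Index, riak_kv_vnode),
    %%                     P
    %%             end, VPid0).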
diff --git a/tests/repl_util.erl b/tests/repl_util.erl
index a7be23f35..d8984374c 100644
--- a/tests/repl_util.erl
+++ b/tests/repl_util.erl
@@ -195,14 +195,17 @@ wait_until_fullsync_stopped(SourceLeader) ->
     end).
 
 wait_for_reads(Node, Start, End, Bucket, R) ->
-    rt:wait_until(Node,
+    ok = rt:wait_until(Node,
                   fun(_) ->
                           Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
                           Reads == []
                   end),
-    Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
-    lager:info("Reads: ~p", [Reads]),
-    length(Reads).
+    %% rt:systest_read returns a list of the errors encountered while
+    %% performing the requested reads, and the wait_until above asserts
+    %% that this list is empty. If we reach this point, the reads
+    %% completed without errors, so we return an error count of 0 rather
+    %% than issuing another systest_read call.
+    0.
 
 get_fs_coord_status_item(Node, SinkName, ItemName) ->
     Status = rpc:call(Node, riak_repl_console, status, [quiet]),
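[Reviewer note] Returning the constant 0 keeps the function's contract (an
error count) without the redundant re-read, so call sites that assert on the
return value are unaffected. A hypothetical call site; the node, key range,
bucket, and R-value below are all made up:

    -include_lib("eunit/include/eunit.hrl").

    verify_fullsync_replicated(SinkNode) ->
        ?assertEqual(0, repl_util:wait_for_reads(SinkNode, 1, 1000,
                                                 <<"repl-bucket">>, 3)).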
diff --git a/tests/verify_2i_returnterms.erl b/tests/verify_2i_returnterms.erl
index 7a9f50ee4..f2155ce1e 100644
--- a/tests/verify_2i_returnterms.erl
+++ b/tests/verify_2i_returnterms.erl
@@ -25,6 +25,8 @@
                          stream_pb/3, http_query/3]).
 -define(BUCKET, <<"2ibucket">>).
 -define(FOO, <<"foo">>).
+-define(BAZ, <<"baz">>).
+-define(BAT, <<"bat">>).
 -define(Q_OPTS, [{return_terms, true}]).
 
 confirm() ->
@@ -38,15 +40,20 @@ confirm() ->
     [put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
     [put_an_object(PBPid, int_to_key(N), N, ?FOO) || N <- lists:seq(101, 200)],
+    put_an_object(PBPid, int_to_key(201), 201, ?BAZ),
+    put_an_object(PBPid, int_to_key(202), 202, ?BAT),
 
     %% Bucket, key, and index_eq queries should ignore `return_terms'
-    ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
+    ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 202)]),
     assertEqual(RiakHttp, PBPid, ExpectedKeys, {<<"$key">>, int_to_key(0), int_to_key(999)}, ?Q_OPTS, keys),
     assertEqual(RiakHttp, PBPid, ExpectedKeys, { <<"$bucket">>, ?BUCKET}, ?Q_OPTS, keys),
 
     ExpectedFooKeys = lists:sort([int_to_key(N) || N <- lists:seq(101, 200)]),
     assertEqual(RiakHttp, PBPid, ExpectedFooKeys, {<<"field1_bin">>, ?FOO}, ?Q_OPTS, keys),
+    assertEqual(RiakHttp, PBPid, [int_to_key(201)], {<<"field1_bin">>, ?BAZ}, ?Q_OPTS, keys),
+    assertEqual(RiakHttp, PBPid, [int_to_key(201)], {<<"field2_int">>, 201}, ?Q_OPTS, keys),
+
     ExpectedRangeResults = lists:sort([{list_to_binary(integer_to_list(N)), int_to_key(N)} || N <- lists:seq(1, 100)]),
     assertEqual(RiakHttp, PBPid, ExpectedRangeResults, {<<"field2_int">>, "1", "100"}, ?Q_OPTS, results),
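[Reviewer note] For context on what `return_terms' changes: eq and bucket/key
queries still return bare keys (which is what the new ?BAZ/?BAT assertions
check), while range queries switch to {IndexValue, Key} pairs. A sketch over
the protocol buffers client, assuming a local node on 127.0.0.1:8087 with the
test's objects already written:

    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    {ok, {index_results_v1, _Keys, Terms, _Cont}} =
        riakc_pb_socket:get_index_range(Pid, <<"2ibucket">>,
                                        {integer_index, "field2_int"},
                                        1, 100, [{return_terms, true}]),
    %% Terms is a list of {IndexValue, Key} pairs, e.g. {<<"42">>, <<"obj42">>}.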
diff --git a/tests/yz_extractors.erl b/tests/yz_extractors.erl
index 84f7d8b5e..cf596c43c 100644
--- a/tests/yz_extractors.erl
+++ b/tests/yz_extractors.erl
@@ -28,10 +28,12 @@
 -include_lib("riakc/include/riakc.hrl").
 
 -define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
+-define(TYPE1, <<"extractors_in_paradise">>).
+-define(TYPE2, <<"extractors_in_paradiso">>).
 -define(INDEX1, <<"test_idx1">>).
--define(BUCKET1, <<"test_bkt1">>).
+-define(BUCKET1, {?TYPE1, <<"test_bkt1">>}).
 -define(INDEX2, <<"test_idx2">>).
--define(BUCKET2, <<"test_bkt2">>).
+-define(BUCKET2, {?TYPE2, <<"test_bkt2">>}).
 -define(SCHEMANAME, <<"test">>).
 -define(TEST_SCHEMA, <<"
@@ -205,6 +207,8 @@ confirm() ->
     %% Upgrade
     yokozuna_rt:rolling_upgrade(Cluster, current),
 
+    [rt:wait_until_ready(ANode) || ANode <- Cluster],
+
     [rt:assert_capability(ANode, ?YZ_CAP, true) || ANode <- Cluster],
     [rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) ||
        ANode <- Cluster],
@@ -278,9 +282,9 @@ get_map(Node) ->
 verify_extractor(Node, PacketData, Mod) ->
     rpc:call(Node, yz_extractor, run, [PacketData, Mod]).
 
-bucket_url({Host,Port}, BName, Key) ->
-    ?FMT("http://~s:~B/buckets/~s/keys/~s",
-         [Host, Port, BName, Key]).
+bucket_url({Host,Port}, {BType, BName}, Key) ->
+    ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s",
+         [Host, Port, BType, BName, Key]).
 
 test_extractor_works(Cluster, Packet) ->
     [rt_intercept:add(ANode, {yz_noop_extractor,
@@ -304,7 +308,7 @@ test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) ->
     {Host, Port} = rt:select_random(yokozuna_rt:host_entries(
                                       rt:connection_info(
                                         Cluster))),
-    URL = bucket_url({Host, Port}, mochiweb_util:quote_plus(Bucket),
+    URL = bucket_url({Host, Port}, Bucket,
                      mochiweb_util:quote_plus(Key)),
 
     CT = ?EXTRACTOR_CT,
@@ -326,8 +330,13 @@ test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) ->
     yokozuna_rt:override_schema(APid, Cluster, Index, ?SCHEMANAME,
                                 ?TEST_SCHEMA_UPGRADE),
 
+    {ok, "200", RHeaders, _} = ibrowse:send_req(URL, [{"Content-Type", CT}], get,
+                                                [], []),
+    VC = proplists:get_value("X-Riak-Vclock", RHeaders),
+
     {ok, "204", _, _} = ibrowse:send_req(
-                          URL, [{"Content-Type", CT}], put, Packet),
+                          URL, [{"Content-Type", CT}, {"X-Riak-Vclock", VC}],
+                          put, Packet),
 
     yokozuna_rt:commit(Cluster, Index),
     yokozuna_rt:search_expect({Host, Port}, Index, <<"method">>,
diff --git a/tests/yz_handoff.erl b/tests/yz_handoff.erl
index ab91d3bdb..4c5b1af6d 100644
--- a/tests/yz_handoff.erl
+++ b/tests/yz_handoff.erl
@@ -99,8 +99,7 @@ confirm() ->
                        join_node = Node1,
                        admin_node = Node2}],
 
-    %% Run Shell Script to count/test # of replicas and leave/join
-    %% nodes from the cluster
+    %% Run a set of leave/join trials and check data/replica counts across the cluster
     [[begin
          check_data(Nodes, KeyCount, BucketURL, SearchURL, State),
          check_counts(Pid, KeyCount, BucketURL)
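[Reviewer note] The GET-before-PUT added in yz_extractors.erl is the standard
HTTP read-modify-write dance: echoing X-Riak-Vclock back makes the second
write a causal update instead of a concurrent one (bucket types default to
allow_mult=true, so a blind PUT would leave siblings behind). The pattern in
isolation, with a placeholder URL and body:

    %% Fetch the current object to learn its vector clock, then update it.
    {ok, "200", Headers, _Body} = ibrowse:send_req(Url, [], get, [], []),
    VClock = proplists:get_value("X-Riak-Vclock", Headers),
    {ok, "204", _, _} = ibrowse:send_req(Url,
                                         [{"Content-Type", "text/plain"},
                                          {"X-Riak-Vclock", VClock}],
                                         put, <<"new value">>).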