diff --git a/README.md b/README.md index 28a3ffdc7..de9680283 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ contents of `$HOME/rt/riak` might look something like this: ``` $ ls $HOME/rt/riak -current riak-1.2.1 riak-1.3.2 riak-1.4.10 +current riak-1.4.12 riak-2.0.2 riak-2.0.4 riak-2.0.6 ``` Inside each of these directories is a `dev` folder, typically @@ -73,7 +73,7 @@ The first one that we want to look at is `rtdev-build-releases.sh`. If left unchanged, this script is going to do the following: 1. Download the source for the past three major Riak versions (e.g. - 1.3.2, 1.4.10 and 2.0.0) + 1.4.12, 2.0.6 and 2.1.2) 1. Build the proper version of Erlang that release was built with, using kerl (which it will also download) 1. Build those releases of Riak. @@ -113,14 +113,6 @@ same directory that you just built all of your releases into. By default this script initializes the repository into `$HOME/rt/riak` but you can override [`$RT_DEST_DIR`](https://github.com/basho/riak_test/blob/master/bin/rtdev-setup-releases.sh#L11). -**Note**: There is a bug in 1.3.x `leveldb` which does not properly resolve -the location of `pthread.h` when building on Macintosh OS X 10.9, aka -Mavericks. This has been fixed in subsequent releases, but for now a fix -is to manually add `#include ` to the top of -`deps/eleveldb/c_src/leveldb/include/leveldb/env.h`. Also the version -of `meck` needs to be updated, too. This is handled autmatically by -the script. - ### rtdev-current.sh `rtdev-current.sh` is where it gets interesting. You need to run that @@ -188,8 +180,10 @@ to tell riak_test about them. 
The method of choice is to create a {rt_project, "riak"}, {rtdev_path, [{root, "/home/you/rt/riak"}, {current, "/home/you/rt/riak/current"}, - {previous, "/home/you/rt/riak/riak-1.4.10"}, - {legacy, "/home/you/rt/riak/riak-1.3.2"} + {previous, "/home/you/rt/riak/riak-2.0.6"}, + {legacy, "/home/you/rt/riak/riak-1.4.12"}, + {'2.0.2', "/home/you/rt/riak/riak-2.0.2"}, + {'2.0.4', "/home/you/rt/riak/riak-2.0.4"} ]} ]}. ``` @@ -234,7 +228,9 @@ Tests that do not include coverage annotations will, if cover is enabled, honor #### Web hooks When reporting is enabled, each test result is posted to [Giddy Up](http://giddyup.basho.com). You can specify any number of webhooks that will also receive a POST request with JSON formatted test information, plus the URL -of the Giddy Up resource page. +of the Giddy Up resource page. + +N.B.: This configuration setting is optional, and *NOT* required any more for GiddyUp. ```erlang {webhooks, [ diff --git a/bin/rtdev-all.sh b/bin/rtdev-all.sh index 32d9595b7..174d086c4 100755 --- a/bin/rtdev-all.sh +++ b/bin/rtdev-all.sh @@ -4,7 +4,7 @@ ORIGDIR=`pwd` pushd `dirname $0` > /dev/null SCRIPT_DIR=`pwd` popd > /dev/null -: ${CURRENT_OTP:=$HOME/erlang-R16B02} +: ${CURRENT_OTP:="$HOME/erlang-R16B02"} : ${RT_CURRENT_TAG:=""} diff --git a/bin/rtdev-build-releases.sh b/bin/rtdev-build-releases.sh index e170a31d1..a4e4b175e 100755 --- a/bin/rtdev-build-releases.sh +++ b/bin/rtdev-build-releases.sh @@ -19,11 +19,19 @@ : ${R15B01:=$HOME/erlang-R15B01} : ${R16B02:=$HOME/erlang-R16B02} +# These are the default tags to use when building basho OTP releases. +# Export different tags to get a different build. N.B. You will need to +# remove the builds from kerl (e.g., kerl delete build $BUILDNAME) and +# possibly remove the directories above. 
+: ${R16_TAG:="OTP_R16B02_basho9"} +: ${R15_TAG:="basho_OTP_R15B01p"} + # By default the Open Source version of Riak will be used, but for internal # testing you can override this variable to use `riak_ee` instead : ${RT_USE_EE:=""} GITURL_RIAK="git://github.com/basho/riak" GITURL_RIAK_EE="git@github.com:basho/riak_ee" +GITDIR="riak-src" checkbuild() @@ -44,7 +52,7 @@ checkbuild() echo "You need 'curl' to be able to run this script, exiting" exit 1 fi - curl -O https://raw.github.com/spawngrid/kerl/master/kerl > /dev/null 2>&1; chmod a+x kerl + curl -O https://raw.githubusercontent.com/spawngrid/kerl/master/kerl > /dev/null 2>&1; chmod a+x kerl fi fi fi @@ -55,8 +63,32 @@ kerl() RELEASE=$1 BUILDNAME=$2 + export CFLAGS="-g -O2" + export LDFLAGS="-g" + if [ -n "`uname -r | grep el6`" ]; then + export CFLAGS="-g -DOPENSSL_NO_EC=1" + fi + BUILDFLAGS="--disable-hipe --enable-smp-support --without-odbc" + if [ $(uname -s) = "Darwin" ]; then + export CFLAGS="-g -O0" + BUILDFLAGS="$BUILDFLAGS --enable-darwin-64bit --with-dynamic-trace=dtrace" + else + BUILDFLAGS="$BUILDFLAGS --enable-m64-build" + fi + + KERL_ENV="KERL_CONFIGURE_OPTIONS=${BUILDFLAGS}" + MAKE="make -j10" + echo " - Building Erlang $RELEASE (this could take a while)" - ./kerl build $RELEASE $BUILDNAME > /dev/null 2>&1 + # Use the Basho-patched version of Erlang + if [ "$RELEASE" == "R15B01" ]; then + BUILD_CMD="./kerl build git git://github.com/basho/otp.git $R15_TAG $BUILDNAME" + elif [ "$RELEASE" == "R16B02" ]; then + BUILD_CMD="./kerl build git git://github.com/basho/otp.git $R16_TAG $BUILDNAME" + else + BUILD_CMD="./kerl build $RELEASE $BUILDNAME" + fi + env "$KERL_ENV" "MAKE=$MAKE" $BUILD_CMD RES=$? if [ "$RES" -ne 0 ]; then echo "[ERROR] Kerl build $BUILDNAME failed" @@ -64,7 +96,7 @@ kerl() fi echo " - Installing $RELEASE into $HOME/$BUILDNAME" - ./kerl install $BUILDNAME $HOME/$BUILDNAME > /dev/null 2>&1 + ./kerl install $BUILDNAME "$HOME/$BUILDNAME" > /dev/null 2>&1 RES=$? 
if [ "$RES" -ne 0 ]; then echo "[ERROR] Kerl install $BUILDNAME failed" @@ -77,6 +109,12 @@ build() SRCDIR=$1 ERLROOT=$2 TAG="$3" + if [ -z $4 ]; then + LOCKED_DEPS=true + else + LOCKED_DEPS=$4 + fi + if [ -z "$RT_USE_EE" ]; then GITURL=$GITURL_RIAK GITTAG=riak-$TAG @@ -85,6 +123,15 @@ build() GITTAG=riak_ee-$TAG fi + echo "Getting sources from github" + if [ ! -d $GITDIR ]; then + git clone $GITURL $GITDIR + else + cd $GITDIR + git pull origin develop + cd .. + fi + echo "Building $SRCDIR:" checkbuild $ERLROOT @@ -94,83 +141,53 @@ build() kerl $RELEASE $BUILDNAME fi - GITRES=1 - echo " - Cloning $GITURL" - rm -rf $SRCDIR - git clone $GITURL $SRCDIR - GITRES=$? - if [ $GITRES -eq 0 -a -n "$TAG" ]; then - cd $SRCDIR - git checkout $GITTAG + if [ ! -d $SRCDIR ] + then + GITRES=1 + echo " - Cloning $GITURL" + git clone $GITDIR $SRCDIR GITRES=$? - cd .. + if [ $GITRES -eq 0 -a -n "$TAG" ]; then + cd $SRCDIR + git checkout $GITTAG + GITRES=$? + cd .. + fi fi - RUN="env PATH=$ERLROOT/bin:$ERLROOT/lib/erlang/bin:$PATH \ + if [ ! -f "$SRCDIR/built" ] + then + + RUN="env PATH=$ERLROOT/bin:$ERLROOT/lib/erlang/bin:$PATH \ C_INCLUDE_PATH=$ERLROOT/usr/include \ LD_LIBRARY_PATH=$ERLROOT/usr/lib" - fix_riak_1_3 $SRCDIR $TAG "$RUN" - - echo " - Building stagedevrel in $SRCDIR (this could take a while)" - cd $SRCDIR - $RUN make all stagedevrel - RES=$? - if [ "$RES" -ne 0 ]; then - echo "[ERROR] make stagedevrel failed" - exit 1 - fi - cd .. - echo " - $SRCDIR built." -} + echo " - Building devrel in $SRCDIR (this could take a while)" + cd $SRCDIR -# Riak 1.3 has a few artifacts which need to be updated in order to build -# properly -fix_riak_1_3() -{ - SRCDIR=$1 - TAG="$2" - RUN="$3" + if $LOCKED_DEPS + then + CMD="make locked-deps devrel" + else + CMD="make all devrel" + fi - if [ "`echo $TAG | cut -d . -f1-2`" != "1.3" ]; then - return 0 + $RUN $CMD + RES=$? + if [ "$RES" -ne 0 ]; then + echo "[ERROR] make devrel failed" + exit 1 + fi + touch built + cd .. 
+ echo " - $SRCDIR built." + else + echo " - already built" fi - - echo "- Patching Riak 1.3.x" - cd $SRCDIR - cat < - #include - #include -+#include - #include "leveldb/perf_count.h" - #include "leveldb/status.h" -EOF - cd ../../../../../../.. } -build "riak-1.4.10" $R15B01 http://s3.amazonaws.com/downloads.basho.com/riak/1.4/1.4.10/riak-1.4.10.tar.gz -echo -if [ -z "$RT_USE_EE" ]; then - build "riak-1.3.2" $R15B01 1.3.2 -else - build "riak-1.3.4" $R15B01 1.3.4 -fi +build "riak-1.4.12" $R15B01 1.4.12 false +build "riak-2.0.2" $R16B02 2.0.2 +build "riak-2.0.4" $R16B02 2.0.4 +build "riak-2.0.6" $R16B02 2.0.6 echo diff --git a/bin/rtdev-setup-releases.sh b/bin/rtdev-setup-releases.sh index cb2756ee7..cbc1f529b 100755 --- a/bin/rtdev-setup-releases.sh +++ b/bin/rtdev-setup-releases.sh @@ -24,6 +24,8 @@ then echo " - Initializing $RT_DEST_DIR/$vsn" mkdir -p "$RT_DEST_DIR/$vsn" cp -p -P -R "$rel" "$RT_DEST_DIR/$vsn" + # Route out the product and version from Git + (cd "$rel"; VERSION="$(git describe --tags)"; echo -n $VERSION > $RT_DEST_DIR/$vsn/VERSION) done else # This is useful when only testing with 'current' diff --git a/intercepts/hashtree_intercepts.erl b/intercepts/hashtree_intercepts.erl index a9bfb32ce..9439a0319 100644 --- a/intercepts/hashtree_intercepts.erl +++ b/intercepts/hashtree_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. 
See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(hashtree_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/init_intercepts.erl b/intercepts/init_intercepts.erl index ea423e080..b7c61bb47 100644 --- a/intercepts/init_intercepts.erl +++ b/intercepts/init_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(init_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/intercept.erl b/intercepts/intercept.erl index b907ed072..7d8bf119c 100644 --- a/intercepts/intercept.erl +++ b/intercepts/intercept.erl @@ -1,8 +1,28 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(intercept). %% Export explicit API but also send compile directive to export all %% because some of these private functions are useful in their own %% right. --export([add/3, add/4]). +-export([add/3, add/4, clean/1]). -compile(export_all). -type abstract_code() :: term(). @@ -34,7 +54,7 @@ %% functions. %% %% E.g. `[{{update_perform,2}, sleep_update_perform}]' --spec add(module(), module(), mapping()) -> ok. +-spec add(module(), module(), mapping(), string()) -> ok. add(Target, Intercept, Mapping, OutDir) -> Original = ?ORIGINAL(Target), TargetAC = get_abstract_code(Target), @@ -46,9 +66,22 @@ add(Target, Intercept, Mapping, OutDir) -> ok = compile_and_load(Original, OrigAC, OutDir), ok = compile_and_load(Target, ProxyAC, OutDir). +-spec add(module(), module(), mapping()) -> ok. add(Target, Intercept, Mapping) -> add(Target, Intercept, Mapping, undefined). +%% @doc Cleanup proxy and backuped original module +-spec clean(module()) -> ok|{error, term()}. +clean(Target) -> + _ = code:purge(Target), + _ = code:purge(?ORIGINAL(Target)), + case code:load_file(Target) of + {module, Target} -> + ok; + {error, Reason} -> + {error, Reason} + end. + %% @private %% %% @doc Compile the abstract code `AC' and load it into the code server. 
diff --git a/intercepts/intercept.hrl b/intercepts/intercept.hrl index 021080504..d292e0ba1 100644 --- a/intercepts/intercept.hrl +++ b/intercepts/intercept.hrl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -define(I_TAG(S), "INTERCEPT: " ++ S). -define(I_INFO(Msg), error_logger:info_msg(?I_TAG(Msg))). -define(I_INFO(Msg, Args), error_logger:info_msg(?I_TAG(Msg), Args)). diff --git a/intercepts/riak_core_broadcast_intercepts.erl b/intercepts/riak_core_broadcast_intercepts.erl index 4e1419e99..9a391bf58 100644 --- a/intercepts/riak_core_broadcast_intercepts.erl +++ b/intercepts/riak_core_broadcast_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_core_broadcast_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_core_connection_intercepts.erl b/intercepts/riak_core_connection_intercepts.erl index 196adbe07..f47ac52ec 100644 --- a/intercepts/riak_core_connection_intercepts.erl +++ b/intercepts/riak_core_connection_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_core_connection_intercepts). -compile(export_all). -include("intercept.hrl"). 
diff --git a/intercepts/riak_core_handoff_sender_intercepts.erl b/intercepts/riak_core_handoff_sender_intercepts.erl index f5fb62ad5..f8e0f2d84 100644 --- a/intercepts/riak_core_handoff_sender_intercepts.erl +++ b/intercepts/riak_core_handoff_sender_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_core_handoff_sender_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_core_ring_manager_intercepts.erl b/intercepts/riak_core_ring_manager_intercepts.erl index dfd4aae37..7f8416d43 100644 --- a/intercepts/riak_core_ring_manager_intercepts.erl +++ b/intercepts/riak_core_ring_manager_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_core_ring_manager_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_core_vnode_manager_intercepts.erl b/intercepts/riak_core_vnode_manager_intercepts.erl new file mode 100644 index 000000000..e96fdb10a --- /dev/null +++ b/intercepts/riak_core_vnode_manager_intercepts.erl @@ -0,0 +1,48 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + +-module(riak_core_vnode_manager_intercepts). +-include("intercept.hrl"). +%% API +-compile(export_all). +-define(M, riak_core_vnode_manager_orig). 
+ +return_dead_process_pid_from_get_vnode_pid(Index, VNodeMod = riak_pipe_vnode) -> + %% ?I_INFO("Intercepting riak_core_vnode_master:get_vnode_pid"), + random:seed(os:timestamp()), + case random:uniform(100) of + 7 -> + %% Simulate what happens when a VNode completes handoff between get_vnode_pid + %% and the fold attempting to start - other attempts to intercept and slow + %% certain parts of Riak to invoke the particular race condition were unsuccessful + ?I_INFO("Replaced VNode with spawned function in get_vnode_pid"), + VNodePid = spawn(fun() -> + ok + end), + MonRef = erlang:monitor(process, VNodePid), + receive + {'DOWN', MonRef, process, VNodePid, _Reason} -> ok + end, + {ok, VNodePid}; + _ -> + ?M:get_vnode_pid_orig(Index, VNodeMod) + end; +return_dead_process_pid_from_get_vnode_pid(Index, VNodeMod) -> + ?M:get_vnode_pid_orig(Index, VNodeMod). \ No newline at end of file diff --git a/intercepts/riak_core_vnode_master_intercepts.erl b/intercepts/riak_core_vnode_master_intercepts.erl index b581ca769..45bd3253d 100644 --- a/intercepts/riak_core_vnode_master_intercepts.erl +++ b/intercepts/riak_core_vnode_master_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. 
+%% +%%------------------------------------------------------------------- + -module(riak_core_vnode_master_intercepts). -compile(export_all). -include("intercept.hrl"). @@ -32,4 +52,27 @@ stop_vnode_after_bloom_fold_request_succeeds(IndexNode, Req, Sender, VMaster) -> ?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster) end; false -> ?M:command_return_vnode_orig(IndexNode, Req, Sender, VMaster) - end. \ No newline at end of file + end. + +return_dead_process_pid_from_get_vnode_pid(Node, Index, VNodeMod = riak_pipe_vnode) -> + %% ?I_INFO("Intercepting riak_core_vnode_master:get_vnode_pid"), + random:seed(os:timestamp()), + case random:uniform(100) of + 7 -> + %% Simulate what happens when a VNode completes handoff between get_vnode_pid + %% and the fold attempting to start - other attempts to intercept and slow + %% certain parts of Riak to invoke the particular race condition were unsuccessful + ?I_INFO("Replaced VNode with spawned function in get_vnode_pid"), + VNodePid = spawn(fun() -> + ok + end), + MonRef = erlang:monitor(VNodePid), + receive + {'DOWN', MonRef, process, VNodePid, _Reason} -> ok + end, + {ok, VNodePid}; + _ -> + ?M:get_vnode_pid_orig(Node, Index, VNodeMod) + end; +return_dead_process_pid_from_get_vnode_pid(Node, Index, VNodeMod) -> + ?M:get_vnode_pid_orig(Node, Index, VNodeMod). \ No newline at end of file diff --git a/intercepts/riak_core_vnode_proxy_sup_intercepts.erl b/intercepts/riak_core_vnode_proxy_sup_intercepts.erl index 43f44ad13..70ab8e450 100644 --- a/intercepts/riak_core_vnode_proxy_sup_intercepts.erl +++ b/intercepts/riak_core_vnode_proxy_sup_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_core_vnode_proxy_sup_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_ensemble_peer_intercepts.erl b/intercepts/riak_ensemble_peer_intercepts.erl index b434345aa..452e51d51 100644 --- a/intercepts/riak_ensemble_peer_intercepts.erl +++ b/intercepts/riak_ensemble_peer_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_ensemble_peer_intercepts). -compile(export_all). -include("intercept.hrl"). 
diff --git a/intercepts/riak_kv_bitcask_backend_intercepts.erl b/intercepts/riak_kv_bitcask_backend_intercepts.erl index caec3ebf4..dc42cbeb3 100644 --- a/intercepts/riak_kv_bitcask_backend_intercepts.erl +++ b/intercepts/riak_kv_bitcask_backend_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_kv_bitcask_backend_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_kv_eleveldb_backend_intercepts.erl b/intercepts/riak_kv_eleveldb_backend_intercepts.erl index a6fe80f8a..e3feab244 100644 --- a/intercepts/riak_kv_eleveldb_backend_intercepts.erl +++ b/intercepts/riak_kv_eleveldb_backend_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_kv_eleveldb_backend_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_kv_get_fsm_intercepts.erl b/intercepts/riak_kv_get_fsm_intercepts.erl index 87a2393f2..e4119bd73 100644 --- a/intercepts/riak_kv_get_fsm_intercepts.erl +++ b/intercepts/riak_kv_get_fsm_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_kv_get_fsm_intercepts). -compile(export_all). -include("intercept.hrl"). 
diff --git a/intercepts/riak_kv_index_hashtree_intercepts.erl b/intercepts/riak_kv_index_hashtree_intercepts.erl index d608fe69f..14a13ba86 100644 --- a/intercepts/riak_kv_index_hashtree_intercepts.erl +++ b/intercepts/riak_kv_index_hashtree_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_kv_index_hashtree_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_kv_put_fsm_intercepts.erl b/intercepts/riak_kv_put_fsm_intercepts.erl index bb276e06b..441399517 100644 --- a/intercepts/riak_kv_put_fsm_intercepts.erl +++ b/intercepts/riak_kv_put_fsm_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_kv_put_fsm_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_kv_vnode_intercepts.erl b/intercepts/riak_kv_vnode_intercepts.erl index 780cefd8a..c6b3fba70 100644 --- a/intercepts/riak_kv_vnode_intercepts.erl +++ b/intercepts/riak_kv_vnode_intercepts.erl @@ -1,7 +1,39 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_kv_vnode_intercepts). -compile(export_all). -include("intercept.hrl"). +%% shamelessly copied from riak_kv_vnode.hrl +-record(riak_kv_w1c_put_req_v1, { + bkey :: {binary(),binary()}, + encoded_val :: binary(), + type :: primary | fallback + % start_time :: non_neg_integer(), Jon to add? +}). 
+-record(riak_kv_w1c_put_reply_v1, { + reply :: ok | {error, term()}, + type :: primary | fallback +}). + -define(M, riak_kv_vnode_orig). %% @doc Simulate dropped puts by truncating the preflist for every kv @@ -30,6 +62,29 @@ slow_handle_coverage(Req, Filter, Sender, State) -> timer:sleep(Rand), ?M:handle_coverage_orig(Req, Filter, Sender, State). +%% @doc Count how many times we call handle_handoff_command +count_handoff_w1c_puts(#riak_kv_w1c_put_req_v1{}=Req, Sender, State) -> + Val = ?M:handle_handoff_command_orig(Req, Sender, State), + ets:update_counter(intercepts_tab, w1c_put_counter, 1), + Val; +count_handoff_w1c_puts(Req, Sender, State) -> + ?M:handle_handoff_command_orig(Req, Sender, State). + +%% @doc Count how many times we handle syncchronous and asynchronous replies +%% in handle_command when using w1c buckets +count_w1c_handle_command(#riak_kv_w1c_put_req_v1{}=Req, Sender, State) -> + case ?M:handle_command_orig(Req, Sender, State) of + {noreply, NewState} -> + ets:update_counter(intercepts_tab, w1c_async_replies, 1), + {noreply, NewState}; + {reply, #riak_kv_w1c_put_reply_v1{reply=ok, type=Type}, NewState} -> + ets:update_counter(intercepts_tab, w1c_sync_replies, 1), + {reply, #riak_kv_w1c_put_reply_v1{reply=ok, type=Type}, NewState}; + Any -> Any + end; +count_w1c_handle_command(Req, Sender, State) -> + ?M:handle_command_orig(Req, Sender, State). + %% @doc Simulate dropped gets/network partitions byresponding with %% noreply during get requests. drop_do_get(Sender, BKey, ReqId, State) -> @@ -104,3 +159,9 @@ corrupting_handle_handoff_data(BinObj0, State) -> corrupt_binary(O) -> crypto:rand_bytes(byte_size(O)). + +put_as_readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) -> + ?M:put_orig(Preflist, BKey, Obj, ReqId, StartTime, [rr | Options]). + +coord_put_as_readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) -> + ?M:coord_put_orig(Preflist, BKey, Obj, ReqId, StartTime, [rr | Options]). 
diff --git a/intercepts/riak_kv_worker_intercepts.erl b/intercepts/riak_kv_worker_intercepts.erl new file mode 100644 index 000000000..745a8153d --- /dev/null +++ b/intercepts/riak_kv_worker_intercepts.erl @@ -0,0 +1,46 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + +-module(riak_kv_worker_intercepts). +-compile(export_all). +-include("intercept.hrl"). +-define(M, riak_kv_worker_orig). + +% +% Okay, this is an interesting intercept. The intention here is to insert some +% code into the point where the vnode has completed its fold, but before it invokes the finish +% command, which will inform the riak_core_vnode that handoff has completed for this node. +% This is a magic time, when handoff is running, and the fold has completed. +% In this case, we send a message to a process that is running in riak test +% (see verify_handoff_write_once, for an example), which will do a write during this +% magic time. We wait for said process to return us an ok. +% +% The objective is to force the vnode to trigger a handle_handoff_command, +% thus exercising the runtime/forwarding handoff logic in the vnode. 
+% +handle_work_intercept({fold, FoldFun, FinishFun}, Sender, State) -> + FinishWrapperFun = fun(X) -> + catch global:send(rt_ho_w1c_proc, {write, self()}), + receive + ok -> ok + end, + FinishFun(X) + end, + ?M:handle_work_orig({fold, FoldFun, FinishWrapperFun}, Sender, State). \ No newline at end of file diff --git a/intercepts/riak_object_intercepts.erl b/intercepts/riak_object_intercepts.erl index 7ed350800..dd72b66df 100644 --- a/intercepts/riak_object_intercepts.erl +++ b/intercepts/riak_object_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_object_intercepts). -export([skippable_index_specs/1, skippable_diff_index_specs/2]). diff --git a/intercepts/riak_pipe_vnode_intercepts.erl b/intercepts/riak_pipe_vnode_intercepts.erl index b3e7ca2be..9018c2bf5 100644 --- a/intercepts/riak_pipe_vnode_intercepts.erl +++ b/intercepts/riak_pipe_vnode_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_pipe_vnode_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_repl2_fs_node_reserver_intercepts.erl b/intercepts/riak_repl2_fs_node_reserver_intercepts.erl index 3e3b99e1f..9deed6bf0 100644 --- a/intercepts/riak_repl2_fs_node_reserver_intercepts.erl +++ b/intercepts/riak_repl2_fs_node_reserver_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl2_fs_node_reserver_intercepts). -compile(export_all). -include("intercept.hrl"). 
diff --git a/intercepts/riak_repl2_fssource_intercepts.erl b/intercepts/riak_repl2_fssource_intercepts.erl index cf37ff1d9..cf9f69aae 100644 --- a/intercepts/riak_repl2_fssource_intercepts.erl +++ b/intercepts/riak_repl2_fssource_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl2_fssource_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_repl2_leader_intercepts.erl b/intercepts/riak_repl2_leader_intercepts.erl index 46571ad86..c02a3c1b0 100644 --- a/intercepts/riak_repl2_leader_intercepts.erl +++ b/intercepts/riak_repl2_leader_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl2_leader_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_repl2_rtq_intercepts.erl b/intercepts/riak_repl2_rtq_intercepts.erl index c961a1c29..743d10cdc 100644 --- a/intercepts/riak_repl2_rtq_intercepts.erl +++ b/intercepts/riak_repl2_rtq_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + %% Intercepts functions for the riak_test in ../tests/repl_rt_heartbeat.erl -module(riak_repl2_rtq_intercepts). -compile(export_all). 
diff --git a/intercepts/riak_repl2_rtsink_conn_intercepts.erl b/intercepts/riak_repl2_rtsink_conn_intercepts.erl index 85a139739..0744c8deb 100644 --- a/intercepts/riak_repl2_rtsink_conn_intercepts.erl +++ b/intercepts/riak_repl2_rtsink_conn_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + %% Intercepts functions for the riak_test in ../tests/repl_rt_heartbeat.erl -module(riak_repl2_rtsink_conn_intercepts). -compile(export_all). diff --git a/intercepts/riak_repl2_rtsource_helper_intercepts.erl b/intercepts/riak_repl2_rtsource_helper_intercepts.erl index d4ac50fc2..2986e553b 100644 --- a/intercepts/riak_repl2_rtsource_helper_intercepts.erl +++ b/intercepts/riak_repl2_rtsource_helper_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + %% Intercepts functions for the riak_test in ../tests/repl_rt_heartbeat.erl -module(riak_repl2_rtsource_helper_intercepts). -compile(export_all). diff --git a/intercepts/riak_repl_aae_source_intercepts.erl b/intercepts/riak_repl_aae_source_intercepts.erl index e12203ee3..11f65a691 100644 --- a/intercepts/riak_repl_aae_source_intercepts.erl +++ b/intercepts/riak_repl_aae_source_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl_aae_source_intercepts). -compile(export_all). -include("intercept.hrl"). 
diff --git a/intercepts/riak_repl_console_intercepts.erl b/intercepts/riak_repl_console_intercepts.erl index 736d7efc4..0a81ecc6d 100644 --- a/intercepts/riak_repl_console_intercepts.erl +++ b/intercepts/riak_repl_console_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl_console_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/intercepts/riak_repl_reduced_intercepts.erl b/intercepts/riak_repl_reduced_intercepts.erl index 5ed0fc414..32c4c0ef6 100644 --- a/intercepts/riak_repl_reduced_intercepts.erl +++ b/intercepts/riak_repl_reduced_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + %% @doc Put some bugs on the reduced N mutators. This allows one to %% capture and inspect the objects actually set to the mutate_put and %% mutate_get funcitons. This module is currently used by diff --git a/intercepts/riak_repl_ring_handler_intercepts.erl b/intercepts/riak_repl_ring_handler_intercepts.erl index 460ebda9a..c0ef9a20e 100644 --- a/intercepts/riak_repl_ring_handler_intercepts.erl +++ b/intercepts/riak_repl_ring_handler_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl_ring_handler_intercepts). -compile(export_all). -include("intercept.hrl"). 
diff --git a/intercepts/riak_repl_util_intercepts.erl b/intercepts/riak_repl_util_intercepts.erl index aba688484..8d4dc767a 100644 --- a/intercepts/riak_repl_util_intercepts.erl +++ b/intercepts/riak_repl_util_intercepts.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%------------------------------------------------------------------- + -module(riak_repl_util_intercepts). -compile(export_all). -include("intercept.hrl"). diff --git a/rebar.config b/rebar.config index b28cada82..94200168e 100644 --- a/rebar.config +++ b/rebar.config @@ -11,12 +11,12 @@ {eunit_opts, [verbose]}. 
{deps, [ - {lager, ".*", {git, "git://github.com/basho/lager", {tag, "2.0.3"}}}, + {lager, ".*", {git, "git://github.com/basho/lager", {tag, "2.2.3"}}}, {getopt, ".*", {git, "git://github.com/jcomellas/getopt", {tag, "v0.4"}}}, - {meck, ".*", {git, "git://github.com/basho/meck.git", {tag, "0.8.2"}}}, + {meck, "0.8.2", {git, "git://github.com/basho/meck.git", {tag, "0.8.2"}}}, {mapred_verify, ".*", {git, "git://github.com/basho/mapred_verify", {branch, "master"}}}, - {riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", {tag, "2.0.1"}}}, - {riakhttpc, ".*", {git, "git://github.com/basho/riak-erlang-http-client", "cac1200fdb4d7e4b6d3edf11e543a40c2f105fbe"}}, + {riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", {branch, "develop-2.2"}}}, + {riakhttpc, ".*", {git, "git://github.com/basho/riak-erlang-http-client", {branch, "develop-2.2"}}}, {kvc, "1.3.0", {git, "https://github.com/etrepum/kvc", {tag, "v1.3.0"}}}, {druuid, ".*", {git, "git://github.com/kellymclaughlin/druuid.git", {tag, "0.2"}}} ]}. 
diff --git a/src/riak_test_escript.erl b/src/riak_test_escript.erl index e5c7449ef..fc694fb8c 100644 --- a/src/riak_test_escript.erl +++ b/src/riak_test_escript.erl @@ -297,26 +297,45 @@ run_test(Test, Outdir, TestMetaData, Report, HarnessArgs, NumTests) -> 1 -> keep_them_up; _ -> rt:teardown() end, - CoverageFile = rt_cover:maybe_export_coverage(Test, CoverDir, erlang:phash2(TestMetaData)), + CoverageFile = rt_cover:maybe_export_coverage(Test, + CoverDir, + erlang:phash2(TestMetaData)), case Report of undefined -> ok; _ -> - {value, {log, L}, TestResult} = lists:keytake(log, 1, SingleTestResult), + {value, {log, L}, TestResult} = + lists:keytake(log, 1, SingleTestResult), case giddyup:post_result(TestResult) of error -> woops; {ok, Base} -> %% Now push up the artifacts, starting with the test log giddyup:post_artifact(Base, {"riak_test.log", L}), - [ giddyup:post_artifact(Base, File) || File <- rt:get_node_logs() ], - [giddyup:post_artifact(Base, {filename:basename(CoverageFile) ++ ".gz", - zlib:gzip(element(2,file:read_file(CoverageFile)))}) || CoverageFile /= cover_disabled ], - ResultPlusGiddyUp = TestResult ++ [{giddyup_url, list_to_binary(Base)}], - [ rt:post_result(ResultPlusGiddyUp, WebHook) || WebHook <- get_webhooks() ] + [giddyup:post_artifact(Base, File) + || File <- rt:get_node_logs()], + maybe_post_debug_logs(Base), + [giddyup:post_artifact( + Base, + {filename:basename(CoverageFile) ++ ".gz", + zlib:gzip(element(2,file:read_file(CoverageFile)))}) + || CoverageFile /= cover_disabled], + ResultPlusGiddyUp = TestResult ++ + [{giddyup_url, list_to_binary(Base)}], + [rt:post_result(ResultPlusGiddyUp, WebHook) || + WebHook <- get_webhooks()] end end, rt_cover:stop(), [{coverdata, CoverageFile} | SingleTestResult]. +maybe_post_debug_logs(Base) -> + case rt_config:get(giddyup_post_debug_logs, true) of + true -> + [giddyup:post_artifact(Base, File) + || File <- rt:get_node_debug_logs()]; + _ -> + false + end. 
+ get_webhooks() -> Hooks = lists:foldl(fun(E, Acc) -> [parse_webhook(E) | Acc] end, [], diff --git a/src/rt.erl b/src/rt.erl index 32a609ec0..738c49621 100644 --- a/src/rt.erl +++ b/src/rt.erl @@ -756,6 +756,7 @@ wait_for_service(Node, Services) when is_list(Services) -> {badrpc, Error} -> {badrpc, Error}; CurrServices when is_list(CurrServices) -> + lager:info("Waiting for services ~p: current services: ~p", [Services, CurrServices]), lists:all(fun(Service) -> lists:member(Service, CurrServices) end, Services); Res -> Res @@ -1825,6 +1826,9 @@ setup_harness(Test, Args) -> get_node_logs() -> ?HARNESS:get_node_logs(). +get_node_debug_logs() -> + ?HARNESS:get_node_debug_logs(). + %% @doc Performs a search against the log files on `Node' and returns all %% matching lines. -spec search_logs(node(), Pattern::iodata()) -> diff --git a/src/rt_cs_dev.erl b/src/rt_cs_dev.erl deleted file mode 100644 index 10e34b344..000000000 --- a/src/rt_cs_dev.erl +++ /dev/null @@ -1,511 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2013 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - -%% @private --module(rt_cs_dev). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). - --define(DEVS(N), lists:concat(["dev", N, "@127.0.0.1"])). --define(DEV(N), list_to_atom(?DEVS(N))). 
--define(BUILD_PATHS, (rt_config:get(build_paths))). --define(SRC_PATHS, (rt_config:get(src_paths))). - -get_deps() -> - lists:flatten(io_lib:format("~s/dev/dev1/lib", [relpath(current)])). - -setup_harness(_Test, _Args) -> - confirm_build_type(rt_config:get(build_type, oss)), - Path = relpath(root), - %% Stop all discoverable nodes, not just nodes we'll be using for this test. - rt:pmap(fun(X) -> stop_all(X ++ "/dev") end, devpaths()), - - %% Reset nodes to base state - lager:info("Resetting nodes to fresh state"), - rtdev:run_git(Path, "reset HEAD --hard"), - rtdev:run_git(Path, "clean -fd"), - - lager:info("Cleaning up lingering pipe directories"), - rt:pmap(fun(Dir) -> - %% when joining two absolute paths, filename:join intentionally - %% throws away the first one. ++ gets us around that, while - %% keeping some of the security of filename:join. - %% the extra slashes will be pruned by filename:join, but this - %% ensures that there will be at least one between "/tmp" and Dir - PipeDir = filename:join(["/tmp//" ++ Dir, "dev"]), - %% when using filelib:wildcard/2, there must be a wildchar char - %% before the first '/'. - Files = filelib:wildcard("dev?/*.{r,w}", PipeDir), - [ file:delete(filename:join(PipeDir, File)) || File <- Files], - file:del_dir(PipeDir) - end, devpaths()), - ok. - -confirm_build_type(BuildType) -> - [ok = confirm_build_type(BuildType, Vsn) || Vsn <- [cs_current, cs_previous]]. - -confirm_build_type(BuildType, Vsn) -> - ReplPB = filename:join([relpath(Vsn), "dev/dev1/lib/riak_repl_pb_api*"]), - case {BuildType, filelib:wildcard(ReplPB)} of - {oss, []} -> ok; - {ee, [_|_]} -> ok; - _ -> - lager:error("Build type of ~p is not ~p", - [Vsn, BuildType]), - {error, {build_type_mismatch, Vsn}} - end. - -relpath(Vsn) -> - Path = ?BUILD_PATHS, - path(Vsn, Path). - -srcpath(Vsn) -> - Path = ?SRC_PATHS, - path(Vsn, Path). 
- -path(Vsn, Paths=[{_,_}|_]) -> - orddict:fetch(Vsn, orddict:from_list(Paths)); -path(current, Path) -> - Path; -path(root, Path) -> - Path; -path(_, _) -> - throw("Version requested but only one path provided"). - -upgrade(Node, NewVersion) -> - N = node_id(Node), - Version = node_version(N), - lager:info("Upgrading ~p : ~p -> ~p", [Node, Version, NewVersion]), - catch stop(Node), - rt:wait_until_unpingable(Node), - OldPath = relpath(Version), - NewPath = relpath(NewVersion), - - Commands = [ - io_lib:format("cp -p -P -R \"~s/dev/dev~b/data\" \"~s/dev/dev~b\"", - [OldPath, N, NewPath, N]), - io_lib:format("rm -rf ~s/dev/dev~b/data/*", - [OldPath, N]), - io_lib:format("cp -p -P -R \"~s/dev/dev~b/etc\" \"~s/dev/dev~b\"", - [OldPath, N, NewPath, N]) - ], - [ begin - lager:info("Running: ~s", [Cmd]), - os:cmd(Cmd) - end || Cmd <- Commands], - VersionMap = orddict:store(N, NewVersion, rt_config:get(rt_versions)), - rt_config:set(rt_versions, VersionMap), - start(Node), - rt:wait_until_pingable(Node), - ok. - -all_the_app_configs(DevPath) -> - lager:error("The dev path is ~p", [DevPath]), - case filelib:is_dir(DevPath) of - true -> - Devs = filelib:wildcard(DevPath ++ "/dev/dev*"), - [ Dev ++ "/etc/app.config" || Dev <- Devs]; - _ -> - lager:debug("~s is not a directory.", [DevPath]), - [] - end. 
- -update_app_config(all, Config) -> - lager:info("rtdev:update_app_config(all, ~p)", [Config]), - [ update_app_config(DevPath, Config) || DevPath <- devpaths()]; -update_app_config(Node, Config) when is_atom(Node) -> - N = node_id(Node), - Path = relpath(node_version(N)), - FileFormatString = "~s/dev/dev~b/etc/~s.config", - - AppConfigFile = io_lib:format(FileFormatString, [Path, N, "app"]), - AdvConfigFile = io_lib:format(FileFormatString, [Path, N, "advanced"]), - %% If there's an app.config, do it old style - %% if not, use cuttlefish's adavnced.config - case filelib:is_file(AppConfigFile) of - true -> - update_app_config_file(AppConfigFile, Config); - _ -> - update_app_config_file(AdvConfigFile, Config) - end; -update_app_config(DevPath, Config) -> - [update_app_config_file(AppConfig, Config) || AppConfig <- all_the_app_configs(DevPath)]. - -update_app_config_file(ConfigFile, Config) -> - lager:info("rtdev:update_app_config_file(~s, ~p)", [ConfigFile, Config]), - - BaseConfig = case file:consult(ConfigFile) of - {ok, [ValidConfig]} -> - ValidConfig; - {error, enoent} -> - [] - end, - MergeA = orddict:from_list(Config), - MergeB = orddict:from_list(BaseConfig), - NewConfig = - orddict:merge(fun(_, VarsA, VarsB) -> - MergeC = orddict:from_list(VarsA), - MergeD = orddict:from_list(VarsB), - orddict:merge(fun(_, ValA, _ValB) -> - ValA - end, MergeC, MergeD) - end, MergeA, MergeB), - NewConfigOut = io_lib:format("~p.", [NewConfig]), - ?assertEqual(ok, file:write_file(ConfigFile, NewConfigOut)), - ok. - -%% Appropriate backend will be set by rtcs later. -get_backends() -> - cs_multi_backend. - -node_path(Node) -> - N = node_id(Node), - Path = relpath(node_version(N)), - lists:flatten(io_lib:format("~s/dev/dev~b", [Path, N])). - -create_dirs(Nodes) -> - Snmp = [node_path(Node) ++ "/data/snmp/agent/db" || Node <- Nodes], - [?assertCmd("mkdir -p " ++ Dir) || Dir <- Snmp]. 
- -clean_data_dir(Nodes, SubDir) when is_list(Nodes) -> - DataDirs = [node_path(Node) ++ "/data/" ++ SubDir || Node <- Nodes], - lists:foreach(fun rm_dir/1, DataDirs). - -rm_dir(Dir) -> - lager:info("Removing directory ~s", [Dir]), - ?assertCmd("rm -rf " ++ Dir), - ?assertEqual(false, filelib:is_dir(Dir)). - -add_default_node_config(Nodes) -> - case rt_config:get(rt_default_config, undefined) of - undefined -> ok; - Defaults when is_list(Defaults) -> - rt:pmap(fun(Node) -> - update_app_config(Node, Defaults) - end, Nodes), - ok; - BadValue -> - lager:error("Invalid value for rt_default_config : ~p", [BadValue]), - throw({invalid_config, {rt_default_config, BadValue}}) - end. - -deploy_nodes(NodeConfig) -> - Path = relpath(root), - lager:info("Riak path: ~p", [Path]), - NumNodes = length(NodeConfig), - NodesN = lists:seq(1, NumNodes), - Nodes = [?DEV(N) || N <- NodesN], - NodeMap = orddict:from_list(lists:zip(Nodes, NodesN)), - {Versions, Configs} = lists:unzip(NodeConfig), - VersionMap = lists:zip(NodesN, Versions), - - %% Check that you have the right versions available - [ check_node(Version) || Version <- VersionMap ], - rt_config:set(rt_nodes, NodeMap), - rt_config:set(rt_versions, VersionMap), - - create_dirs(Nodes), - - %% Set initial config - add_default_node_config(Nodes), - rt:pmap(fun({_, default}) -> - ok; - ({Node, Config}) -> - update_app_config(Node, Config) - end, - lists:zip(Nodes, Configs)), - - %% create snmp dirs, for EE - create_dirs(Nodes), - - %% Start nodes - %%[run_riak(N, relpath(node_version(N)), "start") || N <- Nodes], - rt:pmap(fun(N) -> rtdev:run_riak(N, relpath(node_version(N)), "start") end, NodesN), - - %% Ensure nodes started - [ok = rt:wait_until_pingable(N) || N <- Nodes], - - %% %% Enable debug logging - %% [rpc:call(N, lager, set_loglevel, [lager_console_backend, debug]) || N <- Nodes], - - %% We have to make sure that riak_core_ring_manager is running before we can go on. 
- [ok = rt:wait_until_registered(N, riak_core_ring_manager) || N <- Nodes], - - %% Ensure nodes are singleton clusters - [ok = rt:check_singleton_node(?DEV(N)) || {N, Version} <- VersionMap, - Version /= "0.14.2"], - - lager:info("Deployed nodes: ~p", [Nodes]), - Nodes. - -stop_all(DevPath) -> - case filelib:is_dir(DevPath) of - true -> - Devs = filelib:wildcard(DevPath ++ "/{dev,stanchion}*"), - - %% Works, but I'd like it to brag a little more about it. - Stop = fun(C) -> - Cmd = stop_command(C), - [Output | _Tail] = string:tokens(os:cmd(Cmd), "\n"), - Status = case Output of - "ok" -> "ok"; - _ -> "wasn't running" - end, - lager:info("Stopping Node... ~s ~~ ~s.", [Cmd, Status]) - end, - [Stop(D) || D <- Devs]; - _ -> lager:info("~s is not a directory.", [DevPath]) - end, - ok. - -stop_command(C) -> - IsRiakCS = string:str(C, "riak_cs"), - IsStanchion = string:str(C, "stanchion"), - if - IsRiakCS > 0 -> - C ++ "/bin/riak-cs stop"; - IsStanchion > 0 -> - C ++ "/bin/stanchion stop"; - true -> - C ++ "/bin/riak stop" - end. - -stop(Node) -> - RiakPid = rpc:call(Node, os, getpid, []), - N = node_id(Node), - rtdev:run_riak(N, relpath(node_version(N)), "stop"), - F = fun(_N) -> - os:cmd("kill -0 " ++ RiakPid) =/= [] - end, - ?assertEqual(ok, rt:wait_until(Node, F)), - ok. - -start(Node) -> - N = node_id(Node), - rtdev:run_riak(N, relpath(node_version(N)), "start"), - ok. - -attach(Node, Expected) -> - interactive(Node, "attach", Expected). - -attach_direct(Node, Expected) -> - interactive(Node, "attach-direct", Expected). - -console(Node, Expected) -> - interactive(Node, "console", Expected). 
- -interactive(Node, Command, Exp) -> - N = node_id(Node), - Path = relpath(node_version(N)), - Cmd = rtdev:riakcmd(Path, N, Command), - lager:info("Opening a port for riak ~s.", [Command]), - lager:debug("Calling open_port with cmd ~s", [binary_to_list(iolist_to_binary(Cmd))]), - P = open_port({spawn, binary_to_list(iolist_to_binary(Cmd))}, - [stream, use_stdio, exit_status, binary, stderr_to_stdout]), - interactive_loop(P, Exp). - -interactive_loop(Port, Expected) -> - receive - {Port, {data, Data}} -> - %% We've gotten some data, so the port isn't done executing - %% Let's break it up by newline and display it. - Tokens = string:tokens(binary_to_list(Data), "\n"), - [lager:debug("~s", [Text]) || Text <- Tokens], - - %% Now we're going to take hd(Expected) which is either {expect, X} - %% or {send, X}. If it's {expect, X}, we foldl through the Tokenized - %% data looking for a partial match via rt:str/2. If we find one, - %% we pop hd off the stack and continue iterating through the list - %% with the next hd until we run out of input. Once hd is a tuple - %% {send, X}, we send that test to the port. The assumption is that - %% once we send data, anything else we still have in the buffer is - %% meaningless, so we skip it. That's what that {sent, sent} thing - %% is about. If there were a way to abort mid-foldl, I'd have done - %% that. {sent, _} -> is just a pass through to get out of the fold. - - NewExpected = lists:foldl(fun(X, Expect) -> - [{Type, Text}|RemainingExpect] = case Expect of - [] -> [{done, "done"}|[]]; - E -> E - end, - case {Type, rt:str(X, Text)} of - {expect, true} -> - RemainingExpect; - {expect, false} -> - [{Type, Text}|RemainingExpect]; - {send, _} -> - port_command(Port, list_to_binary(Text ++ "\n")), - [{sent, "sent"}|RemainingExpect]; - {sent, _} -> - Expect; - {done, _} -> - [] - end - end, Expected, Tokens), - %% Now that the fold is over, we should remove {sent, sent} if it's there. 
- %% The fold might have ended not matching anything or not sending anything - %% so it's possible we don't have to remove {sent, sent}. This will be passed - %% to interactive_loop's next iteration. - NewerExpected = case NewExpected of - [{sent, "sent"}|E] -> E; - E -> E - end, - %% If NewerExpected is empty, we've met all expected criteria and in order to boot - %% Otherwise, loop. - case NewerExpected of - [] -> ?assert(true); - _ -> interactive_loop(Port, NewerExpected) - end; - {Port, {exit_status,_}} -> - %% This port has exited. Maybe the last thing we did was {send, [4]} which - %% as Ctrl-D would have exited the console. If Expected is empty, then - %% We've met every expectation. Yay! If not, it means we've exited before - %% something expected happened. - ?assertEqual([], Expected) - after rt_config:get(rt_max_wait_time) -> - %% interactive_loop is going to wait until it matches expected behavior - %% If it doesn't, the test should fail; however, without a timeout it - %% will just hang forever in search of expected behavior. See also: Parenting - ?assertEqual([], Expected) - end. - -admin(Node, Args, Options) -> - N = node_id(Node), - Path = relpath(node_version(N)), - Cmd = rtdev:riak_admin_cmd(Path, N, Args), - lager:info("Running: ~s", [Cmd]), - Result = execute_admin_cmd(Cmd, Options), - lager:info("~s", [Result]), - {ok, Result}. - -execute_admin_cmd(Cmd, Options) -> - {_ExitCode, Result} = FullResult = wait_for_cmd(spawn_cmd(Cmd)), - case lists:member(return_exit_code, Options) of - true -> - FullResult; - false -> - Result - end. - -riak(Node, Args) -> - N = node_id(Node), - Path = relpath(node_version(N)), - Result = rtdev:run_riak(N, Path, Args), - lager:info("~s", [Result]), - {ok, Result}. - -node_id(Node) -> - NodeMap = rt_config:get(rt_nodes), - orddict:fetch(Node, NodeMap). - -node_version(N) -> - VersionMap = rt_config:get(rt_versions), - orddict:fetch(N, VersionMap). - -spawn_cmd(Cmd) -> - spawn_cmd(Cmd, []). 
-spawn_cmd(Cmd, Opts) -> - Port = open_port({spawn, Cmd}, [stream, in, exit_status, stderr_to_stdout] ++ Opts), - Port. - -wait_for_cmd(Port) -> - rt:wait_until(node(), - fun(_) -> - receive - {Port, Msg={data, _}} -> - self() ! {Port, Msg}, - false; - {Port, Msg={exit_status, _}} -> - catch port_close(Port), - self() ! {Port, Msg}, - true - after 0 -> - false - end - end), - get_cmd_result(Port, []). - -cmd(Cmd) -> - cmd(Cmd, []). - -cmd(Cmd, Opts) -> - wait_for_cmd(spawn_cmd(Cmd, Opts)). - -get_cmd_result(Port, Acc) -> - receive - {Port, {data, Bytes}} -> - get_cmd_result(Port, [Bytes|Acc]); - {Port, {exit_status, Status}} -> - Output = lists:flatten(lists:reverse(Acc)), - {Status, Output} - after 0 -> - timeout - end. - -check_node({_N, Version}) -> - case proplists:is_defined(Version, rt_config:get(build_paths)) of - true -> ok; - _ -> - lager:error("You don't have Riak ~s installed or configured", [Version]), - erlang:error("You don't have Riak " ++ atom_to_list(Version) ++ " installed or configured") - end. - -set_backend(Backend) -> - lager:info("rtdev:set_backend(~p)", [Backend]), - update_app_config(all, [{riak_kv, [{storage_backend, Backend}]}]), - get_backends(). - -get_version() -> - case file:read_file(relpath(cs_current) ++ "/VERSION") of - {error, enoent} -> unknown; - {ok, Version} -> Version - end. - -teardown() -> - %% Stop all discoverable nodes, not just nodes we'll be using for this test. - rt:pmap(fun(X) -> stop_all(X ++ "/dev") end, - devpaths()). - -whats_up() -> - io:format("Here's what's running...~n"), - - Up = [rpc:call(Node, os, cmd, ["pwd"]) || Node <- nodes()], - [io:format(" ~s~n",[string:substr(Dir, 1, length(Dir)-1)]) || Dir <- Up]. - -devpaths() -> - lists:usort([ DevPath || {Name, DevPath} <- rt_config:get(build_paths), - not lists:member(Name, [root, ee_root, cs_root, stanchion_root]) - ]). - -versions() -> - proplists:get_keys(rt_config:get(build_paths)) -- [root]. 
- -get_node_logs() -> - lists:flatmap(fun get_node_logs/1, [root, ee_root, cs_root, stanchion_root]). - -get_node_logs(Base) -> - Root = filename:absname(proplists:get_value(Base, ?BUILD_PATHS)), - %% Unlike Riak, Riak CS has multiple things in the root and so we need - %% to distinguish between them. - RootLen = length(filename:dirname(Root)) + 1, %% Remove the leading slash - [ begin - {ok, Port} = file:open(Filename, [read, binary]), - {lists:nthtail(RootLen, Filename), Port} - end || Filename <- filelib:wildcard(Root ++ "/*/dev/dev*/log/*") ]. diff --git a/src/rt_intercept.erl b/src/rt_intercept.erl index abb5890f8..4e2803935 100644 --- a/src/rt_intercept.erl +++ b/src/rt_intercept.erl @@ -78,6 +78,12 @@ add(Node, {Target, Intercept, Mapping}, OutDir) -> NMapping = [transform_anon_fun(M) || M <- Mapping], ok = rpc:call(Node, intercept, add, [Target, Intercept, NMapping, OutDir]). +clean(Node, Targets) when is_list(Targets) -> + [ok = clean(Node, T) || T <- Targets], + ok; +clean(Node, Target) -> + ok = rpc:call(Node, intercept, clean, [Target]). + %% The following function transforms anonymous function mappings passed %% from an Erlang shell. Anonymous intercept functions from compiled code %% require the developer to supply free variables themselves, and also diff --git a/src/rtdev.erl b/src/rtdev.erl index 678b12994..9733a1da4 100644 --- a/src/rtdev.erl +++ b/src/rtdev.erl @@ -26,6 +26,8 @@ -define(DEVS(N), lists:concat(["dev", N, "@127.0.0.1"])). -define(DEV(N), list_to_atom(?DEVS(N))). -define(PATH, (rt_config:get(rtdev_path))). +-define(DEBUG_LOG_FILE(N), + "dev" ++ integer_to_list(N) ++ "@127.0.0.1-riak-debug.tar.gz"). get_deps() -> lists:flatten(io_lib:format("~s/dev/dev1/lib", [relpath(current)])). @@ -52,6 +54,17 @@ riak_admin_cmd(Path, N, Args) -> ExecName = rt_config:get(exec_name, "riak"), io_lib:format("~s/dev/dev~b/bin/~s-admin ~s", [Path, N, ExecName, ArgStr]). 
+riak_debug_cmd(Path, N, Args) -> + Quoted = + lists:map(fun(Arg) when is_list(Arg) -> + lists:flatten([$", Arg, $"]); + (_) -> + erlang:error(badarg) + end, Args), + ArgStr = string:join(Quoted, " "), + ExecName = rt_config:get(exec_name, "riak"), + io_lib:format("~s/dev/dev~b/bin/~s-debug ~s", [Path, N, ExecName, ArgStr]). + run_git(Path, Cmd) -> lager:info("Running: ~s", [gitcmd(Path, Cmd)]), {0, Out} = cmd(gitcmd(Path, Cmd)), @@ -765,6 +778,35 @@ get_node_logs() -> {lists:nthtail(RootLen, Filename), Port} end || Filename <- filelib:wildcard(Root ++ "/*/dev/dev*/log/*") ]. +get_node_debug_logs() -> + NodeMap = rt_config:get(rt_nodes), + rt:pmap(fun(Node) -> + get_node_debug_logs(Node) + end, + NodeMap). + +get_node_debug_logs({_Node, NodeNum}) -> + DebugLogFile = ?DEBUG_LOG_FILE(NodeNum), + delete_existing_debug_log_file(DebugLogFile), + Path = relpath(node_version(NodeNum)), + Args = ["--logs"], + Cmd = riak_debug_cmd(Path, NodeNum, Args), + {ExitCode, Result} = wait_for_cmd(spawn_cmd(Cmd)), + case ExitCode of + 0 -> + DebugLogFile; + _ -> + exit({ExitCode, Result}) + end. + +%% If the debug log file exists from a previous test run it will cause the +%% `riak_debug_cmd' to fail. Therefore, delete the `DebugLogFile' if it exists. +%% Note that by ignoring the return value of `file:delete/1' this function +%% works whether or not the `DebugLogFile' actually exists at the time it is +%% called. +delete_existing_debug_log_file(DebugLogFile) -> + file:delete(DebugLogFile). + %% @doc Performs a search against the log files on `Node' and returns all %% matching lines. 
-spec search_logs(node(), Pattern::iodata()) -> diff --git a/src/yokozuna_rt.erl b/src/yokozuna_rt.erl index 8b7f0c2af..21a492317 100644 --- a/src/yokozuna_rt.erl +++ b/src/yokozuna_rt.erl @@ -26,6 +26,7 @@ check_exists/2, clear_trees/1, commit/2, + drain_solrqs/1, expire_trees/1, gen_keys/1, host_entries/1, @@ -46,6 +47,7 @@ wait_for_index/2, wait_for_schema/2, wait_for_schema/3, + wait_until/2, write_data/5, write_data/6, http/4, @@ -72,7 +74,7 @@ host_entries(ClusterConnInfo) -> [riak_http(I) || {_,I} <- ClusterConnInfo]. %% @doc Generate `SeqMax' keys. Yokozuna supports only UTF-8 compatible keys. --spec gen_keys(pos_integer()) -> [binary()]. +-spec gen_keys(pos_integer()) -> list(). gen_keys(SeqMax) -> [<> || N <- lists:seq(1, SeqMax), not lists:any( @@ -243,6 +245,7 @@ brutal_kill_remove_index_dirs(Nodes, IndexName, Services) -> -spec remove_index_dirs([node()], index_name(), [atom()]) -> ok. remove_index_dirs(Nodes, IndexName, Services) -> IndexDirs = get_index_dirs(IndexName, Nodes), + [rt:stop(ANode) || ANode <- Nodes], remove_index_dirs2(Nodes, IndexDirs, Services), ok. @@ -453,6 +456,13 @@ commit(Nodes, Index) -> rpc:multicall(Nodes, yz_solr, commit, [Index]), ok. +-spec drain_solrqs(node() | cluster()) -> ok. +drain_solrqs(Cluster) when is_list(Cluster) -> + [drain_solrqs(Node) || Node <- Cluster]; +drain_solrqs(Node) -> + rpc:call(Node, yz_solrq_drain_mgr, drain, []), + ok. + -spec override_schema(pid(), [node()], index_name(), schema_name(), string()) -> {ok, [node()]}. override_schema(Pid, Cluster, Index, Schema, RawUpdate) -> diff --git a/tests/overload.erl b/tests/overload.erl index 96ca04304..b72ef34c8 100644 --- a/tests/overload.erl +++ b/tests/overload.erl @@ -33,8 +33,10 @@ -define(KEY, <<"hotkey">>). -define(NORMAL_TYPE, <<"normal_type">>). -define(CONSISTENT_TYPE, <<"consistent_type">>). +-define(WRITE_ONCE_TYPE, <<"write_once_type">>). -define(NORMAL_BKV, {{?NORMAL_TYPE, ?BUCKET}, ?KEY, <<"test">>}). 
-define(CONSISTENT_BKV, {{?CONSISTENT_TYPE, ?BUCKET}, ?KEY, <<"test">>}). +-define(WRITE_ONCE_BKV, {{?WRITE_ONCE_TYPE, ?BUCKET}, ?KEY, <<"test">>}). %% This record contains the default values for config settings if they were not set %% in the advanced.config file - because setting something to `undefined` is not the same @@ -83,12 +85,14 @@ confirm() -> ok = create_bucket_type(Nodes, ?NORMAL_TYPE, [{n_val, 3}]), ok = create_bucket_type(Nodes, ?CONSISTENT_TYPE, [{consistent, true}, {n_val, 5}]), + ok = create_bucket_type(Nodes, ?WRITE_ONCE_TYPE, [{write_once, true}, {n_val, 1}]), rt:wait_until(ring_manager_check_fun(hd(Nodes))), Node1 = hd(Nodes), write_once(Node1, ?NORMAL_BKV), write_once(Node1, ?CONSISTENT_BKV), + write_once(Node1, ?WRITE_ONCE_BKV), Tests = [test_no_overload_protection, test_vnode_protection, @@ -99,7 +103,9 @@ confirm() -> ok = erlang:apply(?MODULE, Test, [Nodes, BKV]) end || Test <- Tests, BKV <- [?NORMAL_BKV, - ?CONSISTENT_BKV]], + ?CONSISTENT_BKV, + ?WRITE_ONCE_BKV]], + %% Test cover queries doesn't depend on bucket/keyvalue, just run it once test_cover_queries_overload(Nodes), pass. @@ -163,9 +169,15 @@ test_vnode_protection(Nodes, BKV) -> Pid ! resume, ok. + %% Don't check consistent gets, as they don't use the FSM test_fsm_protection(_, ?CONSISTENT_BKV) -> ok; + +%% Don't check on fast path either. +test_fsm_protection(_, ?WRITE_ONCE_BKV) -> + ok; + test_fsm_protection(Nodes, BKV) -> lager:info("Testing with coordinator protection enabled"), lager:info("Setting FSM limit to ~b", [?THRESHOLD]), diff --git a/tests/pipe_verify_handoff_blocking.erl b/tests/pipe_verify_handoff_blocking.erl index 4c48bdabc..6840bd214 100644 --- a/tests/pipe_verify_handoff_blocking.erl +++ b/tests/pipe_verify_handoff_blocking.erl @@ -43,17 +43,6 @@ %% archive), but we don't want them to process so many inputs that %% they consume their blocking queues before handing off. 
-%% Please Note: Under rare circumstances, this test may fail with a -%% "{badmatch,{error,[{vnode_down,noproc}]}}' error. This is not a -%% failure of this test but rather a side effect of a race condition -%% in riak_core_vnode_proxy. It manifests due to the fact that the -%% test is attempting to send a command to a vnode that is in fact -%% down, however monitor only works by issuing a command and getting -%% a PID. In some instances, get_vnode_pid fails because vnode shutdown -%% is queued up in the mailbox before monitor node. Unfortunately, the -%% fix would require a fundamental shift in the architecture of -%% riak_core, which at the time of this writing is not feasible for -%% this rare failure case. -module(pipe_verify_handoff_blocking). -export([ @@ -80,9 +69,12 @@ confirm() -> Inputs = lists:seq(1, 20), lager:info("Start ~b nodes", [?NODE_COUNT]), - NodeDefs = lists:duplicate(?NODE_COUNT, {current, default}), + Config = [{riak_core, [{ring_creation_size, 8}, + {vnode_management_timer, 1000}, + {handoff_concurrency, 100}, + {vnode_inactivity_timeout, 1000}]}], Services = [riak_pipe], - [Primary,Secondary] = Nodes = rt:deploy_nodes(NodeDefs, Services), + [Primary,Secondary] = Nodes = rt:deploy_nodes(?NODE_COUNT, Config, Services), %% Ensure each node owns 100% of it's own ring [?assertEqual([Node], rt:owners_according_to(Node)) || Node <- Nodes], @@ -114,15 +106,20 @@ confirm() -> lager:info("Unpause workers"), Runner ! go, + set_up_vnode_crashing_intercept(Primary), ok = rt:wait_until_transfers_complete(Nodes), + lager:info("Check input count"), FillerInputCount = stop_fillers(Fillers), %% if we make it this far, then no filler ever saw the vnode_down %% error message; otherwise badmatches in queue_filler/4 will have - %% halted the test - + %% halted the test. 
Note that we should not see this test fail + %% with `{error,[{vnode_down,noproc}]}}` errors, as the `noproc` case + %% should be handled similarly to the `normal` exit case in + %% `riak_pipe_vnode:queue_work_wait` + lager:info("Check pipe status"), _Status2 = pipe_status(Primary, Pipe), lager:info("Send eoi and collect results"), @@ -139,7 +136,13 @@ confirm() -> lager:info("~s: PASS", [atom_to_list(?MODULE)]), pass. -%%% queue filling +set_up_vnode_crashing_intercept(Primary) -> + lager:info("Add intercept to kill vnode before calling the wait function"), + rt_intercept:add(Primary, {riak_core_vnode_master, + [{{get_vnode_pid, 3}, + return_dead_process_pid_from_get_vnode_pid}]}). + +%% queue filling %% @doc fill pipe vnode queues by repeatedly sending each input in the %% input list until the queue reports timeout. @@ -174,9 +177,9 @@ queue_filler(Node, Pipe, Inputs, Count) -> receive {stop, Owner} -> Owner ! {done, Count} after 0 -> - {{value, I}, Q} = queue:out(Inputs), - ok = rpc:call(Node, riak_pipe, queue_work, [Pipe, I]), - queue_filler(Node, Pipe, queue:in(I, Q), Count+1) + {{value, I}, Q} = queue:out(Inputs), + ok = rpc:call(Node, riak_pipe, queue_work, [Pipe, I]), + queue_filler(Node, Pipe, queue:in(I, Q), Count+1) end. %% @doc tell all fillers to stop and collect and sum their send counts diff --git a/tests/repl_process_leak.erl b/tests/repl_process_leak.erl new file mode 100644 index 000000000..fed599142 --- /dev/null +++ b/tests/repl_process_leak.erl @@ -0,0 +1,143 @@ +%% @doc The purpose of this test is to ensure the realtime helpers on both +%% the source and sink sides properly exit when a connection is flakey; i.e., +%% when there are errors and not out-right closes of the connection. + +-module(repl_process_leak). +-behavior(riak_test). +-export([confirm/0]). +-include_lib("eunit/include/eunit.hrl"). +
-define(SEND_ERROR_INTERVAL, 500). 
+ +confirm() -> + Conf = [ + {riak_repl, [ + {fullsync_on_connect, false}, + {fullsync_interval, disabled} + ]} + ], + + lager:info("deploying 2 nodes"), + Nodes = rt:deploy_nodes(2, Conf, [riak_kv, riak_repl]), + + [SourceNode, SinkNode] = Nodes, + + lager:info("nameing clusters"), + repl_util:name_cluster(SourceNode, "source"), + repl_util:name_cluster(SinkNode, "sink"), + + {ok, {_IP, Port}} = rpc:call(SinkNode, application, get_env, [riak_core, cluster_mgr]), + + lager:info("connecting clusters using port ~p", [Port]), + repl_util:connect_cluster(SourceNode, "127.0.0.1", Port), + repl_util:wait_for_connection(SourceNode, "sink"), + + lager:info("enabling and starting realtime"), + repl_util:enable_realtime(SourceNode, "sink"), + repl_util:start_realtime(SourceNode, "sink"), + + lager:info("testing for leaks on flakey sink"), + flakey_sink(SourceNode, SinkNode), + + lager:info("testing for leaks on flakey source"), + flakey_source(SourceNode, SinkNode), + + pass. + +flakey_sink(_SourceNode, SinkNode) -> + InitialCount = rpc:call(SinkNode, erlang, system_info, [process_count]), + ProcCounts = send_sink_tcp_errors(SinkNode, 20, [InitialCount]), + + Smallest = lists:min(ProcCounts), + Biggest = lists:max(ProcCounts), + ?assert(2 =< Biggest - Smallest), + %?assertEqual(InitialProcCount, PostProcCount), + % the process count is increasing, but the helper did die + true. + +send_sink_tcp_errors(_SinkNode, 0, Acc) -> + Acc; + +send_sink_tcp_errors(SinkNode, N, Acc) -> + case rpc:call(SinkNode, riak_repl2_rtsink_conn_sup, started, []) of + [] -> + timer:sleep(?SEND_ERROR_INTERVAL), + send_sink_tcp_errors(SinkNode, N, Acc); + [P | _] -> + SysStatus = sys:get_status(P), + {status, P, _Modul, [_PDict, _Status, _, _, Data]} = SysStatus, + [_Header, _Data1, Data2] = Data, + {data, [{"State", StateRec}]} = Data2, + [Helper | _] = lists:filter(fun(E) -> + is_pid(E) + end, tuple_to_list(StateRec)), + HelpMon = erlang:monitor(process, Helper), + P ! 
{tcp_error, <<>>, test}, + Mon = erlang:monitor(process, P), + receive {'DOWN', Mon, process, P, _} -> ok end, + receive + {'DOWN', HelpMon, process, Helper, _} -> + ok + after 10000 -> + throw("helper didn't die") + end, + timer:sleep(?SEND_ERROR_INTERVAL), + Procs = rpc:call(SinkNode, erlang, system_info, [process_count]), + send_sink_tcp_errors(SinkNode, N - 1, [Procs | Acc]) + end. + +flakey_source(SourceNode, _SinkNode) -> + InitialProcCount = rpc:call(SourceNode, erlang, system_info, [process_count]), + ProcCounts = send_source_tcp_errors(SourceNode, 20, [InitialProcCount]), + + Biggest = lists:max(ProcCounts), + Smallest = lists:min(ProcCounts), + %lager:info("initial: ~p; post: ~p", [InitialProcCount, PostProcCount]), + %?assertEqual(InitialProcCount, PostProcCount). + ?assert(2 =< Biggest - Smallest), + true. + +send_source_tcp_errors(_SourceNode, 0, Acc) -> + Acc; + +send_source_tcp_errors(SourceNode, N, Acc) -> + List = rpc:call(SourceNode, riak_repl2_rtsource_conn_sup, enabled, []), + case proplists:get_value("sink", List) of + undefined -> + timer:sleep(?SEND_ERROR_INTERVAL), + send_source_tcp_errors(SourceNode, N, Acc); + Pid -> + lager:debug("Get the status"), + SysStatus = try sys:get_status(Pid) of + S -> S + catch + W:Y -> + lager:info("Sys failed due to ~p:~p", [W,Y]), + {status, Pid, undefined, [undefined, undefined, undefined, undefined, [undefined, undefined, {data, [{"State", {Pid}}]}]]} + end, + {status, Pid, _Module, [_PDict, _Status, _, _, Data]} = SysStatus, + [_Header, _Data1, Data2] = Data, + {data, [{"State", StateRec}]} = Data2, + [Helper | _] = lists:filter(fun(E) -> + is_pid(E) + end, tuple_to_list(StateRec)), + lager:debug("mon the hlepr"), + HelperMon = erlang:monitor(process, Helper), + lager:debug("Send the murder"), + Pid ! 
{tcp_error, <<>>, test}, + Mon = erlang:monitor(process, Pid), + lager:debug("Wait for deaths"), + receive + {'DOWN', Mon, process, Pid, _} -> ok + end, + receive + {'DOWN', HelperMon, process, Helper, _} -> + ok + after 10000 -> + throw("Helper didn't die") + end, + timer:sleep(?SEND_ERROR_INTERVAL), + Count = rpc:call(SourceNode, erlang, system_info, [process_count]), + send_source_tcp_errors(SourceNode, N - 1, [Count | Acc]) + end. + diff --git a/tests/repl_rt_heartbeat.erl b/tests/repl_rt_heartbeat.erl index 7306404a1..88e004030 100644 --- a/tests/repl_rt_heartbeat.erl +++ b/tests/repl_rt_heartbeat.erl @@ -65,8 +65,14 @@ confirm() -> suspend_heartbeat_messages(LeaderA), %% sleep longer than the HB timeout interval to force re-connection; - %% and give it time to restart the RT connection. Wait an extra 2 seconds. - timer:sleep(timer:seconds(?HB_TIMEOUT) + 2000), + %% and give it time to restart the RT connection. + %% Since it's possible we may disable heartbeats right after a heartbeat has been fired, + %% it can take up to 2*?HB_TIMEOUT seconds to detect a missed heartbeat. The extra second + %% is to avoid rare race conditions due to the timeouts lining up exactly. Not the prettiest + %% solution, but it failed so rarely at 2*HB_TIMEOUT, that this should be good enough + %% in practice, and it beats having to write a bunch of fancy intercepts to verify that + %% the timeout has been hit internally. + timer:sleep(timer:seconds(?HB_TIMEOUT*2) + 1000), %% Verify that RT connection has restarted by noting that it's Pid has changed RTConnPid2 = get_rt_conn_pid(LeaderA), diff --git a/tests/test_hll.erl b/tests/test_hll.erl new file mode 100644 index 000000000..dbe108456 --- /dev/null +++ b/tests/test_hll.erl @@ -0,0 +1,217 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2016 Basho Technologies, Inc. 
+%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%%% @doc r_t to test hll datatypes across a riak cluster + +-module(test_hll). + +-export([confirm/0]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(HLL_TYPE1, <<"hlls1">>). +-define(HLL_TYPE2, <<"hlls2">>). +-define(HLL_TYPE3, <<"hlls3">>). +-define(BUCKET1, {?HLL_TYPE1, <<"testbucket1">>}). +-define(BUCKET2, {?HLL_TYPE2, <<"testbucket2">>}). +-define(DEFAULT_P, 14). +-define(SET_P, 16). +-define(BAD_P, 1). +-define(P_SETTING, hll_precision). +-define(KEY, <<"flipit&reverseit">>). +-define(CONFIG, + [ + {riak_core, + [{ring_creation_size, 8}, + {anti_entropy_build_limit, {100, 1000}}, + {anti_entropy_concurrency, 8}, + {handoff_concurrency, 16}] + } + ]). + +confirm() -> + %% Configure cluster. + Nodes = [N1, N2, N3, N4] = rt:build_cluster(4, ?CONFIG), + + NodeRand1A = rt:select_random([N1, N2]), + NodeRand1B = rt:select_random([N1, N2]), + NodeRand2A = rt:select_random([N3, N4]), + NodeRand2B = rt:select_random([N3, N4]), + + lager:info("Create PB/HTTP Clients from first two nodes, and then" + " the second two nodes, as we'll partition Nodes 1 & 2 from" + " Nodes 3 & 4 later"), + + %% Create PB connection. 
+ PBC1 = rt:pbc(NodeRand1A), + PBC2 = rt:pbc(NodeRand2A), + riakc_pb_socket:set_options(PBC1, [queue_if_disconnected]), + riakc_pb_socket:set_options(PBC2, [queue_if_disconnected]), + + %% Create HTTPC connection. + HttpC1 = rt:httpc(NodeRand1B), + HttpC2 = rt:httpc(NodeRand2B), + + ok = rt:create_activate_and_wait_for_bucket_type(Nodes, + ?HLL_TYPE1, + [{datatype, hll}, + {?P_SETTING, ?SET_P}]), + + ok = rt:create_activate_and_wait_for_bucket_type(Nodes, + ?HLL_TYPE2, + [{datatype, hll}]), + + lager:info("Create a bucket-type w/ a HLL datatype and a bad HLL precision" + " - This should throw an error"), + ?assertError({badmatch, {error, [{hll_precision, _}]}}, + rt:create_activate_and_wait_for_bucket_type(Nodes, + ?HLL_TYPE3, + [{datatype, hll}, + {?P_SETTING, + ?BAD_P}])), + + pb_tests(PBC1, PBC2, riakc_pb_socket, ?BUCKET1, Nodes), + http_tests(HttpC1, HttpC2, rhc, ?BUCKET2, Nodes), + + %% Stop PB connections. + riakc_pb_socket:stop(PBC1), + riakc_pb_socket:stop(PBC2), + + pass. + +http_tests(C1, C2, CMod, Bucket, Nodes) -> + lager:info("HTTP CLI TESTS: Create new Hll DT"), + + add_tests(C1, CMod, Bucket), + + HllSet0 = get_hll(C1, CMod, Bucket), + check_precision_and_reduce_test(C1, CMod, Bucket, ?DEFAULT_P, HllSet0), + + partition_write_heal(C1, C2, CMod, Bucket, Nodes), + + HllSet1 = get_hll(C1, CMod, Bucket), + check_precision_and_reduce_invalid_test(C1, CMod, Bucket, ?DEFAULT_P - 1, + HllSet1), + + ok. + +pb_tests(C1, C2, CMod, Bucket, Nodes) -> + lager:info("PB CLI TESTS: Create new Hll DT"), + + add_tests(C1, CMod, Bucket), + + HllSet0 = get_hll(C1, CMod, Bucket), + check_precision_and_reduce_test(C1, CMod, Bucket, ?SET_P, HllSet0), + + partition_write_heal(C1, C2, CMod, Bucket, Nodes), + + HllSet1 = get_hll(C1, CMod, Bucket), + check_precision_and_reduce_invalid_test(C1, CMod, Bucket, ?SET_P - 1, HllSet1), + + ok. 
+ +add_tests(C, CMod, Bucket) -> + S0 = riakc_hll:new(), + + add_element(C, CMod, Bucket, S0, <<"OH">>), + {ok, S1} = CMod:fetch_type(C, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S1), 1), + + add_elements(C, CMod, Bucket, S1, [<<"C">>, <<"A">>, <<"P">>]), + {ok, S2} = CMod:fetch_type(C, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S2), 4), + + add_redundant_element(C, CMod, Bucket, S2, <<"OH">>), + {ok, S3} = CMod:fetch_type(C, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S3), 4). + +partition_write_heal(C1, C2, CMod, Bucket, Nodes) -> + lager:info("Partition cluster in two to force merge."), + [N1, N2, N3, N4] = Nodes, + PartInfo = rt:partition([N1, N2], [N3, N4]), + + try + lager:info("Write to one side of the partition"), + {ok, S0} = CMod:fetch_type(C1, Bucket, ?KEY), + add_element(C1, CMod, Bucket, S0, <<"OH hello there">>), + {ok, S1} = CMod:fetch_type(C1, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S1), 5), + + lager:info("Write to the other side of the partition"), + {ok, S2} = CMod:fetch_type(C2, Bucket, ?KEY), + add_element(C2, CMod, Bucket, S2, <<"Riak 1.4.eva">>), + {ok, S3} = CMod:fetch_type(C2, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S3), 5), + + lager:info("Heal") + after + ok = rt:heal(PartInfo) + end, + + ok = rt:wait_until_no_pending_changes(Nodes), + ok = rt:wait_until_transfers_complete(Nodes), + + lager:info("Once healed, check both sides for the correct, merged value"), + {ok, S4} = CMod:fetch_type(C1, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S4), 6), + {ok, S5} = CMod:fetch_type(C2, Bucket, ?KEY), + ?assertEqual(riakc_hll:value(S5), 6). + +get_hll(C, CMod, Bucket) -> + {ok, Obj} =CMod:get(C, Bucket, ?KEY), + {ok, CRDT} = riak_kv_crdt:from_binary(riakc_obj:get_value(Obj)), + {_, _, _, HllSet} = CRDT, + HllSet. + +add_element(C, CMod, Bucket, S, Elem) -> + lager:info("Add element to HLL DT"), + CMod:update_type( + C, Bucket, ?KEY, + riakc_hll:to_op( + riakc_hll:add_element(Elem, S))). 
+ +add_elements(C, CMod, Bucket, S, Elems) -> + lager:info("Add multiple elements to HLL DT"), + CMod:update_type( + C, Bucket, ?KEY, + riakc_hll:to_op( + riakc_hll:add_elements(Elems, S))). + +add_redundant_element(C, CMod, Bucket, S, Elem) -> + lager:info("Add redundant element to HLL DT by calling" + " add_element/3 again"), + add_element(C, CMod, Bucket, S, Elem). + +check_precision_and_reduce_test(C, CMod, Bucket, ExpP, HllSet) -> + {ok, Props0} = CMod:get_bucket(C, Bucket), + ?assertEqual(proplists:get_value(?P_SETTING, Props0), ExpP), + ?assertEqual(riak_kv_hll:precision(HllSet), ExpP), + ok = CMod:set_bucket(C, Bucket, [{?P_SETTING, ExpP - 1}]), + {ok, Props1} = CMod:get_bucket(C, Bucket), + ?assertEqual(proplists:get_value(?P_SETTING, Props1), ExpP - 1). + +check_precision_and_reduce_invalid_test(C, CMod, Bucket, ExpP, HllSet) -> + lager:info("HLL's can be reduced, but never increased.\n" + " Test to make sure we don't allow invalid values."), + + ?assertEqual(riak_kv_hll:precision(HllSet), ExpP), + {error, _} = CMod:set_bucket(C, Bucket, [{?P_SETTING, ExpP + 1}]), + {ok, Props} = CMod:get_bucket(C, Bucket), + ?assertEqual(proplists:get_value(?P_SETTING, Props), ExpP), + ?assertEqual(riak_kv_hll:precision(HllSet), ExpP). diff --git a/tests/verify_aae.erl b/tests/verify_aae.erl index 1d5fa2dda..f81b14c1f 100644 --- a/tests/verify_aae.erl +++ b/tests/verify_aae.erl @@ -43,6 +43,7 @@ % I would hope this would come from the testing framework some day % to use the test in small and large scenarios. -define(DEFAULT_RING_SIZE, 8). +-define(AAE_THROTTLE_LIMITS, [{-1, 0}, {100, 10}]). -define(CFG, [{riak_kv, [ @@ -51,7 +52,8 @@ {anti_entropy_build_limit, {100, 1000}}, {anti_entropy_concurrency, 100}, {anti_entropy_expire, 24 * 60 * 60 * 1000}, % Not for now! 
- {anti_entropy_tick, 500} + {anti_entropy_tick, 500}, + {aae_throttle_limits, ?AAE_THROTTLE_LIMITS} ]}, {riak_core, [ @@ -65,9 +67,25 @@ confirm() -> Nodes = rt:build_cluster(?NUM_NODES, ?CFG), + verify_throttle_config(Nodes), verify_aae(Nodes), pass. +verify_throttle_config(Nodes) -> + lists:foreach( + fun(Node) -> + ?assert(rpc:call(Node, + riak_kv_entropy_manager, + is_aae_throttle_enabled, + [])), + ?assertMatch(?AAE_THROTTLE_LIMITS, + rpc:call(Node, + riak_kv_entropy_manager, + get_aae_throttle_limits, + [])) + end, + Nodes). + verify_aae(Nodes) -> Node1 = hd(Nodes), % First, recovery without tree rebuilds diff --git a/tests/verify_cluster_converge.erl b/tests/verify_cluster_converge.erl new file mode 100644 index 000000000..6b5931f79 --- /dev/null +++ b/tests/verify_cluster_converge.erl @@ -0,0 +1,15 @@ +-module(verify_cluster_converge). + +-behavior(riak_test). +-export([confirm/0]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("riakc/include/riakc.hrl"). + +-define(assertDenied(Op), ?assertMatch({error, <<"Permission",_/binary>>}, Op)). + +confirm() -> + lager:info("Deploy & cluster some nodes"), + + _Nodes = rt:build_cluster(4), + pass. 
\ No newline at end of file diff --git a/tests/verify_crdt_capability.erl b/tests/verify_crdt_capability.erl index 9b49b5a57..d444af721 100644 --- a/tests/verify_crdt_capability.erl +++ b/tests/verify_crdt_capability.erl @@ -63,8 +63,13 @@ confirm() -> lager:info("Upgrayded!!"), ?assertEqual(ok, rt:wait_until_ready(Current)), ?assertEqual(ok, rt:wait_until_ready(Previous)), - rt:wait_for_service(Previous, riak_kv), - ?assertEqual(ok, rt:wait_until_capability_contains(Current, {riak_kv, crdt}, [riak_dt_pncounter, riak_dt_orswot, riak_dt_map, pncounter])), + rt:wait_for_service(Previous, riak_kv), + ?assertEqual(ok, rt:wait_until_capability_contains(Current, {riak_kv, crdt}, + [riak_dt_pncounter, + riak_dt_orswot, + riak_dt_map, + pncounter, + riak_kv_hll])), ?assertMatch(ok, rhc:counter_incr(PrevHttp, ?BUCKET, ?KEY, 1)), ?assertMatch({ok, 5}, rhc:counter_val(PrevHttp, ?BUCKET, ?KEY)), @@ -86,7 +91,7 @@ confirm() -> ?assertEqual(8, begin {ok, Counter} = riakc_pb_socket:fetch_type(PrevPB1, {<<"default">>, ?BUCKET}, ?KEY), riakc_counter:value(Counter) - end), + end), ?assertEqual(ok, riakc_pb_socket:update_type(PrevPB1, {<<"default">>, ?BUCKET}, ?KEY, gen_counter_op())), ?assertEqual({ok, 9}, riakc_pb_socket:counter_val(PB, ?BUCKET, ?KEY)), diff --git a/tests/verify_dt_converge.erl b/tests/verify_dt_converge.erl index 60c78ab36..db6b67e8c 100644 --- a/tests/verify_dt_converge.erl +++ b/tests/verify_dt_converge.erl @@ -32,9 +32,11 @@ -define(CTYPE, <<"counters">>). -define(STYPE, <<"sets">>). -define(MTYPE, <<"maps">>). +-define(HTYPE, <<"hlls">>). -define(TYPES, [{?CTYPE, counter}, {?STYPE, set}, - {?MTYPE, map}]). + {?MTYPE, map}, + {?HTYPE, hll}]). -define(PB_BUCKET, <<"pbtest">>). -define(HTTP_BUCKET, <<"httptest">>). 
@@ -58,17 +60,17 @@ confirm() -> %% Do some updates to each type [update_1(Type, ?PB_BUCKET, Client, riakc_pb_socket) || - {Type, Client} <- lists:zip(?TYPES, [P1, P2, P3])], + {Type, Client} <- lists:zip(?TYPES, [P1, P2, P3, P4])], [update_1(Type, ?HTTP_BUCKET, Client, rhc) || - {Type, Client} <- lists:zip(?TYPES, [H1, H2, H3])], + {Type, Client} <- lists:zip(?TYPES, [H1, H2, H3, H4])], %% Check that the updates are stored [check_1(Type, ?PB_BUCKET, Client, riakc_pb_socket) || - {Type, Client} <- lists:zip(?TYPES, [P4, P3, P2])], + {Type, Client} <- lists:zip(?TYPES, [P4, P3, P2, P1])], [check_1(Type, ?HTTP_BUCKET, Client, rhc) || - {Type, Client} <- lists:zip(?TYPES, [H4, H3, H2])], + {Type, Client} <- lists:zip(?TYPES, [H4, H3, H2, H1])], lager:info("Partition cluster in two."), @@ -77,34 +79,34 @@ confirm() -> lager:info("Modify data on side 1"), %% Modify one side [update_2a(Type, ?PB_BUCKET, Client, riakc_pb_socket) || - {Type, Client} <- lists:zip(?TYPES, [P1, P2, P1])], + {Type, Client} <- lists:zip(?TYPES, [P1, P2, P1, P2])], [update_2a(Type, ?HTTP_BUCKET, Client, rhc) || - {Type, Client} <- lists:zip(?TYPES, [H1, H2, H1])], + {Type, Client} <- lists:zip(?TYPES, [H1, H2, H1, H2])], lager:info("Check data is unmodified on side 2"), %% check value on one side is different from other [check_2b(Type, ?PB_BUCKET, Client, riakc_pb_socket) || - {Type, Client} <- lists:zip(?TYPES, [P4, P3, P4])], + {Type, Client} <- lists:zip(?TYPES, [P4, P3, P4, P3])], [check_2b(Type, ?HTTP_BUCKET, Client, rhc) || - {Type, Client} <- lists:zip(?TYPES, [H4, H3, H4])], + {Type, Client} <- lists:zip(?TYPES, [H4, H3, H4, H3])], lager:info("Modify data on side 2"), %% Modify other side [update_3b(Type, ?PB_BUCKET, Client, riakc_pb_socket) || - {Type, Client} <- lists:zip(?TYPES, [P3, P4, P3])], + {Type, Client} <- lists:zip(?TYPES, [P3, P4, P3, P4])], [update_3b(Type, ?HTTP_BUCKET, Client, rhc) || - {Type, Client} <- lists:zip(?TYPES, [H3, H4, H3])], + {Type, Client} <- 
lists:zip(?TYPES, [H3, H4, H3, H4])], lager:info("Check data is unmodified on side 1"), %% verify values differ [check_3a(Type, ?PB_BUCKET, Client, riakc_pb_socket) || - {Type, Client} <- lists:zip(?TYPES, [P2, P2, P1])], + {Type, Client} <- lists:zip(?TYPES, [P2, P2, P1, P1])], [check_3a(Type, ?HTTP_BUCKET, Client, rhc) || - {Type, Client} <- lists:zip(?TYPES, [H2, H2, H1])], + {Type, Client} <- lists:zip(?TYPES, [H2, H2, H1, H1])], %% heal lager:info("Heal and check merged values"), @@ -138,7 +140,7 @@ create_bucket_types([N1|_]=Nodes, Types) -> [Name, [{datatype, Type}, {allow_mult, true}]]) || {Name, Type} <- Types ], [rt:wait_until(N1, bucket_type_ready_fun(Name)) || {Name, _Type} <- Types], - [ rt:wait_until(N, bucket_type_matches_fun(Types)) || N <- Nodes]. + [rt:wait_until(N, bucket_type_matches_fun(Types)) || N <- Nodes]. bucket_type_ready_fun(Name) -> fun(Node) -> @@ -190,6 +192,13 @@ update_1({BType, map}, Bucket, Client, CMod) -> riakc_counter:increment(10, C) end, M1) end, + {BType, Bucket}, ?KEY, ?MODIFY_OPTS); +update_1({BType, hll}, Bucket, Client, CMod) -> + lager:info("update_1: Updating hyperloglog(set)"), + CMod:modify_type(Client, + fun(S) -> + riakc_hll:add_element(<<"Z">>, S) + end, {BType, Bucket}, ?KEY, ?MODIFY_OPTS). check_1({BType, counter}, Bucket, Client, CMod) -> @@ -202,7 +211,10 @@ check_1({BType, map}, Bucket, Client, CMod) -> lager:info("check_1: Checking map value is correct"), check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_map, [{{<<"followers">>, counter}, 10}, - {{<<"friends">>, set}, [<<"Russell">>]}]). + {{<<"friends">>, set}, [<<"Russell">>]}]); +check_1({BType, hll}, Bucket, Client, CMod) -> + lager:info("check_1: Checking hll value is correct"), + check_value(Client,CMod,{BType, Bucket},?KEY,riakc_hll,1). 
update_2a({BType, counter}, Bucket, Client, CMod) -> CMod:modify_type(Client, @@ -231,6 +243,14 @@ update_2a({BType, map}, Bucket, Client, CMod) -> end, M1) end, + {BType, Bucket}, ?KEY, ?MODIFY_OPTS); +update_2a({BType, hll}, Bucket, Client, CMod) -> + CMod:modify_type(Client, + fun(S) -> + riakc_hll:add_element( + <<"DANG">>, + riakc_hll:add_element(<<"Z^2">>, S)) + end, {BType, Bucket}, ?KEY, ?MODIFY_OPTS). check_2b({BType, counter}, Bucket, Client, CMod) -> @@ -243,7 +263,10 @@ check_2b({BType, map},Bucket,Client,CMod) -> lager:info("check_2b: Checking map value is unchanged"), check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_map, [{{<<"followers">>, counter}, 10}, - {{<<"friends">>, set}, [<<"Russell">>]}]). + {{<<"friends">>, set}, [<<"Russell">>]}]); +check_2b({BType, hll},Bucket,Client,CMod) -> + lager:info("check_2b: Checking hll value is unchanged"), + check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_hll, 1). update_3b({BType, counter}, Bucket, Client, CMod) -> CMod:modify_type(Client, @@ -273,6 +296,12 @@ update_3b({BType, map},Bucket,Client,CMod) -> end, M1) end, + {BType, Bucket}, ?KEY, ?MODIFY_OPTS); +update_3b({BType, hll}, Bucket, Client, CMod) -> + CMod:modify_type(Client, + fun(S) -> + riakc_hll:add_element(<<"Zedds Dead">>, S) + end, {BType, Bucket}, ?KEY, ?MODIFY_OPTS). check_3a({BType, counter}, Bucket, Client, CMod) -> @@ -287,7 +316,10 @@ check_3a({BType, map}, Bucket, Client, CMod) -> check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_map, [{{<<"followers">>, counter}, 10}, {{<<"friends">>, set}, [<<"Russell">>, <<"Sam">>]}, - {{<<"verified">>, flag}, true}]). + {{<<"verified">>, flag}, true}]); +check_3a({BType, hll}, Bucket, Client, CMod) -> + lager:info("check_3a: Checking hll value is unchanged"), + check_value(Client,CMod,{BType, Bucket},?KEY,riakc_hll,3). 
check_4({BType, counter}, Bucket, Client, CMod) -> lager:info("check_4: Checking final merged value of counter"), @@ -311,6 +343,14 @@ check_4({BType, map}, Bucket, Client, CMod) -> {{<<"followers">>, counter}, 10}, {{<<"friends">>, set}, [<<"Sam">>]}, {{<<"verified">>, flag}, true}], + [{pr, 3}, {notfound_ok, false}]); +check_4({BType, hll}, Bucket, Client, CMod) -> + lager:info("check_4: Checking final merged value of hll"), + check_value(Client, + CMod, {BType, Bucket}, + ?KEY, + riakc_hll, + 4, [{pr, 3}, {notfound_ok, false}]). check_value(Client, CMod, Bucket, Key, DTMod, Expected) -> @@ -322,7 +362,8 @@ check_value(Client, CMod, Bucket, Key, DTMod, Expected, Options) -> try Result = CMod:fetch_type(Client, Bucket, Key, Options), - lager:info("Expected ~p~n got ~p~n", [Expected, Result]), + lager:info("Expected ~p~n got ~p~n", [Expected, + Result]), ?assertMatch({ok, _}, Result), {ok, C} = Result, ?assertEqual(true, DTMod:is_type(C)), diff --git a/tests/verify_handoff_write_once.erl b/tests/verify_handoff_write_once.erl new file mode 100644 index 000000000..9753946ed --- /dev/null +++ b/tests/verify_handoff_write_once.erl @@ -0,0 +1,207 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(verify_handoff_write_once). 
+-behavior(riak_test). +-export([confirm/0]). +-include_lib("eunit/include/eunit.hrl"). +-define(BUCKET_TYPE, <<"write_once">>). +-define(BUCKET, {?BUCKET_TYPE, <<"write_once">>}). + + +%% @doc This test will run a handoff in the case of write_once buckets, verifying +%% that write-once entries are properly handed off as part of ownership handoff, +%% but more importantly, that riak_kv_vnode properly handles data being written into +%% riak while ownership handoff is taking place. +%% +%% This test will create two nodes each with a ring size of 8, and populate one node +%% with 1k entries. It will then join the two nodes to make a cluster of size 2, which +%% will result in ownership handoff of four of the nodes (in each direction). +%% +%% We have intercepted the riak_kv_worker, which handles handoff for an individual vnode, +%% to ensure what we can send data through Riak while the cluster is in the handoff state, +%% thus ensuring that the riak_kv_vnode:handle_handoff_command callback is exercised in +%% the case of write_once buckets. +%% +%% We install intercepts at key points in the vnode to measure how many time various key +%% parts of the code are called. +%% +%% We run the above test twice, once in the case where we are doing asynchronous writes on the +%% back end, and once when we are using synchronous writes. Currently, this is toggled via +%% the use of a back end that can support async writes (currently, only leveldb) +%% +confirm() -> + + AsyncConfig = create_config(riak_kv_eleveldb_backend), + AsyncCluster = run_test(AsyncConfig, true), + + rt:clean_cluster(AsyncCluster), + + SyncConfig = create_config(riak_kv_memory_backend), + _SyncCluster = run_test(SyncConfig, false), + + pass. 
+ +create_config(Backend) -> + [{riak_core, [ + {default_bucket_props, [{n_val, 1}]}, + {ring_creation_size, 8}, + {handoff_acksync_threshold, 20}, + {handoff_concurrency, 4}, + {handoff_receive_timeout, 2000}, + {vnode_management_timer, 100}]}, + {riak_kv, [ + {storage_backend, Backend}]} + ]. + +run_test(Config, AsyncWrites) -> + %% + %% Deploy 2 nodes based on config. Wait for K/V to start on each node. + %% + lager:info("Deploying 2 nodes..."), + Cluster = [RootNode, NewNode] = rt:deploy_nodes(2, Config), + [rt:wait_for_service(Node, riak_kv) || Node <- [RootNode, NewNode]], + %% + %% Set up the intercepts + %% + lager:info("Setting up intercepts..."), + make_intercepts_tab(RootNode), + % This intercept will tell the backround process (below) to send an event for each + % vnode that is being handed off (there will be 4 such vnodes, in this test case) + rt_intercept:add( + RootNode, {riak_kv_worker, [{{handle_work, 3}, handle_work_intercept}]} + ), + rt_intercept:add( + RootNode, {riak_kv_vnode, [ + %% Count everytime riak_kv_vnode:handle_handoff_command/3 is called with a write_once message + {{handle_handoff_command, 3}, count_handoff_w1c_puts}, + %% Count everytime riak_kv_vnode:handle_command/3 is called with a write_once message + {{handle_command, 3}, count_w1c_handle_command} + ]} + ), + true = rpc:call(RootNode, ets, insert, [intercepts_tab, {w1c_async_replies, 0}]), + true = rpc:call(RootNode, ets, insert, [intercepts_tab, {w1c_sync_replies, 0}]), + true = rpc:call(RootNode, ets, insert, [intercepts_tab, {w1c_put_counter, 0}]), + %% + %% Seed the root node with some data + %% + lager:info("Populating root node..."), + rt:create_and_activate_bucket_type(RootNode, ?BUCKET_TYPE, [{write_once, true}, {n_val, 1}]), + NTestItems = 100, + RingSize = proplists:get_value(ring_creation_size, proplists:get_value(riak_core, Config)), + [] = rt:systest_write(RootNode, 1, NTestItems, ?BUCKET, 1), + %% + %% Start an asynchronous proc which will send puts into riak 
during handoff. + %% + lager:info("Joining new node with cluster..."), + start_proc(RootNode, NTestItems, RingSize div 2), + rt:join(NewNode, RootNode), + TotalSent = wait_until_async_writes_complete(), + ?assertMatch(ok, rt:wait_until_nodes_ready(Cluster)), + rt:wait_until_bucket_type_visible(Cluster, ?BUCKET_TYPE), + rt:wait_until_no_pending_changes(Cluster), + rt:wait_until_transfers_complete(Cluster), + %% + %% Verify the results + %% + lager:info("Validating data after handoff..."), + Results2 = rt:systest_read(NewNode, 1, TotalSent, ?BUCKET, 1), + ?assertMatch([], Results2), + lager:info("Read ~p entries.", [TotalSent]), + [{_, Count}] = rpc:call(RootNode, ets, lookup, [intercepts_tab, w1c_put_counter]), + ?assertEqual(RingSize div 2, Count), + lager:info("We handled ~p write_once puts during handoff.", [Count]), + [{_, W1CAsyncReplies}] = rpc:call(RootNode, ets, lookup, [intercepts_tab, w1c_async_replies]), + [{_, W1CSyncReplies}] = rpc:call(RootNode, ets, lookup, [intercepts_tab, w1c_sync_replies]), + case AsyncWrites of + true -> + ?assertEqual(NTestItems + RingSize div 2, W1CAsyncReplies), + ?assertEqual(0, W1CSyncReplies); + false -> + ?assertEqual(0, W1CAsyncReplies), + ?assertEqual(NTestItems + RingSize div 2, W1CSyncReplies) + end, + Cluster. + +make_intercepts_tab(Node) -> + SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]), + intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table, + public, set, {heir, SupPid, {}}]]). + + +%% +%% Notes on the background process and corresponding intercepts. +%% +%% The code below is used to spawn a background process that is globally +%% registered with the name rt_ho_w1c_proc. This process will +%% wait for a message from the riak_kv_worker handle_work intercept, +%% telling this proc to write a message into Riak. The timing of the +%% intercept is such that the write is guaranteed to take place while +%% handoff is in progress, but before the vnode has been told to finish. 
+%% Sending this message will trigger this background process to do a +%% write into Riak, which in turn will force the vnode's +%% handle_handoff_command to be called. +%% + +-record(state, { + node, sender, k, pids=[], expected, init=true +}). + +start_proc(Node, NTestItems, Expected) -> + Self = self(), + Pid = spawn_link(fun() -> loop(#state{node=Node, sender=Self, k=NTestItems, expected=Expected}) end), + global:register_name(rt_ho_w1c_proc, Pid), + receive ok -> ok end. + +loop(#state{node=Node, sender=Sender, k=K, pids=Pids, expected=Expected, init=Init} = State) -> + case Init of + true -> + Sender ! ok; + _ -> ok + end, + receive + {write, Pid} -> + ThePids = [Pid | Pids], + NumPids = length(ThePids), + case NumPids of + Expected -> + %% + %% The number of expected vnodes are now in the handoff state. Do some writes, and send ok's + %% back to the waiting vnodes. Once they get the ok back, they will complete handoff. At this + %% point, we are done, so we can tell the test to proceed and wait for handoff to complete. + %% + [] = rt:systest_write(Node, K + 1, K + Expected, ?BUCKET, 1), + lager:info( + "Asynchronously wrote entries [~p..~p] during handoff. Sending ok's back to ~p waiting vnode(s)...", + [K + 1, K + Expected, NumPids] + ), + [ThePid ! ok || ThePid <- ThePids], + Sender ! (K + Expected); + _ -> + loop(State#state{pids=ThePids, init=false}) + end + end. + + +wait_until_async_writes_complete() -> + receive + K -> K + after 60000 -> + throw("Timed out after 60s waiting for async writes to complete.") + end. 
\ No newline at end of file diff --git a/tests/verify_object_limits.erl b/tests/verify_object_limits.erl index fd8af35dc..e8a0d4d06 100644 --- a/tests/verify_object_limits.erl +++ b/tests/verify_object_limits.erl @@ -36,6 +36,7 @@ confirm() -> [Node1] = rt:build_cluster(1, [{riak_kv, [ {ring_creation_size, 8}, + {anti_entropy, {off,[]}}, {max_object_size, ?MAX_SIZE}, {warn_object_size, ?WARN_SIZE}, {max_siblings, ?MAX_SIBLINGS}, @@ -51,6 +52,9 @@ confirm() -> [{allow_mult, true}])), verify_size_limits(C, Node1), verify_sibling_limits(C, Node1), + lager:notice("Starting readrepair section of test"), + verify_readrepair_ignore_max_size(C, Node1), + verify_readrepair_ignore_max_sib(C, Node1), pass. verify_size_limits(C, Node1) -> @@ -128,3 +132,47 @@ verify_sibling_limits(C, Node1) -> lager:info("Result when too many siblings : ~p", [Res]), ?assertMatch({error,_}, Res), ok. + +verify_readrepair_ignore_max_size(C, Node1) -> + % Add intercept to treat all vnode puts as readrepairs + Intercept = {riak_kv_vnode, [{{put, 6}, put_as_readrepair},{{coord_put,6}, coord_put_as_readrepair}]}, + ok = rt_intercept:add(Node1, Intercept), + % Do put with value greater than max size and confirm warning + lager:info("Checking readrepair put of size ~p, expecting ok result and log warning", [?MAX_SIZE*2]), + K = <<"rrsizetest">>, + V = <<0:(?MAX_SIZE*2)/integer-unit:8>>, + O = riakc_obj:new(?BUCKET, K, V), + ?assertMatch(ok, riakc_pb_socket:put(C, O)), + verify_size_write_warning(Node1, K, ?MAX_SIZE*2), + % Clean intercept + ok = rt_intercept:clean(Node1, riak_kv_vnode), + ok. 
+ +verify_readrepair_ignore_max_sib(C, Node1) -> + lager:info("Checking sibling warning on readrepair above max siblings=~p", [?MAX_SIBLINGS]), + K = <<"rrsibtest">>, + V = <<"sibtest">>, + O = riakc_obj:new(?BUCKET, K, V), + % Force sibling error + [?assertMatch(ok, riakc_pb_socket:put(C, O)) + || _ <- lists:seq(1, ?MAX_SIBLINGS)], + Res = riakc_pb_socket:put(C, O), + lager:info("Result when too many siblings : ~p", [Res]), + ?assertMatch({error,_}, Res), + % Add intercept to spoof writes as readrepair + Intercept = {riak_kv_vnode, [{{put, 6}, put_as_readrepair},{{coord_put,6}, coord_put_as_readrepair}]}, + ok = rt_intercept:add(Node1, Intercept), + % Verify readrepair writes return ok and log warning + lager:info("Verifying succesful put above max_siblings with readrepair"), + ?assertMatch(ok, riakc_pb_socket:put(C, O)), + P = io_lib:format("warning.*siblings.*~p.*~p.*(~p)", + [?BUCKET, K, ?MAX_SIBLINGS+1]), + Found = rt:expect_in_log(Node1, P), + lager:info("Looking for sibling warning: ~p", [Found]), + ?assertEqual(true, Found), + % Clean intercept + ok = rt_intercept:clean(Node1, riak_kv_vnode), + ok. + + + diff --git a/tests/verify_riak_stats.erl b/tests/verify_riak_stats.erl index 8ed8edc03..efa072a29 100644 --- a/tests/verify_riak_stats.erl +++ b/tests/verify_riak_stats.erl @@ -838,6 +838,19 @@ common_stats() -> <<"vnode_set_update_time_median">>, <<"vnode_set_update_total">>, <<"webmachine_version">>, + <<"write_once_merge">>, + <<"write_once_put_objsize_100">>, + <<"write_once_put_objsize_95">>, + <<"write_once_put_objsize_99">>, + <<"write_once_put_objsize_mean">>, + <<"write_once_put_objsize_median">>, + <<"write_once_put_time_100">>, + <<"write_once_put_time_95">>, + <<"write_once_put_time_99">>, + <<"write_once_put_time_mean">>, + <<"write_once_put_time_median">>, + <<"write_once_puts">>, + <<"write_once_puts_total">>, <<"xmerl_version">>, <<"yokozuna_version">> ]. 
diff --git a/tests/verify_snmp_repl.erl b/tests/verify_snmp_repl.erl index dde8b60a6..ead7bff2e 100644 --- a/tests/verify_snmp_repl.erl +++ b/tests/verify_snmp_repl.erl @@ -63,6 +63,7 @@ intercept_riak_snmp_stat_poller(Node) -> RiakTestProcess ! pass catch Exception:Reason -> + lager:error("Failure in riak_snmp_stat_poller_orig:set_rows_orig: ~p~n", [{Exception, Reason}]), RiakTestProcess ! {fail, {Exception, Reason}}, error({Exception, Reason}) end @@ -70,11 +71,19 @@ intercept_riak_snmp_stat_poller(Node) -> wait_for_snmp_stat_poller() -> receive - pass -> pass; - {fail, Reason} -> {fail, Reason}; - X -> {fail, {unknown, X}} + pass -> + pass; + {fail, Reason} -> + lager:error("Failure in wait_for_snmp_stat_poller: ~p~n", [Reason]), + error({fail, Reason}); + X -> + lager:error("Unknown failure in wait_for_snmp_stat_poller: ~p~n", [X]), + error(X) after - 1000 -> {fail, timeout} + 1000 -> + Message = "Timeout waiting for snmp_stat_poller.", + lager:error(Message), + error({timeout, Message}) end. make_nodes(NodeCount, ClusterCount, Config) -> diff --git a/tests/verify_write_once.erl b/tests/verify_write_once.erl new file mode 100644 index 000000000..02fcf41d4 --- /dev/null +++ b/tests/verify_write_once.erl @@ -0,0 +1,333 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. 
+%% +%% ------------------------------------------------------------------- + +-module(verify_write_once). +-export([confirm/0]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(DEFAULT_RING_SIZE, 16). +-define(NVAL, 2). +-define(BUCKET_TYPE, <<"write_once">>). +-define(BUCKET, {?BUCKET_TYPE, <<"bucket">>}). +-define(ASYNC_PUT_BUCKET_TYPE, <<"async_put">>). +-define(ASYNC_PUT_BUCKET, {?ASYNC_PUT_BUCKET_TYPE, <<"bucket">>}). +-define(ANY_VALUE, <<"any">>). + + +%% @doc This test exercises the write_once bucket property, which results in puts that avoid coordination +%% and reads before writes, and which therefore have lower latency and higher throughput. +%% +confirm() -> + %% + %% Set two clusters. We need one for most of the testing of this code path. + %% The first cluster will use the memory back end. + %% The second cluster will be a singleton cluster with the leveldb back end, + %% in order to test asynchronous puts + %% + [Cluster1, Cluster2] = rt:deploy_clusters([ + {4, config(?DEFAULT_RING_SIZE, ?NVAL)}, + {1, config(?DEFAULT_RING_SIZE, ?NVAL, riak_kv_eleveldb_backend)} + ]), + rt:join_cluster(Cluster1), + % rt:join_cluster(Cluster2), + lager:info("Set up clusters: ~p, ~p", [Cluster1, Cluster2]), + %% + %% Select a random node, and use it to create an immutable bucket + %% + Node = lists:nth(random:uniform(length((Cluster1))), Cluster1), + rt:create_and_activate_bucket_type(Node, ?BUCKET_TYPE, [{write_once, true}]), + rt:wait_until_bucket_type_status(?BUCKET_TYPE, active, Cluster1), + lager:info("Created ~p bucket type on ~p", [?BUCKET_TYPE, Node]), + %% + %% + %% + pass = confirm_put(Node), + pass = confirm_w(Cluster1), + pass = confirm_pw(Cluster1), + pass = confirm_rww(Cluster1), + pass = confirm_async_put(hd(Cluster2)), + pass. 
+ +%% +%% private +%% + + +confirm_put(Node) -> + ok = verify_put(Node, ?BUCKET, <<"confirm_put_key">>, <<"confirm_put_value">>), + verify_failed_put( + Node, ?BUCKET, <<"confirm_put-bad_w">>, ?ANY_VALUE, [{w, 9999}], + fun(Error) -> + ?assertMatch({n_val_violation, 3}, Error) + end + ), + verify_failed_put( + Node, ?BUCKET, <<"confirm_put-bad_pw">>, ?ANY_VALUE, [{pw, 9999}], + fun(Error) -> + ?assertMatch({n_val_violation, 3}, Error) + end + ), + lager:info("confirm_put...ok"), + pass. + + +confirm_w(Nodes) -> + %% + %% split the cluster into 2 paritions [dev1, dev2, dev3], [dev4] + %% + P1 = lists:sublist(Nodes, 3), + P2 = lists:sublist(Nodes, 4, 1), + PartitonInfo = rt:partition(P1, P2), + [Node1 | _Rest1] = P1, + verify_put(Node1, ?BUCKET, <<"confirm_w_key">>, <<"confirm_w_value">>), + [Node2 | _Rest2] = P2, + %% + %% By setting sloppy_quorum to false, we require a strict quorum of primaries. But because + %% we only have one node in the partition, the put should fail. It should bail immediately + %% without even attempting a write on the back end, because a quorum will not be possible. + %% + verify_failed_put( + Node2, ?BUCKET, <<"confirm_w_key">>, <<"confirm_w_value">>, [{sloppy_quorum, false}], + fun(Error) -> + ?assertMatch({insufficient_vnodes, _, need, 2}, Error) + end + ), + rt:heal(PartitonInfo), + lager:info("confirm_pw...ok"), + pass. + + +confirm_pw(Nodes) -> + %% + %% split the cluster into 2 paritions [dev1, dev2, dev3], [dev4] + %% + P1 = lists:sublist(Nodes, 3), + P2 = lists:sublist(Nodes, 4, 1), + PartitonInfo = rt:partition(P1, P2), + [Node1 | _Rest1] = P1, + verify_put(Node1, ?BUCKET, <<"confirm_pw_key">>, <<"confirm_pw_value">>), + [Node2 | _Rest2] = P2, + %% + %% Similar to the above test -- if pw is all, then we require n_val puts on primaries, but + %% the node is a singleton in the partition, so this, too, should fail. This will time + %% out, so set the timeout to something small. 
+ %% + verify_put_timeout( + Node2, ?BUCKET, <<"confirm_pw_key">>, ?ANY_VALUE, [{pw, all}], 1000, + fun(Error) -> + ?assertMatch({pw_val_unsatisfied, 3, _}, Error) + end + ), + rt:heal(PartitonInfo), + lager:info("confirm_pw...ok"), + pass. + +confirm_rww(Nodes) -> + %% + %% split the cluster into 2 paritions + %% + P1 = lists:sublist(Nodes, 2), + P2 = lists:sublist(Nodes, 3, 2), + PartitonInfo = rt:partition(P1, P2), + NumFastMerges = num_fast_merges(Nodes), + %% + %% put different values into each partiton + %% + [Node1 | _Rest1] = P1, + verify_put(Node1, ?BUCKET, <<"confirm_rww_key">>, <<"confirm_rww_value1">>), + [Node2 | _Rest2] = P2, + verify_put(Node2, ?BUCKET, <<"confirm_rww_key">>, <<"confirm_rww_value2">>), + %% + %% After healing, both should agree on an arbitrary value + %% + rt:heal(PartitonInfo), + rt:wait_until(fun() -> + V1 = get(Node1, ?BUCKET, <<"confirm_rww_key">>), + V2 = get(Node2, ?BUCKET, <<"confirm_rww_key">>), + V1 =:= V2 + end), + ?assert(NumFastMerges < num_fast_merges(Nodes)), + lager:info("confirm_rww...ok"), + pass. + +%% +%% In order to test asynchronous puts, at this point we need a node with leveldb, as +%% that is currently the only back end that supports it. In the future, we may add +%% async puts as a capability which can be arbitrated through the multi backend. 
+%% +confirm_async_put(Node) -> + %% + %% Set up the intercepts on the singleton node in cluster2 + %% + make_intercepts_tab(Node), + rt_intercept:add( + Node, {riak_kv_vnode, [ + %% Count everytime riak_kv_vnode:handle_handoff_command/3 is called with a write_once message + {{handle_command, 3}, count_w1c_handle_command} + ]} + ), + %% + %% Create the bucket type + %% + rt:create_and_activate_bucket_type(Node, ?ASYNC_PUT_BUCKET_TYPE, [{write_once, true}, {backend, myeleveldb}]), + rt:wait_until_bucket_type_status(?ASYNC_PUT_BUCKET_TYPE, active, [Node]), + lager:info("Created ~p bucket type on ~p", [?ASYNC_PUT_BUCKET_TYPE, Node]), + %% + %% Clear the intercept counters + %% + true = rpc:call(Node, ets, insert, [intercepts_tab, {w1c_async_replies, 0}]), + true = rpc:call(Node, ets, insert, [intercepts_tab, {w1c_sync_replies, 0}]), + + ok = verify_put(Node, ?ASYNC_PUT_BUCKET, <<"confirm_async_put_key">>, <<"confirm_async_put_value">>), + %% + %% verify that we have handled 3 asynchronous writes and 0 synchronous writes + %% + [{_, W1CAsyncReplies}] = rpc:call(Node, ets, lookup, [intercepts_tab, w1c_async_replies]), + [{_, W1CSyncReplies}] = rpc:call(Node, ets, lookup, [intercepts_tab, w1c_sync_replies]), + ?assertEqual(0, W1CSyncReplies), + ?assertEqual(3, W1CAsyncReplies), + %% + %% reconfigure the node to force use of synchronous writes with leveldb + %% + rt:update_app_config(Node, [{riak_kv, [{allow_async_put, false}]}]), + rt:start(Node), + %% + %% Set up the intercepts on the singleton node in cluster2 + %% + make_intercepts_tab(Node), + rt_intercept:add( + Node, {riak_kv_vnode, [ + %% Count everytime riak_kv_vnode:handle_handoff_command/3 is called with a write_once message + {{handle_command, 3}, count_w1c_handle_command} + ]} + ), + %% + %% Clear the intercept counters + %% + true = rpc:call(Node, ets, insert, [intercepts_tab, {w1c_async_replies, 0}]), + true = rpc:call(Node, ets, insert, [intercepts_tab, {w1c_sync_replies, 0}]), + + ok = verify_put(Node, 
?ASYNC_PUT_BUCKET, <<"confirm_async_put_key">>, <<"confirm_async_put_value">>), + %% + %% verify that we have handled 0 asynchronous writes and 3 synchronous writes, instead + %% + [{_, W1CAsyncReplies2}] = rpc:call(Node, ets, lookup, [intercepts_tab, w1c_async_replies]), + [{_, W1CSyncReplies2}] = rpc:call(Node, ets, lookup, [intercepts_tab, w1c_sync_replies]), + ?assertEqual(3, W1CSyncReplies2), + ?assertEqual(0, W1CAsyncReplies2), + %% + %% done! + %% + lager:info("confirm_async_put...ok"), + pass. + +verify_put(Node, Bucket, Key, Value) -> + verify_put(Node, Bucket, Key, Value, [], Value). + +verify_put(Node, Bucket, Key, Value, Options, ExpectedValue) -> + Client = rt:pbc(Node), + _Ret = riakc_pb_socket:put( + Client, riakc_obj:new( + Bucket, Key, Value + ), + Options + ), + {ok, Val} = riakc_pb_socket:get(Client, Bucket, Key), + ?assertEqual(ExpectedValue, riakc_obj:get_value(Val)), + ok. + +verify_failed_put(Node, Bucket, Key, Value, Options, ExpectedPutReturnFunc) -> + Client = rt:pbc(Node), + {error, PutReturnValue} = riakc_pb_socket:put( + Client, riakc_obj:new( + Bucket, Key, Value + ), + Options + ), + ExpectedPutReturnFunc(parse(PutReturnValue)), + ok. + + +verify_put_timeout(Node, Bucket, Key, Value, Options, Timeout, ExpectedPutReturnFunc) -> + Client = rt:pbc(Node), + {Time, {error, Val}} = timer:tc( + fun() -> + riakc_pb_socket:put( + Client, riakc_obj:new( + Bucket, Key, Value + ), [{timeout, Timeout} | Options] + ) + end + ), + ExpectedPutReturnFunc(parse(Val)), + ?assert(Time div 1000000 =< 2*Timeout), + ok. + +num_fast_merges(Nodes) -> + lists:foldl( + fun(Node, Acc) -> + {write_once_merge, N} = proplists:lookup( + write_once_merge, + rpc:call(Node, riak_kv_stat, get_stats, []) + ), + Acc + N + end, + 0, Nodes + ). + +get(Node, Bucket, Key) -> + Client = rt:pbc(Node), + {ok, Val} = riakc_pb_socket:get(Client, Bucket, Key), + riakc_obj:get_value(Val). + +config(RingSize, NVal) -> + config(RingSize, NVal, riak_kv_multi_backend). 
+ +config(RingSize, NVal, Backend) -> + [ + {riak_core, [ + {default_bucket_props, [{n_val, NVal}]}, + {vnode_management_timer, 1000}, + {ring_creation_size, RingSize}] + }, + {riak_kv, [ + {anti_entropy_build_limit, {100, 1000}}, + {anti_entropy_concurrency, 100}, + {anti_entropy_tick, 100}, + {anti_entropy, {on, []}}, + {anti_entropy_timeout, 5000}, + {storage_backend, Backend}, + {multi_backend, [ + {mymemory, riak_kv_memory_backend, []}, + {myeleveldb, riak_kv_eleveldb_backend, []} + ]} + ]} + ]. + +parse(Binary) -> + {ok, Tokens, _} = erl_scan:string(binary_to_list(Binary) ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + +make_intercepts_tab(Node) -> + SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]), + intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table, + public, set, {heir, SupPid, {}}]]). diff --git a/tests/yz_crdt.erl b/tests/yz_crdt.erl index 1a103815e..44e37f01a 100644 --- a/tests/yz_crdt.erl +++ b/tests/yz_crdt.erl @@ -59,59 +59,67 @@ confirm() -> ?KEY, riakc_map:to_op(Map2)), + yokozuna_rt:drain_solrqs(Nodes), yokozuna_rt:commit(Nodes, ?INDEX), - %% Perform simple queries, check for register, set fields. - {ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search( + ok = rt:wait_until( + fun() -> + validate_search_results(Pid) + end), + %% Stop PB connection. + riakc_pb_socket:stop(Pid), + + pass. 
+ +validate_search_results(Pid) -> + try + {ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search( Pid, ?INDEX, <<"name_register:Chris*">>), - lager:info("Search name_register:Chris*: ~p~n", [Results1a]), - ?assertEqual(length(Results1a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)), - <<"thing">>), - - {ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search( + lager:info("Search name_register:Chris*: ~p~n", [Results1a]), + ?assertEqual(length(Results1a), 1), + ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)), + list_to_binary(?KEY)), + ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)), + <<"thing">>), + + {ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search( Pid, ?INDEX, <<"interests_set:thing*">>), - lager:info("Search interests_set:thing*: ~p~n", [Results2a]), - ?assertEqual(length(Results2a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)), - <<"thing">>), - - {ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search( + lager:info("Search interests_set:thing*: ~p~n", [Results2a]), + ?assertEqual(length(Results2a), 1), + ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)), + list_to_binary(?KEY)), + ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)), + <<"thing">>), + + {ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search( Pid, ?INDEX, <<"_yz_rb:testbucket">>), - lager:info("Search testbucket: ~p~n", [Results3a]), - ?assertEqual(length(Results3a), 1), - ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)), - list_to_binary(?KEY)), - ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)), - <<"thing">>), - - %% Redo queries and check if results are equal - {ok, {search_results, Results1b, _, _}} = 
riakc_pb_socket:search( + lager:info("Search testbucket: ~p~n", [Results3a]), + ?assertEqual(length(Results3a), 1), + ?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)), + list_to_binary(?KEY)), + ?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)), + <<"thing">>), + + %% Redo queries and check if results are equal + {ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search( Pid, ?INDEX, <<"name_register:Chris*">>), - ?assertEqual(number_of_fields(Results1a), - number_of_fields(Results1b)), + ?assertEqual(number_of_fields(Results1a), + number_of_fields(Results1b)), - {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( + {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( Pid, ?INDEX, <<"interests_set:thing*">>), - ?assertEqual(number_of_fields(Results2a), - number_of_fields(Results2b)), + ?assertEqual(number_of_fields(Results2a), + number_of_fields(Results2b)), - {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( + {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( Pid, ?INDEX, <<"_yz_rb:testbucket">>), ?assertEqual(number_of_fields(Results3a), number_of_fields(Results3b)), - - %% Stop PB connection. - riakc_pb_socket:stop(Pid), - - %% Clean cluster. - rt:clean_cluster(Nodes), - - pass. + true + catch Err:Reason -> + lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]), + false + end. %% @private number_of_fields(Resp) -> diff --git a/tests/yz_extractors.erl b/tests/yz_extractors.erl index d21f0b699..0a03f516f 100644 --- a/tests/yz_extractors.erl +++ b/tests/yz_extractors.erl @@ -401,7 +401,7 @@ test_bad_extraction(Cluster) -> %% Verify the stats. There should be one more index failure, %% but there should be more more "melts" (error threshold failures) %% - yz_rt:wait_until( + yokozuna_rt:wait_until( Cluster, fun(_Node) -> check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount)