Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 134 additions & 9 deletions tests/verify_aaefold_nval_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

% I would hope this would come from the testing framework some day
% to use the test in small and large scenarios.
-define(DEFAULT_RING_SIZE, 8).
-define(REBUILD_TICK, 30 * 1000).
-define(DEFAULT_RING_SIZE, 32).
-define(REBUILD_TICK, 20 * 1000).
-define(CFG_NOREBUILD,
[{riak_kv,
[
Expand Down Expand Up @@ -67,7 +67,7 @@
{ring_creation_size, ?DEFAULT_RING_SIZE}
]}]
).
-define(NUM_NODES, 3).
-define(NUM_NODES, 4).
-define(NUM_KEYS_PERNODE, 10000).
-define(BUCKET, <<"test_bucket">>).
-define(N_VAL, 3).
Expand All @@ -78,7 +78,7 @@ confirm() ->
Nodes0 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD),
ClientHeadHTTP = rt:httpc(hd(Nodes0)),
ClientTailHTTP = rt:httpc(lists:last(Nodes0)),
ok = verify_aae_fold(Nodes0, rhc, ClientHeadHTTP, ClientTailHTTP),
ok = verify_aae_fold(Nodes0, rhc, ClientHeadHTTP, ClientTailHTTP, false),

rt:clean_cluster(Nodes0),

Expand All @@ -88,12 +88,15 @@ confirm() ->
timer:sleep(2 * ?REBUILD_TICK),
ClientHeadPB = rt:pbc(hd(Nodes1)),
ClientTailPB = rt:pbc(lists:last(Nodes1)),
ok = verify_aae_fold(Nodes1, riakc_pb_socket, ClientHeadPB, ClientTailPB),
ok = verify_aae_fold(Nodes1, riakc_pb_socket, ClientHeadPB, ClientTailPB, true),

pass.


verify_aae_fold(Nodes, Mod, CH, CT) ->
verify_aae_fold(Nodes, Mod, CH, CT, Rebuild) ->

TestStart = os:timestamp(),
timer:sleep(1000),

lager:info("Fold for empty root"),
{ok, {root, RH0}} = Mod:aae_merge_root(CH, ?N_VAL),
Expand All @@ -109,7 +112,7 @@ verify_aae_fold(Nodes, Mod, CH, CT) ->
KeyCount + ?NUM_KEYS_PERNODE
end,

lists:foldl(KeyLoadFun, 1, Nodes),
lists:foldl(KeyLoadFun, 0, Nodes),
lager:info("Loaded ~w objects", [?NUM_KEYS_PERNODE * length(Nodes)]),
wait_until_root_stable(Mod, CH),
wait_until_root_stable(Mod, CT),
Expand Down Expand Up @@ -190,9 +193,109 @@ verify_aae_fold(Nodes, Mod, CH, CT) ->
?assertMatch(true, lists:sort(KCL1) == lists:sort(KCL2)),

% Need to re-start or clean will fail
rt:start_and_wait(hd(tl(Nodes))).
rt:start_and_wait(hd(tl(Nodes))),

timer:sleep(1000),
TestStage1 = os:timestamp(),
timer:sleep(1000),

Stage1Range = convert_to_modified_range(TestStart, TestStage1),

case Rebuild of
true ->
lager:info("Test fold many times"),
lager:info("Maybe catch race with rebuild"),
lists:foreach(
fun(_I) ->
check_all_ranges(Mod, CH, Stage1Range)
end,
lists:seq(1, 20)
);
false ->
check_all_ranges(Mod, CH, Stage1Range)
end,

lager:info("Regressing object version to 0 on head Node"),
ok =
rpc:call(
hd(Nodes),
riak_core_capability,
register,
[{riak_kv, object_format}, [v0], v0]),
wait_until_object_format(Nodes, v0),

lists:foldl(KeyLoadFun, ?NUM_NODES * ?NUM_KEYS_PERNODE, Nodes),

timer:sleep(1000),
TestStage2 = os:timestamp(),
timer:sleep(1000),

case Rebuild of
true ->
lager:info("Test fold many times"),
lager:info("Maybe catch race with rebuild"),
lists:foreach(
fun(_I) ->
check_all_ranges(Mod, CH, Stage1Range)
end,
lists:seq(1, 20)
);
false ->
check_all_ranges(Mod, CH, Stage1Range)
end,

lager:info("Reverting object version to 1 on head Node"),
ok =
rpc:call(
hd(Nodes),
riak_core_capability,
register,
[{riak_kv, object_format}, [v1, v0], v0]),
wait_until_object_format(Nodes, v1),

lists:foldl(KeyLoadFun, ?NUM_NODES * ?NUM_KEYS_PERNODE * 2, Nodes),

timer:sleep(1000),
TestStage3 = os:timestamp(),
timer:sleep(1000),

Stage2Range = convert_to_modified_range(TestStage1, TestStage2),
Stage3Range = convert_to_modified_range(TestStage2, TestStage3),

{ok, {keysclocks, KCL6}} =
Mod:aae_range_clocks(CH, ?BUCKET, all, all, Stage2Range),
lager:info(
"Fetched ~w keys loaded in stage 2",
[length(KCL6)]
),
?assert(length(KCL6) == ?NUM_NODES * ?NUM_KEYS_PERNODE),
{ok, {keysclocks, KCL7}} =
Mod:aae_range_clocks(CH, ?BUCKET, all, all, Stage3Range),
lager:info(
"Fetched ~w keys loaded in stage 3",
[length(KCL7)]
),
?assert(length(KCL7) == ?NUM_NODES * ?NUM_KEYS_PERNODE),

case Rebuild of
true ->
lager:info("Test fold many times"),
lager:info("Maybe catch race with rebuild"),
lists:foreach(
fun(_I) ->
check_all_ranges(Mod, CH, Stage1Range)
end,
lists:seq(1, 20)
);
false ->
check_all_ranges(Mod, CH, Stage1Range)
end,

ok.

convert_to_modified_range({StartMega, StartS, _}, {EndMega, EndS, _}) ->
{StartMega * 1000000 + StartS, EndMega * 1000000 + EndS}.

to_key(N) ->
list_to_binary(io_lib:format("K~6..0B", [N])).

Expand Down Expand Up @@ -233,4 +336,26 @@ wait_until_root_stable(Mod, Client) ->
lager:info("Root not stable: branch ~w compares ~w with ~w",
[L, V0, V1]),
wait_until_root_stable(Mod, Client)
end.
end.

wait_until_object_format(Nodes, Format) ->
CheckNodeFun =
fun(N) ->
fun() ->
Format ==
rpc:call(
N,
riak_core_capability,
get,
[{riak_kv, object_format}, v0])
end
end,
lists:foreach(fun(N) -> rt:wait_until(CheckNodeFun(N)) end, Nodes).

check_all_ranges(Mod, CH, Stage1Range) ->
{ok, {keysclocks, KCL5}} =
Mod:aae_range_clocks(CH, ?BUCKET, all, all, Stage1Range),
lager:info(
"Fetched ~w keys loaded in stage 1",
[length(KCL5)]
).