Skip to content

Commit a07018c

Browse files
committed
Merge pull request #618 from basho/feature/csm/cancel-fullsync
Ensure we can cancel a fullsync and restart.
2 parents 19ae3de + 6cd649d commit a07018c

File tree

3 files changed

+166
-1
lines changed

3 files changed

+166
-1
lines changed

src/rt.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1471,10 +1471,11 @@ log_to_nodes(Nodes, Fmt) ->
14711471
%% @doc Log a message to the console of the specified test nodes.
14721472
%% Messages are prefixed by the string "---riak_test--- "
14731473
%% Uses lager:info/2 'LFmt' and 'LArgs' semantics
1474-
log_to_nodes(Nodes, LFmt, LArgs) ->
1474+
log_to_nodes(Nodes0, LFmt, LArgs) ->
14751475
%% This logs to a node's info level, but if riak_test is running
14761476
%% at debug level, we want to know when we send this and what
14771477
%% we're saying
1478+
Nodes = lists:flatten(Nodes0),
14781479
lager:debug("log_to_nodes: " ++ LFmt, LArgs),
14791480
Module = lager,
14801481
Function = log,

tests/repl_cancel_fullsync.erl

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
%% @doc riak_test module verifying that an in-flight fullsync replication
%% from cluster A to cluster B can be cancelled ("fullsync stop") and then
%% restarted, completing successfully on the second attempt.
-module(repl_cancel_fullsync).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").

%% Bucket all test keys are written to on cluster A.
-define(TEST_BUCKET,
        <<"repl-cancel-fullsync-failures-systest_a">>).
%% Number of keys written to A and expected on B after fullsync.
-define(NUM_KEYS, 1000).

%% Advanced config applied to every node. AAE is tuned to build trees
%% quickly; fullsync is keylist-strategy and manual-only (no on-connect,
%% no interval) so the test controls exactly when fullsync runs.
-define(CONF(Retries), [
        {riak_core,
            [
             {ring_creation_size, 8},
             {default_bucket_props, [{n_val, 1}]}
            ]
        },
        {riak_kv,
            [
             %% Specify fast building of AAE trees
             {anti_entropy, {on, []}},
             {anti_entropy_build_limit, {100, 1000}},
             {anti_entropy_concurrency, 100}
            ]
        },
        {riak_repl,
         [
          {fullsync_strategy, keylist},
          {fullsync_on_connect, false},
          {fullsync_interval, disabled},
          {max_fssource_retries, Retries}
         ]}
        ]).

%% @doc Ensure we can cancel a fullsync and restart it.
%% Flow: build two 3-node clusters, connect A->B, write ?NUM_KEYS to A,
%% start fullsync, verify keylist workers exist, stop fullsync, verify the
%% workers died and nothing completed, then restart fullsync and verify it
%% completes and B can read every key. Returns the riak_test atom 'pass'.
confirm() ->
    rt:set_advanced_conf(all, ?CONF(5)),

    %% Nodes is bound to the full two-element list for log_to_nodes below.
    Nodes = [ANodes, BNodes] = rt:build_clusters([3, 3]),

    lager:info("ANodes: ~p", [ANodes]),
    lager:info("BNodes: ~p", [BNodes]),

    AFirst = hd(ANodes),
    BFirst = hd(BNodes),

    lager:info("Naming clusters."),
    repl_util:name_cluster(AFirst, "A"),
    repl_util:name_cluster(BFirst, "B"),

    lager:info("Waiting for convergence."),
    rt:wait_until_ring_converged(ANodes),
    rt:wait_until_ring_converged(BNodes),

    lager:info("Waiting for transfers to complete."),
    rt:wait_until_transfers_complete(ANodes),
    rt:wait_until_transfers_complete(BNodes),

    lager:info("Get leaders."),
    LeaderA = repl_util:get_leader(AFirst),
    LeaderB = repl_util:get_leader(BFirst),

    lager:info("Finding connection manager ports."),
    BPort = repl_util:get_port(LeaderB),

    lager:info("Connecting cluster A to B"),
    repl_util:connect_cluster_by_name(LeaderA, BPort, "B"),

    %% Seed cluster A with the test data.
    repl_util:write_to_cluster(AFirst, 1, ?NUM_KEYS, ?TEST_BUCKET),

    %% Before any fullsync, B should be missing all ?NUM_KEYS keys
    %% (last argument is the expected number of read errors).
    repl_util:read_from_cluster(BFirst, 1, ?NUM_KEYS, ?TEST_BUCKET,
                                ?NUM_KEYS),

    lager:info("Test fullsync from cluster A leader ~p to cluster B",
               [LeaderA]),
    repl_util:enable_fullsync(LeaderA, "B"),
    rt:wait_until_ring_converged(ANodes),

    %% First fullsync: start it and wait until the coordinator reports
    %% it is actually running before trying to cancel.
    lager:info("Starting fullsync."),
    rt:log_to_nodes(Nodes, "Starting fullsync."),
    R1 = rpc:call(LeaderA, riak_repl_console, fullsync, [["start"]]),
    ?assertEqual(ok, R1),
    repl_util:wait_until_fullsync_started(LeaderA),
    lager:info("Fullsync running."),

    %% Get all active keylist server pids
    Coordinators = [Pid || {"B", Pid} <-
        riak_repl2_fscoordinator_sup:started(LeaderA)],
    States = [sys:get_state(P) || P <- Coordinators],
    %% NOTE(review): position 14 of the coordinator state tuple is assumed
    %% to hold the list of {Pid, _} keylist-server entries — confirm against
    %% riak_repl2_fscoordinator's #state{} record if it changes.
    KeylistPids = lists:flatten([element(14, State) || State <- States]),
    KLStates = [sys:get_state(Pid) || {Pid, _} <- KeylistPids],
    %% Each live keylist server's state record should be tagged 'state'.
    [?assertEqual(state, element(1, State)) || State <- KLStates],

    %% Cancel the in-flight fullsync.
    lager:info("Stopping fullsync."),
    rt:log_to_nodes(Nodes, "Stopping fullsync."),
    R2 = rpc:call(LeaderA, riak_repl_console, fullsync, [["stop"]]),
    ?assertEqual(ok, R2),
    repl_util:wait_until_fullsync_stopped(LeaderA),
    lager:info("Fullsync stopped."),

    %% Give keylist pids time to stop
    timer:sleep(500),
    %% Ensure keylist pids are actually gone
    %% (sys:get_state on a dead pid exits, which 'catch' turns into {'EXIT', _}).
    Exits = [catch sys:get_state(Pid) || {Pid, _} <- KeylistPids],
    [?assertMatch({'EXIT', _}, Exit) || Exit <- Exits],

    %% The cancelled fullsync must not have counted as completed...
    [{"B", S1}] = rpc:call(LeaderA, riak_repl2_fscoordinator, status, []),
    ?assertEqual(true, lists:member({fullsyncs_completed, 0}, S1)),
    lager:info("Fullsync not completed."),

    %% ...and nothing should still be in progress.
    [{"B", S2}] = rpc:call(LeaderA, riak_repl2_fscoordinator, status, []),
    ?assertEqual(true, lists:member({in_progress, 0}, S2)),
    lager:info("** ~p", [S2]),

    %% Second fullsync: restart after the cancellation.
    lager:info("Starting fullsync."),
    rt:log_to_nodes(Nodes, "Starting fullsync."),
    R3 = rpc:call(LeaderA, riak_repl_console, fullsync, [["start"]]),
    ?assertEqual(ok, R3),
    repl_util:wait_until_fullsync_started(LeaderA),
    lager:info("Fullsync running again."),

    %% Poll the repl console status until exactly one server fullsync
    %% has completed on the source leader.
    Res = rt:wait_until(LeaderA,
                        fun(_) ->
                            Status = rpc:call(LeaderA,
                                              riak_repl_console,
                                              status,
                                              [quiet]),
                            case proplists:get_value(server_fullsyncs, Status) of
                                1 ->
                                    true;
                                _ ->
                                    false
                            end
                        end),
    ?assertEqual(ok, Res),
    %% After the restarted fullsync, B must have all keys (0 read errors).
    repl_util:read_from_cluster(BFirst, 1, ?NUM_KEYS, ?TEST_BUCKET, 0),
    [{"B", S3}] = rpc:call(LeaderA, riak_repl2_fscoordinator, status, []),
    ?assertEqual(true, lists:member({fullsyncs_completed, 1}, S3)),
    lager:info("Fullsync Complete"),

    rt:log_to_nodes(Nodes, "Test completed."),
    rt:clean_cluster(ANodes),
    rt:clean_cluster(BNodes),

    pass.

tests/repl_util.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
wait_until_connection/1,
1414
wait_until_no_connection/1,
1515
wait_for_reads/5,
16+
wait_until_fullsync_started/1,
17+
wait_until_fullsync_stopped/1,
1618
start_and_wait_until_fullsync_complete/1,
1719
start_and_wait_until_fullsync_complete/2,
1820
start_and_wait_until_fullsync_complete/3,
@@ -168,6 +170,24 @@ wait_until_no_connection(Node) ->
168170
end
169171
end). %% 40 seconds is enough for repl
170172

173+
%% @doc Block (via rt:wait_until polling) until at least one fullsync
%% coordinator for the sink cluster named "B" on SourceLeader reports
%% that it is running. The sink name "B" is currently hard-coded to match
%% the tests that use this helper.
wait_until_fullsync_started(SourceLeader) ->
    rt:wait_until(fun() ->
                      lager:info("Waiting for fullsync to start"),
                      fullsync_running(SourceLeader)
                  end).

%% @doc Block until no fullsync coordinator for sink "B" on SourceLeader
%% reports that it is running, i.e. the fullsync has fully stopped.
wait_until_fullsync_stopped(SourceLeader) ->
    rt:wait_until(fun() ->
                      lager:info("Waiting for fullsync to stop"),
                      not fullsync_running(SourceLeader)
                  end).

%% @private Shared predicate for the two waiters above: true when any
%% started fullsync coordinator for sink "B" on SourceLeader is running.
fullsync_running(SourceLeader) ->
    Coordinators = [Pid || {"B", Pid} <-
        riak_repl2_fscoordinator_sup:started(SourceLeader)],
    lists:any(fun riak_repl2_fscoordinator:is_running/1,
        Coordinators).
190+
171191
wait_for_reads(Node, Start, End, Bucket, R) ->
172192
rt:wait_until(Node,
173193
fun(_) ->

0 commit comments

Comments
 (0)