Skip to content

Commit be90d04

Browse files
committed
storage: introduce on_master_enable service
Before this patch the `rebalancer` and `recovery` services could start right after a master switch (via `auto` master detection or manual reconfiguration), before the new master had time to sync its vclock with the other replicas in the replicaset. This could lead to doubled buckets, as described in the "Doubled buckets RFC". To fix it, we introduce a new storage service: `on_master_enable`. When the master changes in a replicaset, this service is triggered and waits until the newly elected master syncs its vclock with the other replicas. The other storage services — `rebalancer` and `recovery` — cannot start until `on_master_enable` sets `M.buckets_are_in_sync`. Part of #214 NO_TEST=bugfix NO_DOC=bugfix
1 parent 39aada8 commit be90d04

File tree

7 files changed

+483
-14
lines changed

7 files changed

+483
-14
lines changed

test/storage-luatest/log_verbosity_2_2_test.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ test_group.before_all(function(g)
4242
end)
4343

4444
test_group.after_all(function(g)
45-
g.cluster:drop()
45+
g.cluster:stop()
4646
end)
4747

4848
test_group.test_recovery_do_not_spam_same_errors = function(g)
@@ -119,7 +119,7 @@ test_group.test_rebalancer_do_not_spam_same_errors = function(g)
119119
end
120120
end)
121121
local msg = "Error during downloading rebalancer states"
122-
g.replica_1_a:wait_log_exactly_once(msg, {timeout = 0.1,
122+
g.replica_1_a:wait_log_exactly_once(msg, {timeout = 1,
123123
on_yield = function() ivshard.storage.rebalancer_wakeup() end})
124124
g.replica_2_a:exec(function()
125125
ivshard.storage.rebalancer_request_state = _G.old_call
Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
-- Tests for the storage on_master_enable service: the recovery and
-- rebalancer services must not start on a freshly promoted master before it
-- has synchronized its vclock with the other replicas of its replicaset.
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')
local vutil = require('vshard.util')

local test_group = t.group('storage')

-- Two replicasets, a master and one replica in each. The small bucket count
-- and short replication timeout keep the scenarios fast.
local cfg_template = {
    sharding = {
        {
            replicas = {
                replica_1_a = {
                    master = true
                },
                replica_1_b = {},
            },
        },
        {
            replicas = {
                replica_2_a = {
                    master = true
                },
                replica_2_b = {},
            },
        },
    },
    bucket_count = 20,
    replication_timeout = 0.1,
}
-- Built from cfg_template in before_all().
local global_cfg
30+
31+
test_group.before_all(function(g)
    -- The tested behavior relies on Tarantool >= 2.11 features.
    t.run_only_if(vutil.version_is_at_least(2, 11, 0, nil, 0, 0))
    global_cfg = vtest.config_new(cfg_template)
    vtest.cluster_new(g, global_cfg)
    vtest.cluster_bootstrap(g, global_cfg)
    vtest.cluster_wait_vclock_all(g)
    -- The rebalancer is enabled per-test; keep it off by default.
    vtest.cluster_rebalancer_disable(g)
    -- Speed up master auto-discovery on every node.
    vtest.cluster_exec_each(g, function()
        ivconst.MASTER_SEARCH_WORK_INTERVAL = ivtest.busy_step
    end)

    -- A sharded user space to generate replicated writes with.
    vtest.cluster_exec_each_master(g, function()
        local space = box.schema.space.create('test', {
            format = {
                {'id', 'unsigned'},
                {'bucket_id', 'unsigned'},
            },
        })
        space:create_index('id', {parts = {'id'}})
        space:create_index('bucket_id',
                           {parts = {'bucket_id'}, unique = false})
    end)
end)
53+
54+
test_group.after_each(function(g)
    -- Drop the data written by the finished test so that every test case
    -- starts from an empty 'test' space on each master.
    vtest.cluster_exec_each_master(g, function()
        box.space.test:truncate()
    end)
end)
59+
60+
test_group.after_all(function(g)
    -- Stop (do not drop) the cluster so its artifacts stay inspectable.
    g.cluster:stop()
end)
63+
64+
-- Move the first `n` buckets, one by one, from src_storage to dest_storage.
local function move_first_n_buckets(src_storage, dest_storage, n)
    for _ = 1, n do
        vtest.bucket_move(src_storage, dest_storage,
                          vtest.storage_first_bucket(src_storage))
    end
end
70+
71+
-- Desynchronize a replicaset and promote new_master while its vclock is
-- strictly behind old_master's. Leaves replication towards new_master
-- broken on purpose; the caller restores it later.
local function unsync_replicaset_and_switch_master(old_master, new_master)
    -- We should change the replication_synchro_quorum on old_master so that
    -- it can apply a transaction without a confirmation of another replica.
    -- Also we break the replication on replica (new_master in future) so that
    -- it can't get a vclock from old_master, because we must have an unsynced
    -- replicaset after master switch.
    old_master:exec(function() box.cfg{replication_synchro_quorum = 1} end)
    new_master:exec(function()
        box.cfg{replication = {}}
        t.helpers.retrying({}, function()
            for _, conn in pairs(box.info.replication) do
                t.assert_not(conn.upstream)
            end
        end)
    end)
    -- Write a row only old_master will have and remember its vclock.
    local old_master_vclock = old_master:exec(function()
        local tuple = {1, _G.get_first_bucket()}
        box.space.test:insert(tuple)
        t.assert_equals(box.space.test:select(), {tuple})
        return box.info.vclock
    end)
    -- Change masters and check that their vclocks are not equal.
    old_master:exec(function() box.cfg{read_only = true} end)
    new_master:exec(function(old_master_vclock)
        -- Bugfix: the original iterated vc1 with ipairs(), whose index never
        -- equals 0 (the "skip local component" branch was dead code) and
        -- which stops at the first hole of the sparse vclock map, silently
        -- ignoring later components. pairs() visits every component;
        -- component 0 is the instance-local one and is excluded from the
        -- comparison.
        local vclock_le = function(vc1, vc2)
            for id, lsn in pairs(vc1) do
                if id ~= 0 and lsn > (vc2[id] or 0) then
                    return false
                end
            end
            return true
        end
        t.helpers.retrying({}, function()
            -- In some rare cases the downstream vclock of another instance may
            -- not be updated in time. It can lead to a situation when we will
            -- try to compare the same old_vclock of destination node with
            -- current vclock. It can hinder us to test on_master_enable.
            for _, replica in ipairs(box.info.replication) do
                local downstream = replica.downstream
                if downstream and downstream.vclock then
                    t.assert(vclock_le(old_master_vclock, downstream.vclock))
                end
            end
            t.assert_not_equals(old_master_vclock, box.info.vclock)
            t.assert_equals(box.space.test:select(), {})
        end)
        box.cfg{read_only = false}
    end, {old_master_vclock})
end
127+
128+
-- Undo a master switch: demote new_master back to read-only, restore the
-- old master's writability and synchro quorum, and wait for full pairsync.
local function reset_replicaset_after_master_switch(new_master, old_master)
    new_master:exec(function() box.cfg{read_only = true} end)
    old_master:exec(function()
        box.cfg{read_only = false, replication_synchro_quorum = 2}
    end)
    vtest.storage_wait_pairsync(old_master, new_master)
end
134+
135+
-- Swap the master flag between replica_<rs_num>_a and replica_<rs_num>_b
-- in the vshard config of replicaset number `rs_num` and apply it.
local function cfg_swap_master_of_replicaset(g, rs_num)
    -- This function is applicable only for a cluster with the 'manual'
    -- master mode. Otherwise it can be dangerous to invoke it.
    local new_cfg_template = table.deepcopy(cfg_template)
    local replicas = new_cfg_template.sharding[rs_num].replicas
    local replica_a = replicas[('replica_%s_a'):format(rs_num)]
    local replica_b = replicas[('replica_%s_b'):format(rs_num)]
    replica_a.master, replica_b.master = not replica_a.master, replica_a.master
    -- If we reconfigure the vshard cluster without 'manual' box_cfg_mode,
    -- the nodes in the replicasets would be reconfigured and, as a
    -- consequence, replication would be restored. That is unacceptable here:
    -- after the vshard config change the previous replication topology must
    -- survive in order to test the behavior of the new unsynced master.
    new_cfg_template.box_cfg_mode = 'manual'
    vtest.cluster_cfg(g, vtest.config_new(new_cfg_template))
end
152+
153+
-- Switch every replicaset of the cluster to 'auto' master discovery:
-- replica_a becomes writable, replica_b read-only, and the explicit master
-- flag is dropped, so vshard detects the master by itself.
local function cfg_make_cluster_auto_master(g)
    local new_cfg_template = table.deepcopy(cfg_template)
    for i, rs in ipairs(new_cfg_template.sharding) do
        rs.master = 'auto'
        local replica_a = rs.replicas[('replica_%s_a'):format(i)]
        replica_a.read_only = false
        replica_a.master = nil
        rs.replicas[('replica_%s_b'):format(i)].read_only = true
    end
    vtest.cluster_cfg(g, vtest.config_new(new_cfg_template))
end
166+
167+
-- Lower the rebalancer idle and work intervals on `server` so the tests do
-- not hang on a long wait for the rebalancer's readiness after the
-- on_master_enable service finishes or after a sudden wakeup. The previous
-- values are stashed in globals for reset_rebalancer_intervals().
local function minimize_rebalancer_intervals(server)
    server:exec(function()
        local consts = require('vshard.consts')
        rawset(_G, 'old_rebalancer_idle_interval',
               consts.REBALANCER_IDLE_INTERVAL)
        rawset(_G, 'old_rebalancer_work_interval',
               consts.REBALANCER_WORK_INTERVAL)
        consts.REBALANCER_IDLE_INTERVAL = 0.01
        consts.REBALANCER_WORK_INTERVAL = 0.01
    end)
end
181+
182+
-- Restore the rebalancer intervals stashed by minimize_rebalancer_intervals.
local function reset_rebalancer_intervals(server)
    server:exec(function()
        local consts = require('vshard.consts')
        consts.REBALANCER_IDLE_INTERVAL = _G.old_rebalancer_idle_interval
        consts.REBALANCER_WORK_INTERVAL = _G.old_rebalancer_work_interval
    end)
end
189+
190+
-- Log patterns grepped by the tests below. They must match the storage's
-- log messages byte-for-byte -- including the 'rabalancing' misprint and
-- the 'BALANSED' constant name, which presumably mirror the storage source
-- itself; verify against vshard before "fixing" either spelling.
local REBALANCER_BAD_START = 'Error during the start of rabalancing:.*' ..
                             'Newly elected master hasn\'t synchronized'

local REBALANCER_SENDS_ROUTES = 'The following rebalancer routes were sent'

local REBALANCER_RECEIVES_ROUTES = 'Apply rebalancer routes with'

local CLUSTER_IS_BALANSED = 'The cluster is balanced ok'

local NEW_MASTER_SYNC = 'New master has synchronized with other replicas'
200+
201+
-- Scenario: the rebalancer lives on a freshly promoted, not yet synced
-- master (`sender`). It must refuse to start until the on_master_enable
-- service reports the sync, and then rebalance normally.
local function test_rebalancer_sending_until_master_sync(g, old_sender, sender,
                                                         receiver)
    vtest.cluster_rebalancer_enable(g)
    -- Unsynced master: rebalancing is rejected, no routes reach the receiver.
    t.helpers.retrying({}, function()
        t.assert(sender:grep_log(REBALANCER_BAD_START))
        t.assert_not(receiver:grep_log(REBALANCER_RECEIVES_ROUTES))
    end)
    -- Restore replication so the new master can finally sync.
    -- NOTE(review): server:exec() unpacks the args table, so only the first
    -- URI reaches box.cfg here -- confirm whether {{uri_a, uri_b}} was meant.
    sender:exec(function(replication)
        box.cfg{replication = replication}
    end, {old_sender.net_box_uri, sender.net_box_uri})
    vtest.storage_wait_pairsync(old_sender, sender)
    -- Synced master: rebalancing proceeds to completion.
    t.helpers.retrying({}, function()
        t.assert(sender:grep_log(NEW_MASTER_SYNC))
        sender:exec(function() ivshard.storage.rebalancer_wakeup() end)
        t.assert(receiver:grep_log(REBALANCER_RECEIVES_ROUTES))
        t.assert(sender:grep_log(REBALANCER_SENDS_ROUTES))
        t.assert(sender:grep_log(CLUSTER_IS_BALANSED))
    end)
    reset_replicaset_after_master_switch(sender, old_sender)
    vtest.cluster_rebalancer_disable(g)
end
222+
223+
-- Scenario: the bucket receiver is a freshly promoted, not yet synced
-- master. The rebalancer must not apply routes there until the
-- on_master_enable service reports the sync.
local function test_rebalancer_receiving_until_master_sync(g, old_receiver,
                                                           receiver, sender)
    vtest.cluster_rebalancer_enable(g)
    -- Unsynced receiver: routes are sent but the receiver refuses to start.
    t.helpers.retrying({}, function()
        t.assert(receiver:grep_log(REBALANCER_BAD_START))
        t.assert(sender:grep_log(REBALANCER_SENDS_ROUTES))
    end)
    -- Restore replication so the receiver can finally sync.
    -- NOTE(review): server:exec() unpacks the args table, so only the first
    -- URI reaches box.cfg here -- confirm whether {{uri_a, uri_b}} was meant.
    receiver:exec(function(replication)
        box.cfg{replication = replication}
    end, {old_receiver.net_box_uri, receiver.net_box_uri})
    vtest.storage_wait_pairsync(old_receiver, receiver)
    -- Synced receiver: routes get applied and the cluster balances out.
    t.helpers.retrying({}, function()
        t.assert(receiver:grep_log(NEW_MASTER_SYNC))
        sender:exec(function() ivshard.storage.rebalancer_wakeup() end)
        t.assert(sender:grep_log(REBALANCER_SENDS_ROUTES))
        t.assert(receiver:grep_log(REBALANCER_RECEIVES_ROUTES))
        t.assert(sender:grep_log(CLUSTER_IS_BALANSED))
    end)
    reset_replicaset_after_master_switch(receiver, old_receiver)
    vtest.cluster_rebalancer_disable(g)
end
244+
245+
-- Scenario: a bucket is stuck in 'sending' on a freshly promoted, not yet
-- synced master. The recovery service must stay a no-op until the
-- on_master_enable service reports the sync, then recover the bucket.
local function test_recovery_is_nop_until_master_sync(old_sender, sender,
                                                      receiver)
    -- We should set the errinj.ERRINJ_RECEIVE_PARTIALLY on the destination
    -- node in order to test the work of recovery service on unsynced and
    -- synced newly elected master.
    receiver:exec(function()
        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
    end)
    local bucket = vtest.storage_first_bucket(sender)
    local dest_rs_uuid = receiver:replicaset_uuid()
    -- The injected error leaves the bucket hanging in 'sending'.
    sender:exec(function(bucket, dest_rs_uuid)
        ivshard.storage.bucket_send(bucket, dest_rs_uuid)
        t.assert_equals(box.space._bucket:get{bucket}.status, 'sending')
    end, {bucket, dest_rs_uuid})
    receiver:exec(function() ivshard.storage.recovery_wakeup() end)
    -- The recovery service can't start until the on_master_enable service
    -- finishes its work (syncs new master).
    sender:exec(function(bucket)
        ivshard.storage.recovery_wakeup()
        t.helpers.retrying({}, function()
            t.assert_equals(box.space._bucket:get{bucket}.status, 'sending')
        end)
    end, {bucket})
    t.helpers.retrying({}, function()
        t.assert(sender:grep_log('Error during the start of recovery:.*' ..
                                 'Newly elected master hasn\'t synchronized'))
    end)
    -- Restore replication so the new master can finally sync.
    -- NOTE(review): server:exec() unpacks the args table, so only the first
    -- URI reaches box.cfg here -- confirm whether {{uri_a, uri_b}} was meant.
    sender:exec(function(replication)
        box.cfg{replication = replication}
    end, {old_sender.net_box_uri, sender.net_box_uri})
    vtest.storage_wait_pairsync(old_sender, sender)
    t.helpers.retrying({}, function()
        t.assert(sender:grep_log(NEW_MASTER_SYNC))
    end)
    -- Synced master: recovery now turns the stuck bucket back to 'active'.
    sender:exec(function(bucket)
        ivshard.storage.recovery_wakeup()
        t.helpers.retrying({}, function()
            t.assert_equals(box.space._bucket:get{bucket}.status, 'active')
        end)
    end, {bucket})
    t.assert(sender:grep_log('Finish bucket recovery step, 1 ' ..
                             'sending buckets are recovered among 1'))
    reset_replicaset_after_master_switch(sender, old_sender)
    receiver:exec(function()
        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
    end)
end
292+
293+
test_group.test_rebalancer_sending_with_manual_master_switch = function(g)
    minimize_rebalancer_intervals(g.replica_1_b)
    local old_sender = g.replica_1_a
    local sender = g.replica_1_b
    local receiver = g.replica_2_a
    -- Unbalance the cluster so the rebalancer has work to do.
    move_first_n_buckets(old_sender, receiver, 3)
    unsync_replicaset_and_switch_master(old_sender, sender)
    cfg_swap_master_of_replicaset(g, 1)
    test_rebalancer_sending_until_master_sync(g, old_sender, sender, receiver)
    -- Restore the original cluster configuration.
    vtest.cluster_cfg(g, vtest.config_new(cfg_template))
    reset_rebalancer_intervals(g.replica_1_b)
end
304+
305+
test_group.test_rebalancer_receiving_with_manual_master_switch = function(g)
    minimize_rebalancer_intervals(g.replica_1_a)
    minimize_rebalancer_intervals(g.replica_2_b)
    local old_receiver = g.replica_2_a
    local receiver = g.replica_2_b
    local sender = g.replica_1_a
    -- Unbalance the cluster so the rebalancer has work to do.
    move_first_n_buckets(sender, old_receiver, 3)
    unsync_replicaset_and_switch_master(old_receiver, receiver)
    cfg_swap_master_of_replicaset(g, 2)
    test_rebalancer_receiving_until_master_sync(g, old_receiver, receiver,
                                                sender)
    -- Restore the original cluster configuration.
    vtest.cluster_cfg(g, vtest.config_new(cfg_template))
    reset_rebalancer_intervals(g.replica_1_a)
    reset_rebalancer_intervals(g.replica_2_b)
end
319+
320+
test_group.test_recovery_with_manual_master_switch = function(g)
    local old_sender = g.replica_1_a
    local sender = g.replica_1_b
    local receiver = g.replica_2_a
    unsync_replicaset_and_switch_master(old_sender, sender)
    cfg_swap_master_of_replicaset(g, 1)
    test_recovery_is_nop_until_master_sync(old_sender, sender, receiver)
    -- Restore the original cluster configuration.
    vtest.cluster_cfg(g, vtest.config_new(cfg_template))
end
328+
329+
test_group.test_rebalancer_sending_with_auto_master_switch = function(g)
    minimize_rebalancer_intervals(g.replica_1_b)
    local old_sender = g.replica_1_a
    local sender = g.replica_1_b
    local receiver = g.replica_2_a
    -- Unbalance the cluster so the rebalancer has work to do.
    move_first_n_buckets(old_sender, receiver, 3)
    cfg_make_cluster_auto_master(g)
    unsync_replicaset_and_switch_master(old_sender, sender)
    test_rebalancer_sending_until_master_sync(g, old_sender, sender, receiver)
    -- Restore the original cluster configuration.
    vtest.cluster_cfg(g, vtest.config_new(cfg_template))
    reset_rebalancer_intervals(g.replica_1_b)
end
340+
341+
test_group.test_rebalancer_receiving_with_auto_master_switch = function(g)
    minimize_rebalancer_intervals(g.replica_1_a)
    minimize_rebalancer_intervals(g.replica_2_b)
    local old_receiver = g.replica_2_a
    local receiver = g.replica_2_b
    local sender = g.replica_1_a
    -- Unbalance the cluster so the rebalancer has work to do.
    move_first_n_buckets(sender, old_receiver, 3)
    cfg_make_cluster_auto_master(g)
    unsync_replicaset_and_switch_master(old_receiver, receiver)
    test_rebalancer_receiving_until_master_sync(g, old_receiver, receiver,
                                                sender)
    -- Restore the original cluster configuration.
    vtest.cluster_cfg(g, vtest.config_new(cfg_template))
    reset_rebalancer_intervals(g.replica_1_a)
    reset_rebalancer_intervals(g.replica_2_b)
end
355+
356+
test_group.test_recovery_with_auto_master_switch = function(g)
    local old_sender = g.replica_1_a
    local sender = g.replica_1_b
    local receiver = g.replica_2_a
    cfg_make_cluster_auto_master(g)
    unsync_replicaset_and_switch_master(old_sender, sender)
    test_recovery_is_nop_until_master_sync(old_sender, sender, receiver)
    -- Restore the original cluster configuration.
    vtest.cluster_cfg(g, vtest.config_new(cfg_template))
end

0 commit comments

Comments
 (0)