Skip to content

Commit 134ead9

Browse files
committed
basic tests for double_buckets problem
1 parent 6683748 commit 134ead9

File tree

2 files changed

+316
-27
lines changed

2 files changed

+316
-27
lines changed
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
local t = require('luatest')
2+
local json = require('json')
3+
local fiber = require('fiber')
4+
5+
local utils = require('crud.common.utils')
6+
7+
local helpers = require('test.helper')
8+
9+
local function wait_balance(g)
10+
t.helpers.retrying({timeout=30}, function()
11+
local buckets_count_s1 = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()")
12+
local buckets_count_s2 = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()")
13+
t.assert_equals(buckets_count_s1, 1500)
14+
t.assert_equals(buckets_count_s2, 1500)
15+
end)
16+
end
17+
18+
local function balance_cluster(g)
19+
if g.params.backend == "config" then
20+
local cfg = g.cluster:cfg()
21+
cfg.groups.storages.replicasets["s-1"].sharding = {
22+
weight = 1,
23+
}
24+
cfg.groups.storages.replicasets["s-2"].sharding = {
25+
weight = 1,
26+
}
27+
g.cluster:cfg(cfg)
28+
wait_balance(g)
29+
end
30+
end
31+
32+
local pgroup_duplicates = t.group('double_buckets_duplicates', helpers.backend_matrix({
33+
{engine = 'memtx', operation = 'replace'},
34+
{engine = 'memtx', operation = 'insert'},
35+
{engine = 'memtx', operation = 'upsert'},
36+
{engine = 'memtx', operation = 'insert_many'},
37+
{engine = 'memtx', operation = 'replace_many'},
38+
{engine = 'memtx', operation = 'upsert_many'},
39+
}))
40+
41+
pgroup_duplicates.before_all(function(g)
42+
helpers.start_default_cluster(g, 'srv_simple_operations')
43+
end)
44+
45+
pgroup_duplicates.after_all(function(g)
46+
helpers.stop_cluster(g.cluster, g.params.backend)
47+
end)
48+
49+
pgroup_duplicates.before_each(function(g)
50+
helpers.truncate_space_on_cluster(g.cluster, 'customers')
51+
end)
52+
53+
pgroup_duplicates.after_each(function(g)
54+
balance_cluster(g)
55+
end)
56+
57+
--- Rebalance stalls if we move all buckets at once; use a small subset.
58+
local test_tuples = {
59+
{22, box.NULL, 'Alex', 34},
60+
-- {92, box.NULL, 'Artur', 29},
61+
-- {3, box.NULL, 'Anastasia', 22},
62+
-- {5, box.NULL, 'Sergey', 25},
63+
-- {9, box.NULL, 'Anna', 30},
64+
-- {71, box.NULL, 'Oksana', 29},
65+
}
66+
67+
local last_call = fiber.time()
68+
local duplicate_operations = {
69+
insert = function(g)
70+
return g.router:call('crud.insert', {'customers', {45, box.NULL, 'John Fedor', 42}})
71+
end,
72+
replace = function(g)
73+
return g.router:call('crud.replace', {'customers', {45, box.NULL, 'John Fedor', 42}})
74+
end,
75+
upsert = function (g)
76+
return g.router:call('crud.upsert', {'customers', {45, box.NULL, 'John Fedor', 42}, {{'+', 'age', 1}}})
77+
end,
78+
insert_many = function(g)
79+
if fiber.time() - last_call < 1 then
80+
return
81+
end
82+
last_call = fiber.time()
83+
return g.router:call('crud.insert_many', {'customers', test_tuples})
84+
end,
85+
replace_many = function(g)
86+
if fiber.time() - last_call < 1 then
87+
return
88+
end
89+
last_call = fiber.time()
90+
return g.router:call('crud.replace_many', {'customers', test_tuples})
91+
end,
92+
upsert_many = function(g)
93+
if fiber.time() - last_call < 1 then
94+
return
95+
end
96+
last_call = fiber.time()
97+
local tuples = {}
98+
for i = 1, 2 do
99+
tuples[i] = {{i, box.NULL, 'John Fedor', 42}, {{'+', 'age', 1}}}
100+
end
101+
return g.router:call('crud.upsert_many', {'customers', tuples})
102+
end
103+
}
104+
105+
local function check_duplicates(tuples)
106+
local ids = {}
107+
for _, tuple in pairs(tuples) do
108+
t.assert_equals(ids[tuple[1]], nil, ('duplicate to tuple: %s'):format(json.encode(tuple)))
109+
ids[tuple[1]] = true
110+
end
111+
end
112+
113+
114+
--- write requests cause duplicates by primary key in cluster
115+
pgroup_duplicates.test_duplicates = function(g)
116+
t.skip_if(
117+
not (
118+
utils.tarantool_version_at_least(3, 1) and (g.params.backend == "config")
119+
),
120+
'test implemented only for 3.1 and greater'
121+
)
122+
if g.params.backend == "config" then
123+
t.xfail('not implemented yet')
124+
duplicate_operations[g.params.operation](g)
125+
126+
local cfg = g.cluster:cfg()
127+
cfg.groups.storages.replicasets["s-1"].sharding = {
128+
weight = 0,
129+
}
130+
g.cluster:cfg(cfg)
131+
t.helpers.retrying({timeout=30}, function()
132+
local buckets_count = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()")
133+
duplicate_operations[g.params.operation](g)
134+
t.assert_equals(buckets_count, 0)
135+
end)
136+
137+
cfg.groups.storages.replicasets["s-2"].sharding = {
138+
weight = 0,
139+
}
140+
cfg.groups.storages.replicasets["s-1"].sharding = {
141+
weight = 1,
142+
}
143+
g.cluster:cfg(cfg)
144+
t.helpers.retrying({timeout=30}, function()
145+
local buckets_count = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()")
146+
duplicate_operations[g.params.operation](g)
147+
t.assert_equals(buckets_count, 0)
148+
end)
149+
150+
local res = g.router:call('crud.select', {'customers'})
151+
check_duplicates(res.rows)
152+
end
153+
end
154+
155+
local pgroup_not_applied = t.group('double_buckets_not_applied', helpers.backend_matrix({
156+
{engine = 'memtx', operation = 'delete'},
157+
{engine = 'memtx', operation = 'update'},
158+
{engine = 'memtx', operation = 'get'},
159+
}))
160+
161+
pgroup_not_applied.before_all(function(g)
162+
helpers.start_default_cluster(g, 'srv_simple_operations')
163+
end)
164+
165+
pgroup_not_applied.after_all(function(g)
166+
helpers.stop_cluster(g.cluster, g.params.backend)
167+
end)
168+
169+
pgroup_not_applied.before_each(function(g)
170+
helpers.truncate_space_on_cluster(g.cluster, 'customers')
171+
end)
172+
173+
pgroup_not_applied.after_each(function(g)
174+
balance_cluster(g)
175+
end)
176+
177+
local not_applied_operations = {
178+
delete = {
179+
call = function(g, key)
180+
last_call = fiber.time()
181+
return g.router:call('crud.delete', { 'customers', {key} })
182+
end,
183+
check_applied = function(rows, applied_ids)
184+
for _, tuple in pairs(rows) do
185+
t.assert_equals(
186+
applied_ids[tuple[1]],
187+
nil,
188+
('tuples %s was marked as deleted, but exists'):format(json.encode(tuple))
189+
)
190+
end
191+
end,
192+
check_not_applied = function(not_applied_ids)
193+
t.assert_equals(
194+
next(not_applied_ids),
195+
nil,
196+
'tuples were inserted, but crud.delete returned 0 rows, as if there were no such tuples'
197+
)
198+
end
199+
},
200+
update = {
201+
call = function(g, key)
202+
return g.router:call('crud.update', { 'customers', key, {{'=', 'name', 'applied'}} })
203+
end,
204+
check_applied = function(rows, applied_ids)
205+
for _, tuple in pairs(rows) do
206+
if applied_ids[tuple[1]] then
207+
t.assert_equals(
208+
tuple[3],
209+
'applied',
210+
('tuples %s was marked as updated, but was not updated'):format(json.encode(tuple))
211+
)
212+
end
213+
end
214+
end,
215+
check_not_applied = function(not_applied_ids)
216+
t.assert_equals(
217+
next(not_applied_ids),
218+
nil,
219+
'tuples were created, but crud.update returned 0 rows, as if there were no such tuples'
220+
)
221+
end
222+
},
223+
get = {
224+
call = function (g, key)
225+
return g.router:call('crud.get', { 'customers', key })
226+
end,
227+
check_applied = function() end,
228+
check_not_applied = function(not_applied_ids)
229+
t.assert_equals(
230+
next(not_applied_ids),
231+
nil,
232+
'tuples were created, but crud.get returned 0 rows, as if there were no such tuples'
233+
)
234+
end
235+
}
236+
}
237+
238+
--- Some requests do not create duplicates but return 0 rows as if there is no tuple
239+
--- with this key. The tuple can still exist in cluster but be unavailable during
240+
--- rebalance. CRUD should return an error in this case, not 0 rows as if there were
241+
--- no tuples.
242+
pgroup_not_applied.test_not_applied = function(g)
243+
t.skip_if(
244+
not (
245+
utils.tarantool_version_at_least(3, 1) and (g.params.backend == "config")
246+
),
247+
'test implemented only for 3.1 and greater'
248+
)
249+
if g.params.backend == "config" then
250+
t.xfail('not implemented yet')
251+
local tuples, tuples_count = {}, 1000
252+
for i = 1, tuples_count do
253+
tuples[i] = {i, box.NULL, 'John Fedor', 42}
254+
end
255+
256+
local _, err = g.router:call('crud.replace_many', {'customers', tuples})
257+
t.assert_equals(err, nil)
258+
local cfg = g.cluster:cfg()
259+
cfg.groups.storages.replicasets["s-1"].sharding = {
260+
weight = 0,
261+
}
262+
g.cluster:cfg(cfg)
263+
local tuple_id = 1
264+
local not_applied_ids = {}
265+
local applied_ids = {}
266+
t.helpers.retrying({timeout=30}, function()
267+
if tuple_id > tuples_count then
268+
return
269+
end
270+
271+
local buckets_count = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()")
272+
local res, err = not_applied_operations[g.params.operation].call(g, tuple_id)
273+
if err == nil then
274+
if #res.rows == 0 then
275+
not_applied_ids[tuple_id] = true
276+
else
277+
applied_ids[tuple_id] = true
278+
end
279+
tuple_id = tuple_id + 1
280+
end
281+
282+
t.assert_equals(buckets_count, 0)
283+
end)
284+
285+
cfg.groups.storages.replicasets["s-2"].sharding = {
286+
weight = 0,
287+
}
288+
cfg.groups.storages.replicasets["s-1"].sharding = {
289+
weight = 1,
290+
}
291+
g.cluster:cfg(cfg)
292+
t.helpers.retrying({timeout=30}, function()
293+
if tuple_id > tuples_count then
294+
return
295+
end
296+
297+
local buckets_count = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()")
298+
local res, err = not_applied_operations[g.params.operation].call(g, tuple_id)
299+
300+
if err == nil then
301+
if #res.rows == 0 then
302+
not_applied_ids[tuple_id] = true
303+
else
304+
applied_ids[tuple_id] = true
305+
end
306+
tuple_id = tuple_id + 1
307+
end
308+
309+
t.assert_equals(buckets_count, 0)
310+
end)
311+
312+
local res = g.router:call('crud.select', {'customers'})
313+
not_applied_operations[g.params.operation].check_applied(res.rows, applied_ids)
314+
not_applied_operations[g.params.operation].check_not_applied(not_applied_ids)
315+
end
316+
end

test/tarantool3_helpers/cluster.lua

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -297,34 +297,7 @@ function Cluster:cfg(new_config)
297297
return table.deepcopy(self.config)
298298
end
299299

300-
local function strip_all_entries(t, name)
301-
if type(t) ~= 'table' then
302-
return t
303-
end
304-
305-
t[name] = nil
306-
307-
for k, v in pairs(t) do
308-
t[k] = strip_all_entries(v, name)
309-
end
310-
311-
return t
312-
end
313-
314-
local function check_only_roles_cfg_changed_in_groups(old_groups, new_groups)
315-
old_groups = table.deepcopy(old_groups)
316-
new_groups = table.deepcopy(new_groups)
317-
318-
local old_groups_no_roles_cfg = strip_all_entries(old_groups, 'roles_cfg')
319-
local new_groups_no_roles_cfg = strip_all_entries(new_groups, 'roles_cfg')
320-
321-
t.assert_equals(new_groups_no_roles_cfg, old_groups_no_roles_cfg,
322-
'groups reload supports only roles_cfg reload')
323-
end
324-
325300
function Cluster:reload_config(new_config)
326-
check_only_roles_cfg_changed_in_groups(self.config.groups, new_config.groups)
327-
328301
for _, server in ipairs(self.servers) do
329302
write_config(self.dirs[server], new_config)
330303
end

0 commit comments

Comments
 (0)