Skip to content

Commit 6560c23

Browse files
authored
Refactor select plan (#32)
1 parent 9929417 commit 6560c23

File tree

9 files changed

+790
-661
lines changed

9 files changed

+790
-661
lines changed

crud/select.lua

Lines changed: 51 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ local select_conditions = require('crud.select.conditions')
1010
local select_plan = require('crud.select.plan')
1111
local select_executor = require('crud.select.executor')
1212
local select_comparators = require('crud.select.comparators')
13+
local select_filters = require('crud.select.filters')
1314

1415
local Iterator = require('crud.select.iterator')
1516

@@ -22,29 +23,42 @@ local select_module = {}
2223

2324
local SELECT_FUNC_NAME = '__select'
2425

25-
local function call_select_on_storage(space_name, conditions, opts)
26-
checks('string', '?table', {
27-
limit = '?number',
26+
local DEFAULT_BATCH_SIZE = 100
27+
28+
local function call_select_on_storage(space_name, index_id, conditions, opts)
29+
checks('string', 'number', '?table', {
30+
scan_value = 'table',
2831
after_tuple = '?table',
32+
iter = 'number',
33+
limit = 'number',
34+
scan_condition_num = '?number',
2935
})
3036

3137
local space = box.space[space_name]
3238
if space == nil then
3339
return nil, SelectError:new("Space %s doesn't exists", space_name)
3440
end
3541

36-
-- plan select
37-
local plan, err = select_plan.new(space, conditions, {
38-
limit = opts.limit,
39-
after_tuple = opts.after_tuple,
40-
})
42+
local index = space.index[index_id]
43+
if index == nil then
44+
return nil, SelectError:new("Index with ID %s doesn't exists", index_id)
45+
end
4146

47+
local filter_func, err = select_filters.gen_func(space, conditions, {
48+
iter = opts.iter,
49+
scan_condition_num = opts.scan_condition_num,
50+
})
4251
if err ~= nil then
43-
return nil, SelectError:new("Failed to plan select: %s", err)
52+
return nil, SelectError:new("Failed to generate tuples filter: %s", err)
4453
end
4554

4655
-- execute select
47-
local tuples, err = select_executor.execute(plan)
56+
local tuples, err = select_executor.execute(space, index, filter_func, {
57+
scan_value = opts.scan_value,
58+
after_tuple = opts.after_tuple,
59+
iter = opts.iter,
60+
limit = opts.limit,
61+
})
4862
if err ~= nil then
4963
return nil, SelectError:new("Failed to execute select: %s", err)
5064
end
@@ -58,21 +72,26 @@ function select_module.init()
5872
})
5973
end
6074

61-
local function select_iteration(space_name, conditions, opts)
75+
local function select_iteration(space_name, plan, opts)
6276
checks('string', '?table', {
6377
after_tuple = '?table',
6478
replicasets = 'table',
6579
timeout = '?number',
66-
batch_size = '?number',
80+
limit = 'number',
6781
})
6882

6983
-- call select on storages
7084
local storage_select_opts = {
85+
scan_value = plan.scan_value,
7186
after_tuple = opts.after_tuple,
72-
limit = opts.batch_size,
87+
iter = plan.iter,
88+
limit = opts.limit,
89+
scan_condition_num = plan.scan_condition_num,
7390
}
7491

75-
local storage_select_args = {space_name, conditions, storage_select_opts}
92+
local storage_select_args = {
93+
space_name, plan.index_id, plan.conditions, storage_select_opts,
94+
}
7695

7796
local results, err = call.ro(SELECT_FUNC_NAME, storage_select_args, {
7897
replicasets = opts.replicasets,
@@ -86,14 +105,8 @@ local function select_iteration(space_name, conditions, opts)
86105
return results
87106
end
88107

89-
local function get_replicasets_to_select_from(plan, all_replicasets)
90-
if not plan.is_scan_by_full_sharding_key_eq then
91-
return all_replicasets
92-
end
93-
94-
plan.scanner.limit = 1
95-
96-
local bucket_id = vshard.router.bucket_id_strcrc32(plan.scanner.value)
108+
local function get_replicasets_by_sharding_key(sharding_key)
109+
local bucket_id = vshard.router.bucket_id_strcrc32(sharding_key)
97110
local replicaset, err = vshard.router.route(bucket_id)
98111
if replicaset == nil then
99112
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -118,6 +131,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
118131
return nil, SelectError:new("batch_size should be > 0")
119132
end
120133

134+
local batch_size = opts.batch_size or DEFAULT_BATCH_SIZE
135+
121136
if opts.limit ~= nil and opts.limit < 0 then
122137
return nil, SelectError:new("limit should be >= 0")
123138
end
@@ -137,32 +152,32 @@ local function build_select_iterator(space_name, user_conditions, opts)
137152
if space == nil then
138153
return nil, SelectError:new("Space %s doesn't exists", space_name)
139154
end
140-
141155
local space_format = space:format()
142156

143-
local after_tuple = utils.flatten(opts.after, space_format)
144-
145157
-- plan select
146158
local plan, err = select_plan.new(space, conditions, {
147159
limit = opts.limit,
148-
after_tuple = after_tuple
149160
})
150161

151162
if err ~= nil then
152163
return nil, SelectError:new("Failed to plan select: %s", err)
153164
end
154165

155-
-- get replicasets to select from
156-
local replicasets, err = get_replicasets_to_select_from(plan, replicasets)
157-
if err ~= nil then
158-
return nil, SelectError:new("Failed to get replicasets to select from: %s", err)
166+
-- set limit and replicasets to select from
167+
local replicasets_to_select = replicasets
168+
169+
if plan.sharding_key ~= nil then
170+
replicasets_to_select = get_replicasets_by_sharding_key(plan.sharding_key)
159171
end
160172

161-
local scan_index = space.index[plan.scanner.index_id]
162-
local primary_index = space.index[0]
173+
-- set after tuple
174+
local after_tuple = utils.flatten(opts.after, space_format)
163175

176+
-- generate tuples comparator
177+
local scan_index = space.index[plan.index_id]
178+
local primary_index = space.index[0]
164179
local cmp_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
165-
local cmp_operator = select_comparators.get_cmp_operator(plan.scanner.iter)
180+
local cmp_operator = select_comparators.get_cmp_operator(plan.iter)
166181
local tuples_comparator, err = select_comparators.gen_tuples_comparator(
167182
cmp_operator, cmp_key_parts
168183
)
@@ -176,12 +191,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
176191
iteration_func = select_iteration,
177192
comparator = tuples_comparator,
178193

179-
conditions = conditions,
194+
plan = plan,
180195
after_tuple = after_tuple,
181-
limit = plan.scanner.limit,
182196

183-
batch_size = opts.batch_size,
184-
replicasets = replicasets,
197+
batch_size = batch_size,
198+
replicasets = replicasets_to_select,
185199

186200
timeout = opts.timeout,
187201
})

crud/select/executor.lua

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
local checks = require('checks')
12
local errors = require('errors')
23
local log = require('log')
34

4-
local select_filters = require('crud.select.filters')
55
local select_comparators = require('crud.select.comparators')
66

77
local utils = require('crud.common.utils')
@@ -11,72 +11,72 @@ local ExecuteSelectError = errors.new_class('ExecuteSelectError')
1111

1212
local executor = {}
1313

14-
local function scroll_to_after_tuple(gen, param, state, space, scanner)
15-
local scan_index = space.index[scanner.index_id]
14+
local function scroll_to_after_tuple(gen, space, scan_index, iter, after_tuple)
1615
local primary_index = space.index[0]
1716

1817
local scroll_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
1918

20-
local cmp_operator = select_comparators.get_cmp_operator(scanner.iter)
19+
local cmp_operator = select_comparators.get_cmp_operator(iter)
2120
local scroll_comparator, err = select_comparators.gen_tuples_comparator(cmp_operator, scroll_key_parts)
2221
if err ~= nil then
2322
return nil, ScrollToAfterError:new("Failed to generate comparator to scroll: %s", err)
2423
end
2524

2625
while true do
2726
local tuple
28-
state, tuple = gen(param, state)
27+
gen.state, tuple = gen(gen.param, gen.state)
2928

3029
if tuple == nil then
3130
return nil
3231
end
3332

34-
if scroll_comparator(tuple, scanner.after_tuple) then
33+
if scroll_comparator(tuple, after_tuple) then
3534
return tuple
3635
end
3736
end
3837
end
3938

40-
function executor.execute(plan)
41-
local scanner = plan.scanner
39+
function executor.execute(space, index, filter_func, opts)
40+
checks('table', 'table', 'function', {
41+
scan_value = 'table',
42+
after_tuple = '?cdata|table',
43+
iter = 'number',
44+
limit = '?number',
45+
})
4246

43-
if scanner.limit == 0 then
47+
opts = opts or {}
48+
49+
if opts.limit == 0 then
4450
return {}
4551
end
4652

47-
local filter = select_filters.gen_code(plan.filter_conditions)
48-
local filer_func = select_filters.compile(filter)
49-
5053
local tuples = {}
5154
local tuples_count = 0
5255

53-
local space = box.space[scanner.space_name]
54-
local index = space.index[scanner.index_id]
55-
56-
local scan_value = scanner.value
57-
if scanner.after_tuple ~= nil then
58-
if scan_value == nil then
59-
scan_value = scanner.after_tuple
56+
local value = opts.scan_value
57+
if opts.after_tuple ~= nil then
58+
if value == nil then
59+
value = opts.after_tuple
6060
else
61-
local cmp_operator = select_comparators.get_cmp_operator(scanner.iter)
61+
local cmp_operator = select_comparators.get_cmp_operator(opts.iter)
6262
local scan_comparator, err = select_comparators.gen_tuples_comparator(cmp_operator, index.parts)
6363
if err ~= nil then
6464
log.warn("Failed to generate comparator for scan value: %s", err)
65-
elseif scan_comparator(scanner.after_tuple, scan_value) then
66-
local after_tuple_key = utils.extract_key(scanner.after_tuple, index.parts)
67-
scan_value = after_tuple_key
65+
elseif scan_comparator(opts.after_tuple, opts.scan_value) then
66+
local after_tuple_key = utils.extract_key(opts.after_tuple, index.parts)
67+
value = after_tuple_key
6868
end
6969
end
7070
end
7171

7272
local tuple
73-
local gen,param,state = index:pairs(scan_value, {iterator = scanner.iter})
73+
local gen = index:pairs(value, {iterator = opts.iter})
7474

75-
if scanner.after_tuple ~= nil then
75+
if opts.after_tuple ~= nil then
7676
local err
77-
tuple, err = scroll_to_after_tuple(gen, param, state, space, scanner)
77+
tuple, err = scroll_to_after_tuple(gen, space, index, opts.iter, opts.after_tuple)
7878
if err ~= nil then
79-
return nil, ExecuteSelectError:new("Failed to scroll to the last tuple: %s", err)
79+
return nil, ExecuteSelectError:new("Failed to scroll to the after_tuple: %s", err)
8080
end
8181

8282
if tuple == nil then
@@ -85,28 +85,28 @@ function executor.execute(plan)
8585
end
8686

8787
if tuple == nil then
88-
state, tuple = gen(param, state)
88+
gen.state, tuple = gen(gen.param, gen.state)
8989
end
9090

9191
while true do
9292
if tuple == nil then
9393
break
9494
end
9595

96-
local matched, early_exit = filer_func(tuple)
96+
local matched, early_exit = filter_func(tuple)
9797

9898
if matched then
9999
table.insert(tuples, tuple)
100100
tuples_count = tuples_count + 1
101101

102-
if scanner.limit ~= nil and tuples_count >= scanner.limit then
102+
if opts.limit ~= nil and tuples_count >= opts.limit then
103103
break
104104
end
105105
elseif early_exit then
106106
break
107107
end
108108

109-
state, tuple = gen(param, state)
109+
gen.state, tuple = gen(gen.param, gen.state)
110110
end
111111

112112
return tuples

0 commit comments

Comments
 (0)