Skip to content

Commit b5e2b27

Browse files
select: yield on storage tuple lookup
Yield on `select`/`pairs` tuple lookup on storage. Fiber yields each `yield_every` records (same as in `count`). `yield_every` should be a positive integer, default is 1000. crud code contains several `while true` loops. Three of them are related to the retry strategy. Since retries wraps around net box calls which yield, it shouldn't be dangerous. The other two are in storage select procedure: on after tuple scroll and records filtering. If there are a lot of records which not satisfy any conditions, storage will stuck with 100% CPU load, like in #312. This patch covers these two cases. There are pairs loops without any yields on router: fields extraction. Since the count of records is not expected to be that big there (we already work with the final set which would be sent to user), this patch doesn't change the behavior on router. Closes #312
1 parent 7c4bb08 commit b5e2b27

File tree

8 files changed

+213
-2
lines changed

8 files changed

+213
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8+
## Unreleased
9+
10+
### Changed
11+
* Yield on select/pairs storage tuple lookup (#312).
12+
813
## [1.1.0] - 13-03-23
914

1015
### Added

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,8 @@ where:
959959
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
960960
vshard router instance. Set this parameter if your space is not
961961
a part of the default vshard cluster
962+
* `yield_every` (`?number`) - number of tuples processed on storage to yield after,
963+
`yield_every` should be > 0, default value is 1000
962964

963965

964966
Returns metadata and array of rows, error.

crud/select.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ local function select_on_storage(space_name, index_id, conditions, opts)
4242
sharding_key_hash = '?number',
4343
sharding_func_hash = '?number',
4444
skip_sharding_hash_check = '?boolean',
45+
yield_every = '?number',
4546
})
4647

4748
local space = box.space[space_name]
@@ -77,6 +78,7 @@ local function select_on_storage(space_name, index_id, conditions, opts)
7778
after_tuple = opts.after_tuple,
7879
tarantool_iter = opts.tarantool_iter,
7980
limit = opts.limit,
81+
yield_every = opts.yield_every,
8082
})
8183
if err ~= nil then
8284
return nil, SelectError:new("Failed to execute select: %s", err)

crud/select/compat/select.lua

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
2727
bucket_id = '?number|cdata',
2828
force_map_call = '?boolean',
2929
field_names = '?table',
30+
yield_every = '?number',
3031
call_opts = 'table',
3132
})
3233

@@ -36,6 +37,12 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
3637
return nil, SelectError:new("batch_size should be > 0")
3738
end
3839

40+
if opts.yield_every ~= nil and opts.yield_every < 1 then
41+
return nil, SelectError:new("yield_every should be > 0")
42+
end
43+
44+
local yield_every = opts.yield_every or const.DEFAULT_YIELD_EVERY
45+
3946
-- check conditions
4047
local conditions, err = compare_conditions.parse(user_conditions)
4148
if err ~= nil then
@@ -163,6 +170,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
163170
sharding_func_hash = sharding_func_hash,
164171
sharding_key_hash = sharding_key_data.hash,
165172
skip_sharding_hash_check = skip_sharding_hash_check,
173+
yield_every = yield_every,
166174
}
167175

168176
local merger = Merger.new(vshard_router, replicasets_to_select, space, plan.index_id,
@@ -202,6 +210,8 @@ function select_module.pairs(space_name, user_conditions, opts)
202210
timeout = '?number',
203211

204212
vshard_router = '?string|table',
213+
214+
yield_every = '?number',
205215
})
206216

207217
opts = opts or {}
@@ -222,6 +232,7 @@ function select_module.pairs(space_name, user_conditions, opts)
222232
bucket_id = opts.bucket_id,
223233
force_map_call = opts.force_map_call,
224234
field_names = opts.fields,
235+
yield_every = opts.yield_every,
225236
call_opts = {
226237
mode = opts.mode,
227238
prefer_replica = opts.prefer_replica,
@@ -273,6 +284,8 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions,
273284
timeout = '?number',
274285

275286
vshard_router = '?string|table',
287+
288+
yield_every = '?number',
276289
})
277290

278291
if opts.first ~= nil and opts.first < 0 then
@@ -288,6 +301,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions,
288301
bucket_id = opts.bucket_id,
289302
force_map_call = opts.force_map_call,
290303
field_names = opts.fields,
304+
yield_every = opts.yield_every,
291305
call_opts = {
292306
mode = opts.mode,
293307
prefer_replica = opts.prefer_replica,

crud/select/compat/select_old.lua

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ local function select_iteration(space_name, plan, opts)
3030
call_opts = 'table',
3131
sharding_hash = 'table',
3232
vshard_router = 'table',
33+
yield_every = 'number',
3334
})
3435

3536
local call_opts = opts.call_opts
@@ -45,6 +46,7 @@ local function select_iteration(space_name, plan, opts)
4546
sharding_func_hash = opts.sharding_hash.sharding_func_hash,
4647
sharding_key_hash = opts.sharding_hash.sharding_key_hash,
4748
skip_sharding_hash_check = opts.sharding_hash.skip_sharding_hash_check,
49+
yield_every = opts.yield_every,
4850
}
4951

5052
local storage_select_args = {
@@ -90,6 +92,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
9092
bucket_id = '?number|cdata',
9193
force_map_call = '?boolean',
9294
field_names = '?table',
95+
yield_every = '?number',
9396
call_opts = 'table',
9497
})
9598

@@ -99,6 +102,12 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
99102
return nil, SelectError:new("batch_size should be > 0")
100103
end
101104

105+
if opts.yield_every ~= nil and opts.yield_every < 1 then
106+
return nil, SelectError:new("yield_every should be > 0")
107+
end
108+
109+
local yield_every = opts.yield_every or const.DEFAULT_YIELD_EVERY
110+
102111
local batch_size = opts.batch_size or common.DEFAULT_BATCH_SIZE
103112

104113
-- check conditions
@@ -214,6 +223,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
214223
call_opts = opts.call_opts,
215224
sharding_hash = sharding_hash,
216225
vshard_router = vshard_router,
226+
yield_every = yield_every,
217227
})
218228

219229
return iter
@@ -235,6 +245,8 @@ function select_module.pairs(space_name, user_conditions, opts)
235245
timeout = '?number',
236246

237247
vshard_router = '?string|table',
248+
249+
yield_every = '?number',
238250
})
239251

240252
opts = opts or {}
@@ -255,6 +267,7 @@ function select_module.pairs(space_name, user_conditions, opts)
255267
bucket_id = opts.bucket_id,
256268
force_map_call = opts.force_map_call,
257269
field_names = opts.fields,
270+
yield_every = opts.yield_every,
258271
call_opts = {
259272
mode = opts.mode,
260273
prefer_replica = opts.prefer_replica,
@@ -326,6 +339,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions,
326339
bucket_id = opts.bucket_id,
327340
force_map_call = opts.force_map_call,
328341
field_names = opts.fields,
342+
yield_every = opts.yield_every,
329343
call_opts = {
330344
mode = opts.mode,
331345
prefer_replica = opts.prefer_replica,
@@ -387,6 +401,8 @@ function select_module.call(space_name, user_conditions, opts)
387401
timeout = '?number',
388402

389403
vshard_router = '?string|table',
404+
405+
yield_every = '?number',
390406
})
391407

392408
opts = opts or {}

crud/select/executor.lua

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local errors = require('errors')
2+
local fiber = require('fiber')
23
local fun = require('fun')
34

45
local dev_checks = require('crud.common.dev_checks')
@@ -17,17 +18,23 @@ local ExecuteSelectError = errors.new_class('ExecuteSelectError')
1718

1819
local executor = {}
1920

20-
local function scroll_to_after_tuple(gen, space, scan_index, tarantool_iter, after_tuple)
21+
local function scroll_to_after_tuple(gen, space, scan_index, tarantool_iter, after_tuple, yield_every)
2122
local primary_index = space.index[0]
2223

2324
local scroll_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts)
2425

2526
local cmp_operator = select_comparators.get_cmp_operator(tarantool_iter)
2627
local scroll_comparator = select_comparators.gen_tuples_comparator(cmp_operator, scroll_key_parts)
2728

29+
local looked_up_tuples = 0
2830
while true do
2931
local tuple
3032
gen.state, tuple = gen(gen.param, gen.state)
33+
looked_up_tuples = looked_up_tuples + 1
34+
35+
if yield_every ~= nil and looked_up_tuples % yield_every == 0 then
36+
fiber.yield()
37+
end
3138

3239
if tuple == nil then
3340
return nil
@@ -73,6 +80,7 @@ function executor.execute(space, index, filter_func, opts)
7380
after_tuple = '?table',
7481
tarantool_iter = 'number',
7582
limit = '?number',
83+
yield_every = '?number',
7684
})
7785

7886
opts = opts or {}
@@ -137,7 +145,7 @@ function executor.execute(space, index, filter_func, opts)
137145

138146
if opts.after_tuple ~= nil then
139147
local err
140-
tuple, err = scroll_to_after_tuple(gen, space, index, opts.tarantool_iter, opts.after_tuple)
148+
tuple, err = scroll_to_after_tuple(gen, space, index, opts.tarantool_iter, opts.after_tuple, opts.yield_every)
141149
if err ~= nil then
142150
return nil, ExecuteSelectError:new("Failed to scroll to the after_tuple: %s", err)
143151
end
@@ -151,6 +159,7 @@ function executor.execute(space, index, filter_func, opts)
151159
gen.state, tuple = gen(gen.param, gen.state)
152160
end
153161

162+
local looked_up_tuples = 0
154163
while true do
155164
if tuple == nil then
156165
break
@@ -170,6 +179,11 @@ function executor.execute(space, index, filter_func, opts)
170179
end
171180

172181
gen.state, tuple = gen(gen.param, gen.state)
182+
looked_up_tuples = looked_up_tuples + 1
183+
184+
if opts.yield_every ~= nil and looked_up_tuples % opts.yield_every == 0 then
185+
fiber.yield()
186+
end
173187
end
174188

175189
return resp

crud/select/iterator.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ function Iterator.new(opts)
2828

2929
call_opts = 'table',
3030
sharding_hash = 'table',
31+
32+
yield_every = '?number',
3133
})
3234

3335
local iter = {
@@ -57,6 +59,8 @@ function Iterator.new(opts)
5759
wait_for_update = false,
5860

5961
sharding_hash = opts.sharding_hash,
62+
63+
yield_every = opts.yield_every,
6064
}
6165

6266
setmetatable(iter, Iterator)
@@ -108,6 +112,7 @@ local function update_replicasets_tuples(iter, after_tuple, replicaset_uuid)
108112
call_opts = iter.call_opts,
109113
sharding_hash = iter.sharding_hash,
110114
vshard_router = iter.vshard_router,
115+
yield_every = iter.yield_every,
111116
})
112117
if err ~= nil then
113118
if sharding.result_needs_sharding_reload(err) then

0 commit comments

Comments
 (0)