Skip to content

Commit 82294c8

Browse files
AnaNekTotktonada
authored andcommitted
Implementation of batch insert
Batch insert is mostly used for operation with one bucket / one Tarantool node in a transaction. In this case batch insert is more efficient then inserting tuple-by-tuple. Right now CRUD cannot provide batch insert with full consistency. CRUD offers batch insert with partial consistency. That means that full consistency can be provided only on single replicaset using `box` transactions. Part of #193
1 parent 8e00652 commit 82294c8

17 files changed

+3141
-41
lines changed

README.md

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ It also provides the `crud-storage` and `crud-router` roles for
1616
- [Quickstart](#quickstart)
1717
- [API](#api)
1818
- [Insert](#insert)
19+
- [Insert many](#insert-many)
1920
- [Get](#get)
2021
- [Update](#update)
2122
- [Delete](#delete)
@@ -233,6 +234,143 @@ crud.insert_object('customers', {
233234
...
234235
```
235236

237+
### Insert many
238+
239+
```lua
240+
-- Insert batch of tuples
241+
local result, err = crud.insert_many(space_name, tuples, opts)
242+
-- Insert batch of objects
243+
local result, err = crud.insert_object_many(space_name, objects, opts)
244+
```
245+
246+
where:
247+
248+
* `space_name` (`string`) - name of the space to insert an object
249+
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
250+
* `opts`:
251+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
252+
* `fields` (`?table`) - field names for getting only a subset of fields
253+
* `stop_on_error` (`?boolean`) - stop on a first error and report error
254+
regarding the failed operation and error about what tuples were not
255+
performed, default is `false`
256+
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
257+
rollback on a storage, where the operation is failed, report error
258+
about what tuples were rollback, default is `false`
259+
260+
Returns metadata and array with inserted rows, array of errors.
261+
Each error object can contain field `operation_data`.
262+
263+
`operation_data` field can contain:
264+
* tuple for which the error occurred;
265+
* object with an incorrect format;
266+
* tuple the operation on which was performed but
267+
operation was rollback;
268+
* tuple the operation on which was not performed
269+
because operation was stopped by error.
270+
271+
Right now CRUD cannot provide batch insert with full consistency.
272+
CRUD offers batch insert with partial consistency. That means
273+
that full consistency can be provided only on single replicaset
274+
using `box` transactions.
275+
276+
**Example:**
277+
278+
```lua
279+
crud.insert_many('customers', {
280+
{1, box.NULL, 'Elizabeth', 23},
281+
{2, box.NULL, 'Anastasia', 22},
282+
})
283+
---
284+
- metadata:
285+
- {'name': 'id', 'type': 'unsigned'}
286+
- {'name': 'bucket_id', 'type': 'unsigned'}
287+
- {'name': 'name', 'type': 'string'}
288+
- {'name': 'age', 'type': 'number'}
289+
rows:
290+
- [1, 477, 'Elizabeth', 23]
291+
- [2, 401, 'Anastasia', 22]
292+
...
293+
crud.insert_object_many('customers', {
294+
{id = 3, name = 'Elizabeth', age = 24},
295+
{id = 10, name = 'Anastasia', age = 21},
296+
})
297+
---
298+
- metadata:
299+
- {'name': 'id', 'type': 'unsigned'}
300+
- {'name': 'bucket_id', 'type': 'unsigned'}
301+
- {'name': 'name', 'type': 'string'}
302+
- {'name': 'age', 'type': 'number'}
303+
rows:
304+
- [3, 2804, 'Elizabeth', 24]
305+
- [10, 569, 'Anastasia', 21]
306+
307+
-- Partial success
308+
local res, errs = crud.insert_object_many('customers', {
309+
{id = 22, name = 'Alex', age = 34},
310+
{id = 3, name = 'Anastasia', age = 22},
311+
{id = 5, name = 'Sergey', age = 25},
312+
})
313+
---
314+
res
315+
- metadata:
316+
- {'name': 'id', 'type': 'unsigned'}
317+
- {'name': 'bucket_id', 'type': 'unsigned'}
318+
- {'name': 'name', 'type': 'string'}
319+
- {'name': 'age', 'type': 'number'}
320+
rows:
321+
- [5, 1172, 'Sergey', 25],
322+
- [22, 655, 'Alex', 34],
323+
324+
#errs -- 1
325+
errs[1].class_name -- BatchInsertError
326+
errs[1].err -- 'Duplicate key exists <...>'
327+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
328+
...
329+
330+
-- Partial success with stop and rollback on error
331+
-- stop_on_error = true, rollback_on_error = true
332+
-- two error on one storage with rollback, inserts
333+
-- stop by error on this storage inserts before
334+
-- error are rollback
335+
local res, errs = crud.insert_object_many('customers', {
336+
{id = 6, name = 'Alex', age = 34},
337+
{id = 92, name = 'Artur', age = 29},
338+
{id = 3, name = 'Anastasia', age = 22},
339+
{id = 4, name = 'Sergey', age = 25},
340+
{id = 9, name = 'Anna', age = 30},
341+
{id = 71, name = 'Oksana', age = 29},
342+
}, {
343+
stop_on_error = true,
344+
rollback_on_error = true,
345+
}})
346+
---
347+
res
348+
- metadata:
349+
- {'name': 'id', 'type': 'unsigned'}
350+
- {'name': 'bucket_id', 'type': 'unsigned'}
351+
- {'name': 'name', 'type': 'string'}
352+
- {'name': 'age', 'type': 'number'}
353+
rows:
354+
- [4, 1161, 'Sergey', 25],
355+
- [6, 1064, 'Alex', 34],
356+
#errs -- 4
357+
errs[1].class_name -- InsertManyError
358+
errs[1].err -- 'Duplicate key exists <...>'
359+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
360+
361+
errs[2].class_name -- NotPerformedError
362+
errs[2].err -- 'Operation with tuple was not performed'
363+
errs[2].tuple -- {9, 1644, "Anna", 30}
364+
365+
errs[3].class_name -- NotPerformedError
366+
errs[3].err -- 'Operation with tuple was not performed'
367+
errs[3].tuple -- {71, 1802, "Oksana", 29}
368+
369+
errs[4].class_name -- NotPerformedError
370+
errs[4].err -- 'Operation with tuple was rollback'
371+
errs[4].tuple -- {92, 2040, "Artur", 29}
372+
```
373+
236374
### Get
237375

238376
```lua

crud.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
local cfg = require('crud.cfg')
66
local insert = require('crud.insert')
7+
local insert_many = require('crud.insert_many')
78
local replace = require('crud.replace')
89
local get = require('crud.get')
910
local update = require('crud.update')
@@ -31,6 +32,14 @@ crud.insert = stats.wrap(insert.tuple, stats.op.INSERT)
3132
-- @function insert_object
3233
crud.insert_object = stats.wrap(insert.object, stats.op.INSERT)
3334

35+
-- @refer insert_many.tuples
36+
-- @function insert_many
37+
crud.insert_many = insert_many.tuples
38+
39+
-- @refer insert_many.objects
40+
-- @function insert_object_many
41+
crud.insert_object_many = insert_many.objects
42+
3443
-- @refer get.call
3544
-- @function get
3645
crud.get = stats.wrap(get.call, stats.op.GET)
@@ -124,6 +133,7 @@ function crud.init_storage()
124133
end
125134

126135
insert.init()
136+
insert_many.init()
127137
get.init()
128138
replace.init()
129139
update.init()

crud/common/batching_utils.lua

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
local errors = require('errors')
2+
local dev_checks = require('crud.common.dev_checks')
3+
local sharding_utils = require('crud.common.sharding.utils')
4+
5+
local NotPerformedError = errors.new_class('NotPerformedError', {capture_stack = false})
6+
7+
local batching_utils = {}
8+
9+
batching_utils.stop_on_error_msg = "Operation with tuple was not performed"
10+
batching_utils.rollback_on_error_msg = "Operation with tuple was rollback"
11+
12+
function batching_utils.construct_sharding_hash_mismatch_errors(err_msg, tuples)
13+
dev_checks('string', 'table')
14+
15+
local errs = {}
16+
17+
for _, tuple in ipairs(tuples) do
18+
local err_obj = sharding_utils.ShardingHashMismatchError:new(err_msg)
19+
err_obj.operation_data = tuple
20+
table.insert(errs, err_obj)
21+
end
22+
23+
return errs
24+
end
25+
26+
function batching_utils.complement_batching_errors(errs, err_msg, tuples)
27+
dev_checks('table', 'string', 'table')
28+
29+
for _, tuple in ipairs(tuples) do
30+
local err_obj = NotPerformedError:new(err_msg)
31+
err_obj.operation_data = tuple
32+
table.insert(errs, err_obj)
33+
end
34+
35+
return errs
36+
end
37+
38+
return batching_utils

crud/common/call.lua

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ local utils = require('crud.common.utils')
66
local sharding_utils = require('crud.common.sharding.utils')
77
local fiber_clock = require('fiber').clock
88

9+
local BaseIterator = require('crud.common.map_call_cases.base_iter')
10+
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
11+
912
local CallError = errors.new_class('CallError')
1013

1114
local call = {}
@@ -71,6 +74,8 @@ function call.map(func_name, func_args, opts)
7174
balance = '?boolean',
7275
timeout = '?number',
7376
replicasets = '?table',
77+
iter = '?table',
78+
postprocessor = '?table',
7479
})
7580
opts = opts or {}
7681

@@ -81,24 +86,27 @@ function call.map(func_name, func_args, opts)
8186

8287
local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
8388

84-
local replicasets, err
85-
if opts.replicasets ~= nil then
86-
replicasets = opts.replicasets
87-
else
88-
replicasets, err = vshard.router.routeall()
89-
if replicasets == nil then
90-
return nil, CallError:new("Failed to get all replicasets: %s", err.err)
89+
local iter = opts.iter
90+
if iter == nil then
91+
iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets})
92+
if err ~= nil then
93+
return nil, err
9194
end
9295
end
9396

97+
local postprocessor = opts.postprocessor
98+
if postprocessor == nil then
99+
postprocessor = BasePostprocessor:new()
100+
end
101+
94102
local futures_by_replicasets = {}
95103
local call_opts = {is_async = true}
96-
for _, replicaset in pairs(replicasets) do
97-
local future = replicaset[vshard_call_name](replicaset, func_name, func_args, call_opts)
104+
while iter:has_next() do
105+
local args, replicaset = iter:get()
106+
local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts)
98107
futures_by_replicasets[replicaset.uuid] = future
99108
end
100109

101-
local results = {}
102110
local deadline = fiber_clock() + timeout
103111
for replicaset_uuid, future in pairs(futures_by_replicasets) do
104112
local wait_timeout = deadline - fiber_clock()
@@ -107,18 +115,25 @@ function call.map(func_name, func_args, opts)
107115
end
108116

109117
local result, err = future:wait_result(wait_timeout)
110-
if err == nil and result[1] == nil then
111-
err = result[2]
112-
end
113118

114-
if err ~= nil then
115-
return nil, wrap_vshard_err(err, func_name, replicaset_uuid)
116-
end
119+
local result_info = {
120+
key = replicaset_uuid,
121+
value = result,
122+
}
117123

118-
results[replicaset_uuid] = result
124+
local err_info = {
125+
err_wrapper = wrap_vshard_err,
126+
err = err,
127+
wrapper_args = {func_name, replicaset_uuid},
128+
}
129+
130+
local early_exit = postprocessor:collect(result_info, err_info)
131+
if early_exit then
132+
break
133+
end
119134
end
120135

121-
return results
136+
return postprocessor:get()
122137
end
123138

124139
function call.single(bucket_id, func_name, func_args, opts)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
local errors = require('errors')
2+
local vshard = require('vshard')
3+
4+
local dev_checks = require('crud.common.dev_checks')
5+
local GetReplicasetsError = errors.new_class('GetReplicasetsError')
6+
7+
local BaseIterator = {}
8+
9+
--- Create new base iterator for map call
10+
--
11+
-- @function new
12+
--
13+
-- @tparam[opt] table opts
14+
-- Options of BaseIterator:new
15+
-- @tparam[opt] table opts.func_args
16+
-- Function arguments to call
17+
-- @tparam[opt] table opts.replicasets
18+
-- Replicasets to call
19+
--
20+
-- @return[1] table iterator
21+
-- @treturn[2] nil
22+
-- @treturn[2] table of tables Error description
23+
function BaseIterator:new(opts)
24+
dev_checks('table', {
25+
func_args = '?table',
26+
replicasets = '?table',
27+
})
28+
29+
local replicasets, err
30+
if opts.replicasets ~= nil then
31+
replicasets = opts.replicasets
32+
else
33+
replicasets, err = vshard.router.routeall()
34+
if replicasets == nil then
35+
return nil, GetReplicasetsError:new("Failed to get all replicasets: %s", err.err)
36+
end
37+
end
38+
39+
local next_index, next_replicaset = next(replicasets)
40+
41+
local iter = {
42+
func_args = opts.func_args,
43+
replicasets = replicasets,
44+
next_replicaset = next_replicaset,
45+
next_index = next_index
46+
}
47+
48+
setmetatable(iter, self)
49+
self.__index = self
50+
51+
return iter
52+
end
53+
54+
--- Check there is next replicaset to call
55+
--
56+
-- @function has_next
57+
--
58+
-- @return[1] boolean
59+
function BaseIterator:has_next()
60+
return self.next_index ~= nil
61+
end
62+
63+
--- Get function arguments and next replicaset
64+
--
65+
-- @function get
66+
--
67+
-- @return[1] table func_args
68+
-- @return[2] table replicaset
69+
function BaseIterator:get()
70+
local replicaset = self.next_replicaset
71+
self.next_index, self.next_replicaset = next(self.replicasets, self.next_index)
72+
73+
return self.func_args, replicaset
74+
end
75+
76+
return BaseIterator

0 commit comments

Comments
 (0)