Skip to content

Commit b48fe50

Browse files
AnaNekTotktonada
authored andcommitted
Implementation of batch upsert
Batch upsert is mostly used for operation with one bucket / one Tarantool node in a transaction. In this case batch upsert is more efficient then upserting tuple-by-tuple. Right now CRUD cannot provide batch upsert with full consistency. CRUD offers batch upsert with partial consistency. That means that full consistency can be provided only on single replicaset using `box` transactions. Part of #193
1 parent 82294c8 commit b48fe50

File tree

11 files changed

+2722
-7
lines changed

11 files changed

+2722
-7
lines changed

README.md

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ It also provides the `crud-storage` and `crud-router` roles for
2222
- [Delete](#delete)
2323
- [Replace](#replace)
2424
- [Upsert](#upsert)
25+
- [Upsert many](#upsert-many)
2526
- [Select](#select)
2627
- [Select conditions](#select-conditions)
2728
- [Pairs](#pairs)
@@ -567,6 +568,134 @@ crud.upsert_object('customers',
567568
...
568569
```
569570

571+
### Upsert many
572+
573+
```lua
574+
-- Upsert batch of tuples
575+
local result, err = crud.upsert_many(space_name, tuples_operation_data, opts)
576+
-- Upsert batch of objects
577+
local result, err = crud.upsert_object_many(space_name, objects_operation_data, opts)
578+
```
579+
580+
where:
581+
582+
* `space_name` (`string`) - name of the space to insert an object
583+
* `tuples_operation_data` / `objects_operation_data` (`table`) - array of
584+
tuples/objects to insert
585+
and update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update)
586+
in format {{tuple_1, operation_1}, ..., {tuple_n, operation_n}},
587+
if there is tuple with duplicate key then existing tuple will
588+
be updated with update operations
589+
* `opts`:
590+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
591+
* `fields` (`?table`) - field names for getting only a subset of fields
592+
* `stop_on_error` (`?boolean`) - stop on a first error and report error
593+
regarding the failed operation and error about what tuples were not
594+
performed, default is `false`
595+
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
596+
rollback on a storage, where the operation is failed, report error
597+
about what tuples were rollback, default is `false`
598+
599+
Returns metadata and array of errors.
600+
Each error object can contain field `operation_data`.
601+
602+
`operation_data` field can contain:
603+
* tuple for which the error occurred;
604+
* object with an incorrect format;
605+
* tuple the operation on which was performed but
606+
operation was rollback;
607+
* tuple the operation on which was not performed
608+
because operation was stopped by error.
609+
610+
Right now CRUD cannot provide batch upsert with full consistency.
611+
CRUD offers batch upsert with partial consistency. That means
612+
that full consistency can be provided only on single replicaset
613+
using `box` transactions.
614+
615+
**Example:**
616+
617+
```lua
618+
crud.upsert_many('customers', {
619+
{{1, box.NULL, 'Elizabeth', 23}, {{'+', 'age', 1}}},
620+
{{2, box.NULL, 'Anastasia', 22}, {{'+', 'age', 2}, {'=', 'name', 'Oleg'}}}
621+
})
622+
---
623+
- metadata:
624+
- {'name': 'id', 'type': 'unsigned'}
625+
- {'name': 'bucket_id', 'type': 'unsigned'}
626+
- {'name': 'name', 'type': 'string'}
627+
- {'name': 'age', 'type': 'number'}
628+
629+
...
630+
crud.upsert_object_many('customers', {
631+
{{id = 3, name = 'Elizabeth', age = 24}, {{'+', 'age', 1}}},
632+
{{id = 10, name = 'Anastasia', age = 21}, {{'+', 'age', 2}}}
633+
})
634+
---
635+
- metadata:
636+
- {'name': 'id', 'type': 'unsigned'}
637+
- {'name': 'bucket_id', 'type': 'unsigned'}
638+
- {'name': 'name', 'type': 'string'}
639+
- {'name': 'age', 'type': 'number'}
640+
641+
-- Partial success
642+
local res, errs = crud.upsert_object_many('customers', {
643+
{{id = 22, name = 'Alex', age = 34}, {{'+', 'age', 12}}},
644+
{{id = 3, name = 'Anastasia', age = 22}, {{'=', 'age', 'invalid type'}}},
645+
{{id = 5, name = 'Sergey', age = 25}, {{'+', 'age', 10}}}
646+
})
647+
---
648+
res
649+
- metadata:
650+
- {'name': 'id', 'type': 'unsigned'}
651+
- {'name': 'bucket_id', 'type': 'unsigned'}
652+
- {'name': 'name', 'type': 'string'}
653+
- {'name': 'age', 'type': 'number'}
654+
655+
#errs -- 1
656+
errs[1].class_name -- BatchUpsertError
657+
errs[1].err -- 'Tuple field 4 (age) type does not match one required by operation <...>'
658+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
659+
...
660+
-- Partial success success with stop and rollback on error
661+
-- stop_on_error = true, rollback_on_error = true
662+
-- two error on one storage with rollback,
663+
-- inserts stop by error on this storage
664+
-- inserts before error are rollback
665+
local res, errs = crud.upsert_object_many('customers', {
666+
{{id = 6, name = 'Alex', age = 34}, {{'+', 'age', 1}}},
667+
{{id = 92, name = 'Artur', age = 29}, {{'+', 'age', 2}}},
668+
{{id = 3, name = 'Anastasia', age = 22}, {{'+', 'age', '3'}}},
669+
{{id = 4, name = 'Sergey', age = 25}, {{'+', 'age', 4}}},
670+
{{id = 9, name = 'Anna', age = 30}, {{'+', 'age', 5}}},
671+
{{id = 71, name = 'Oksana', age = 29}, {{'+', 'age', '6'}}},
672+
}, {
673+
stop_on_error = true,
674+
rollback_on_error = true,
675+
})
676+
res
677+
- metadata:
678+
- {'name': 'id', 'type': 'unsigned'}
679+
- {'name': 'bucket_id', 'type': 'unsigned'}
680+
- {'name': 'name', 'type': 'string'}
681+
- {'name': 'age', 'type': 'number'}
682+
#errs -- 4
683+
errs[1].class_name -- UpsertManyError
684+
errs[1].err -- 'Duplicate key exists <...>'
685+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
686+
687+
errs[2].class_name -- NotPerformedError
688+
errs[2].err -- 'Operation with tuple was not performed'
689+
errs[2].tuple -- {9, 1644, "Anna", 30}
690+
691+
errs[3].class_name -- NotPerformedError
692+
errs[3].err -- 'Operation with tuple was not performed'
693+
errs[3].tuple -- {71, 1802, "Oksana", 29}
694+
695+
errs[4].class_name -- NotPerformedError
696+
errs[4].err -- 'Operation with tuple was rollback'
697+
errs[4].tuple -- {92, 2040, "Artur", 29}
698+
```
570699

571700
### Select
572701

crud.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local replace = require('crud.replace')
99
local get = require('crud.get')
1010
local update = require('crud.update')
1111
local upsert = require('crud.upsert')
12+
local upsert_many = require('crud.upsert_many')
1213
local delete = require('crud.delete')
1314
local select = require('crud.select')
1415
local truncate = require('crud.truncate')
@@ -60,6 +61,14 @@ crud.update = stats.wrap(update.call, stats.op.UPDATE)
6061
-- @function upsert
6162
crud.upsert = stats.wrap(upsert.tuple, stats.op.UPSERT)
6263

64+
-- @refer upsert_many.tuples
65+
-- @function upsert_many
66+
crud.upsert_many = upsert_many.tuples
67+
68+
-- @refer upsert_many.objects
69+
-- @function upsert_object_many
70+
crud.upsert_object_many = upsert_many.objects
71+
6372
-- @refer upsert.object
6473
-- @function upsert
6574
crud.upsert_object = stats.wrap(upsert.object, stats.op.UPSERT)
@@ -138,6 +147,7 @@ function crud.init_storage()
138147
replace.init()
139148
update.init()
140149
upsert.init()
150+
upsert_many.init()
141151
delete.init()
142152
select.init()
143153
truncate.init()

crud/common/map_call_cases/batch_insert_iter.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ function BatchInsertIterator:get()
7070
local replicaset = self.next_index
7171
local func_args = {
7272
self.space_name,
73-
self.next_batch,
73+
self.next_batch.tuples,
7474
self.opts,
7575
}
7676

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
local errors = require('errors')
2+
3+
local dev_checks = require('crud.common.dev_checks')
4+
local sharding = require('crud.common.sharding')
5+
6+
local BaseIterator = require('crud.common.map_call_cases.base_iter')
7+
8+
local SplitTuplesError = errors.new_class('SplitTuplesError')
9+
10+
local BatchUpsertIterator = {}
11+
-- inheritance from BaseIterator
12+
setmetatable(BatchUpsertIterator, {__index = BaseIterator})
13+
14+
--- Create new batch upsert iterator for map call
15+
--
16+
-- @function new
17+
--
18+
-- @tparam[opt] table opts
19+
-- Options of BatchUpsertIterator:new
20+
-- @tparam[opt] table opts.tuples
21+
-- Tuples to be upserted
22+
-- @tparam[opt] table opts.space
23+
-- Space to be upserted into
24+
-- @tparam[opt] table opts.operations
25+
-- Operations to be performed on tuples
26+
-- @tparam[opt] table opts.execute_on_storage_opts
27+
-- Additional opts for call on storage
28+
--
29+
-- @return[1] table iterator
30+
-- @treturn[2] nil
31+
-- @treturn[2] table of tables Error description
32+
function BatchUpsertIterator:new(opts)
33+
dev_checks('table', {
34+
tuples = 'table',
35+
space = 'table',
36+
operations = 'table',
37+
execute_on_storage_opts = 'table',
38+
})
39+
40+
local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space, {
41+
operations = opts.operations,
42+
})
43+
if err ~= nil then
44+
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
45+
end
46+
47+
local next_replicaset, next_batch = next(sharding_data.batches)
48+
49+
local execute_on_storage_opts = opts.execute_on_storage_opts
50+
execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash
51+
execute_on_storage_opts.sharding_key_hash = sharding_data.sharding_key_hash
52+
execute_on_storage_opts.skip_sharding_hash_check = sharding_data.skip_sharding_hash_check
53+
54+
local iter = {
55+
space_name = opts.space.name,
56+
opts = execute_on_storage_opts,
57+
batches_by_replicasets = sharding_data.batches,
58+
next_index = next_replicaset,
59+
next_batch = next_batch,
60+
}
61+
62+
setmetatable(iter, self)
63+
self.__index = self
64+
65+
return iter
66+
end
67+
68+
--- Get function arguments and next replicaset
69+
--
70+
-- @function get
71+
--
72+
-- @return[1] table func_args
73+
-- @return[2] table replicaset
74+
function BatchUpsertIterator:get()
75+
local replicaset = self.next_index
76+
local func_args = {
77+
self.space_name,
78+
self.next_batch.tuples,
79+
self.next_batch.operations,
80+
self.opts,
81+
}
82+
83+
self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
84+
85+
return func_args, replicaset
86+
end
87+
88+
return BatchUpsertIterator

crud/common/sharding/init.lua

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,12 @@ end
209209
-- @return[1] batches
210210
-- Map where key is a replicaset and value
211211
-- is table of tuples related to this replicaset
212-
function sharding.split_tuples_by_replicaset(tuples, space)
213-
dev_checks('table', 'table')
212+
function sharding.split_tuples_by_replicaset(tuples, space, opts)
213+
dev_checks('table', 'table', {
214+
operations = '?table',
215+
})
216+
217+
opts = opts or {}
214218

215219
local batches = {}
216220

@@ -219,7 +223,7 @@ function sharding.split_tuples_by_replicaset(tuples, space)
219223
local skip_sharding_hash_check
220224
local sharding_data
221225
local err
222-
for _, tuple in ipairs(tuples) do
226+
for i, tuple in ipairs(tuples) do
223227
sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space)
224228
if err ~= nil then
225229
return nil, BucketIDError:new("Failed to get bucket ID: %s", err)
@@ -244,9 +248,15 @@ function sharding.split_tuples_by_replicaset(tuples, space)
244248
sharding_data.bucket_id, err.err)
245249
end
246250

247-
local tuples_by_replicaset = batches[replicaset] or {}
248-
table.insert(tuples_by_replicaset, tuple)
249-
batches[replicaset] = tuples_by_replicaset
251+
local record_by_replicaset = batches[replicaset] or {tuples = {}}
252+
table.insert(record_by_replicaset.tuples, tuple)
253+
254+
if opts.operations ~= nil then
255+
record_by_replicaset.operations = record_by_replicaset.operations or {}
256+
table.insert(record_by_replicaset.operations, opts.operations[i])
257+
end
258+
259+
batches[replicaset] = record_by_replicaset
250260
end
251261

252262
return {

0 commit comments

Comments
 (0)