Skip to content

Commit 7a85550

Browse files
committed
Add bucket ref/unref for crud operations
In fast mode, ref/unref do nothing. In safe mode, they call bucket_refro, bucket_refrw, bucket_unrefro, and bucket_unrefrw. Also added handling of ref errors returned by storages: on a ref error, the router resets the bucket cache, switches the replicaset (for single operations only), and retries the request.
1 parent 134ead9 commit 7a85550

File tree

18 files changed

+478
-167
lines changed

18 files changed

+478
-167
lines changed

crud/common/call.lua

Lines changed: 39 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
local errors = require('errors')
2-
local vshard = require('vshard')
32

43
local call_cache = require('crud.common.call_cache')
54
local dev_checks = require('crud.common.dev_checks')
@@ -8,6 +7,7 @@ local sharding_utils = require('crud.common.sharding.utils')
87
local fiber = require('fiber')
98
local const = require('crud.common.const')
109
local rebalance = require('crud.common.rebalance')
10+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1111

1212
local BaseIterator = require('crud.common.map_call_cases.base_iter')
1313
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
@@ -20,53 +20,13 @@ local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/'
2020

2121
local call = {}
2222

23-
local function bucket_unref_many(bucket_ids, mode)
24-
local all_ok = true
25-
local last_err = nil
26-
for _, bucket_id in pairs(bucket_ids) do
27-
local ok, err = vshard.storage.bucket_unref(bucket_id, mode)
28-
if not ok then
29-
all_ok = nil
30-
last_err = err
31-
end
32-
end
33-
return all_ok, last_err
34-
end
35-
36-
local function bucket_ref_many(bucket_ids, mode)
37-
local reffed = {}
38-
for _, bucket_id in pairs(bucket_ids) do
39-
local ok, err = vshard.storage.bucket_ref(bucket_id, mode)
40-
if not ok then
41-
bucket_unref_many(reffed, mode)
42-
return nil, err
43-
end
44-
table.insert(reffed, bucket_id)
45-
end
46-
return true, nil
47-
end
48-
49-
local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
23+
local function call_on_storage_safe(run_as_user, func_name, ...)
5024
fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name)
51-
52-
local ok, ref_err = bucket_ref_many(bucket_ids, mode)
53-
if not ok then
54-
return nil, ref_err
55-
end
56-
57-
local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)}
58-
59-
ok, ref_err = bucket_unref_many(bucket_ids, mode)
60-
if not ok then
61-
return nil, ref_err
62-
end
63-
64-
return unpack(res, 1, table.maxn(res))
25+
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
6526
end
6627

67-
local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
28+
local function call_on_storage_fast(run_as_user, func_name, ...)
6829
fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name)
69-
7030
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
7131
end
7232

@@ -149,10 +109,11 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
149109
))
150110
end
151111

152-
local function retry_call_with_master_discovery(vshard_router, replicaset,
153-
method, func_name, func_args,
154-
call_opts, mode, bucket_ids)
155-
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)
112+
--- Executes a vshard call and retries once after performing recovery actions
113+
--- like bucket cache reset, destination redirect (for single calls), or master discovery.
114+
local function call_with_retry_and_recovery(vshard_router,
115+
replicaset, method, func_name, func_args, call_opts, is_single_call)
116+
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
156117

157118
-- In case cluster was just bootstrapped with auto master discovery,
158119
-- replicaset may miss master.
@@ -164,16 +125,29 @@ local function retry_call_with_master_discovery(vshard_router, replicaset,
164125

165126
-- This is a partial copy of error handling from vshard.router.router_call_impl()
166127
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
167-
if err.name == 'WRONG_BUCKET' or
168-
err.name == 'BUCKET_IS_LOCKED' or
169-
err.name == 'TRANSFER_IS_IN_PROGRESS' then
170-
vshard_router:_bucket_reset(err.bucket_id)
171-
172-
-- Substitute replicaset only for single bucket_id calls.
173-
if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then
174-
replicaset = vshard_router.replicasets[err.destination]
175-
else
176-
return nil, err
128+
-- TODO: check whether this error can actually be received here.
129+
if err.class_name == bucket_ref_unref.BucketRefError.name then
130+
local redirect_replicaset
131+
if is_single_call and #err.bucket_ref_errs == 1 then
132+
local single_err = err.bucket_ref_errs[1]
133+
local destination = single_err.vshard_err.destination
134+
if destination and vshard_router.replicasets[destination] then
135+
redirect_replicaset = vshard_router.replicasets[destination]
136+
end
137+
end
138+
139+
for _, bucket_ref_err in pairs(err.bucket_ref_errs) do
140+
local bucket_id = bucket_ref_err.bucket_id
141+
local vshard_err = bucket_ref_err.vshard_err
142+
if vshard_err.name == 'WRONG_BUCKET' or
143+
vshard_err.name == 'BUCKET_IS_LOCKED' or
144+
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
145+
vshard_router:_bucket_reset(bucket_id)
146+
end
147+
end
148+
149+
if redirect_replicaset ~= nil then
150+
replicaset = redirect_replicaset
177151
end
178152
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
179153
replicaset:locate_master()
@@ -227,10 +201,10 @@ function call.map(vshard_router, func_name, func_args, opts)
227201
request_timeout = opts.mode == 'read' and opts.request_timeout or nil,
228202
}
229203
while iter:has_next() do
230-
local args, replicaset, replicaset_id, bucket_ids = iter:get()
204+
local args, replicaset, replicaset_id = iter:get()
231205

232-
local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
233-
func_name, args, call_opts, opts.mode, bucket_ids)
206+
local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
207+
func_name, args, call_opts, false)
234208

235209
if err ~= nil then
236210
local result_info = {
@@ -303,9 +277,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
303277
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
304278
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil
305279

306-
local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
307-
func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
308-
opts.mode, {bucket_id})
280+
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
281+
func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true)
309282
if err ~= nil then
310283
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
311284
end
@@ -330,9 +303,8 @@ function call.any(vshard_router, func_name, func_args, opts)
330303
end
331304
local replicaset_id, replicaset = next(replicasets)
332305

333-
local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call',
334-
func_name, func_args, {timeout = timeout},
335-
'read', {})
306+
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call',
307+
func_name, func_args, {timeout = timeout}, false)
336308
if err ~= nil then
337309
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
338310
end

crud/common/map_call_cases/base_iter.lua

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,12 @@ end
6767
-- @return[1] table func_args
6868
-- @return[2] table replicaset
6969
-- @return[3] string replicaset_id
70-
-- @return[4] table bucket_ids
7170
function BaseIterator:get()
7271
local replicaset_id = self.next_index
7372
local replicaset = self.next_replicaset
7473
self.next_index, self.next_replicaset = next(self.replicasets, self.next_index)
7574

76-
return self.func_args, replicaset, replicaset_id, {}
75+
return self.func_args, replicaset, replicaset_id
7776
end
7877

7978
return BaseIterator

crud/common/map_call_cases/batch_insert_iter.lua

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ end
6868
-- @return[1] table func_args
6969
-- @return[2] table replicaset
7070
-- @return[3] string replicaset_id
71-
-- @return[4] table bucket_ids
7271
function BatchInsertIterator:get()
7372
local replicaset_id = self.next_index
7473
local replicaset = self.next_batch.replicaset
@@ -77,11 +76,10 @@ function BatchInsertIterator:get()
7776
self.next_batch.tuples,
7877
self.opts,
7978
}
80-
local bucket_ids = self.next_batch.bucket_ids
8179

8280
self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
8381

84-
return func_args, replicaset, replicaset_id, bucket_ids
82+
return func_args, replicaset, replicaset_id
8583
end
8684

8785
return BatchInsertIterator

crud/common/map_call_cases/batch_upsert_iter.lua

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ end
7676
-- @return[1] table func_args
7777
-- @return[2] table replicaset
7878
-- @return[3] string replicaset_id
79-
-- @return[4] table bucket_ids
8079
function BatchUpsertIterator:get()
8180
local replicaset_id = self.next_index
8281
local replicaset = self.next_batch.replicaset
@@ -86,11 +85,10 @@ function BatchUpsertIterator:get()
8685
self.next_batch.operations,
8786
self.opts,
8887
}
89-
local bucket_ids = self.next_batch.bucket_ids
9088

9189
self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
9290

93-
return func_args, replicaset, replicaset_id, bucket_ids
91+
return func_args, replicaset, replicaset_id
9492
end
9593

9694
return BatchUpsertIterator

0 commit comments

Comments
 (0)