diff --git a/crud/common/call.lua b/crud/common/call.lua index 66616bb9..40bd8351 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -1,5 +1,4 @@ local errors = require('errors') -local vshard = require('vshard') local call_cache = require('crud.common.call_cache') local dev_checks = require('crud.common.dev_checks') @@ -8,6 +7,7 @@ local sharding_utils = require('crud.common.sharding.utils') local fiber = require('fiber') local const = require('crud.common.const') local rebalance = require('crud.common.rebalance') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BaseIterator = require('crud.common.map_call_cases.base_iter') local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') @@ -20,53 +20,13 @@ local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/' local call = {} -local function bucket_unref_many(bucket_ids, mode) - local all_ok = true - local last_err = nil - for _, bucket_id in pairs(bucket_ids) do - local ok, err = vshard.storage.bucket_unref(bucket_id, mode) - if not ok then - all_ok = nil - last_err = err - end - end - return all_ok, last_err -end - -local function bucket_ref_many(bucket_ids, mode) - local reffed = {} - for _, bucket_id in pairs(bucket_ids) do - local ok, err = vshard.storage.bucket_ref(bucket_id, mode) - if not ok then - bucket_unref_many(reffed, mode) - return nil, err - end - table.insert(reffed, bucket_id) - end - return true, nil -end - -local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) +local function call_on_storage_safe(run_as_user, func_name, ...) fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name) - - local ok, ref_err = bucket_ref_many(bucket_ids, mode) - if not ok then - return nil, ref_err - end - - local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)} - - ok, ref_err = bucket_unref_many(bucket_ids, mode) - if not ok then - return nil, ref_err - end - - return unpack(res, 1, table.maxn(res)) + return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) end -local function call_on_storage_fast(run_as_user, _, _, func_name, ...) +local function call_on_storage_fast(run_as_user, func_name, ...) fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name) - return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) end @@ -149,10 +109,11 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc )) end -local function retry_call_with_master_discovery(vshard_router, replicaset, - method, func_name, func_args, - call_opts, mode, bucket_ids) - local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) +--- Executes a vshard call and retries once after performing recovery actions +--- like bucket cache reset, destination redirect (for single calls), or master discovery. +local function call_with_retry_and_recovery(vshard_router, + replicaset, method, func_name, func_args, call_opts, is_single_call) + local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) -- In case cluster was just bootstrapped with auto master discovery, -- replicaset may miss master. @@ -164,16 +125,28 @@ local function retry_call_with_master_discovery(vshard_router, replicaset, -- This is a partial copy of error handling from vshard.router.router_call_impl() -- It is much simpler mostly because bucket_set() can't be accessed from outside vshard. - if err.name == 'WRONG_BUCKET' or - err.name == 'BUCKET_IS_LOCKED' or - err.name == 'TRANSFER_IS_IN_PROGRESS' then - vshard_router:_bucket_reset(err.bucket_id) - - -- Substitute replicaset only for single bucket_id calls. - if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then - replicaset = vshard_router.replicasets[err.destination] - else - return nil, err + if err.class_name == bucket_ref_unref.BucketRefError.name then + local redirect_replicaset + if is_single_call and #err.bucket_ref_errs == 1 then + local single_err = err.bucket_ref_errs[1] + local destination = single_err.vshard_err.destination + if destination and vshard_router.replicasets[destination] then + redirect_replicaset = vshard_router.replicasets[destination] + end + end + + for _, bucket_ref_err in pairs(err.bucket_ref_errs) do + local bucket_id = bucket_ref_err.bucket_id + local vshard_err = bucket_ref_err.vshard_err + if vshard_err.name == 'WRONG_BUCKET' or + vshard_err.name == 'BUCKET_IS_LOCKED' or + vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then + vshard_router:_bucket_reset(bucket_id) + end + end + + if redirect_replicaset ~= nil then + replicaset = redirect_replicaset end elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then replicaset:locate_master() @@ -227,10 +200,10 @@ function call.map(vshard_router, func_name, func_args, opts) request_timeout = opts.mode == 'read' and opts.request_timeout or nil, } while iter:has_next() do - local args, replicaset, replicaset_id, bucket_ids = iter:get() + local args, replicaset, replicaset_id = iter:get() - local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, - func_name, args, call_opts, opts.mode, bucket_ids) + local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name, + func_name, args, call_opts, false) if err ~= nil then local result_info = { @@ -303,9 +276,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts) local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local request_timeout = opts.mode == 'read' and opts.request_timeout or nil - local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, - func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, - opts.mode, {bucket_id}) + local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name, + func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id) end @@ -330,9 +302,8 @@ function call.any(vshard_router, func_name, func_args, opts) end local replicaset_id, replicaset = next(replicasets) - local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call', - func_name, func_args, {timeout = timeout}, - 'read', {}) + local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call', + func_name, func_args, {timeout = timeout}, false) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) end diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index e52c8eb4..452e1599 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ b/crud/common/map_call_cases/base_iter.lua @@ -67,13 +67,12 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id --- @return[4] table bucket_ids function BaseIterator:get() local replicaset_id = self.next_index local replicaset = self.next_replicaset self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) - return self.func_args, replicaset, replicaset_id, {} + return self.func_args, replicaset, replicaset_id end return BaseIterator diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 2c22a798..37867f1b 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -68,7 +68,6 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id --- @return[4] table bucket_ids function BatchInsertIterator:get() local replicaset_id = self.next_index local replicaset = self.next_batch.replicaset @@ -77,11 +76,10 @@ function BatchInsertIterator:get() self.next_batch.tuples, self.opts, } - local bucket_ids = self.next_batch.bucket_ids self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset, replicaset_id, bucket_ids + return func_args, replicaset, replicaset_id end return BatchInsertIterator diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index 658db9d4..d25e3ee8 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -76,7 +76,6 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id --- @return[4] table bucket_ids function BatchUpsertIterator:get() local replicaset_id = self.next_index local replicaset = self.next_batch.replicaset @@ -86,11 +85,10 @@ function BatchUpsertIterator:get() self.next_batch.operations, self.opts, } - local bucket_ids = self.next_batch.bucket_ids self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset, replicaset_id, bucket_ids + return func_args, replicaset, replicaset_id end return BatchUpsertIterator diff --git a/crud/common/sharding/bucket_ref_unref.lua b/crud/common/sharding/bucket_ref_unref.lua new file mode 100644 index 00000000..aa6404c2 --- /dev/null +++ b/crud/common/sharding/bucket_ref_unref.lua @@ -0,0 +1,183 @@ +--- module to call vshard.storage.bucket_ref / vshard.storage.bucket_unref +--- on write requests +--- there are two modes: safe and fast. on safe mode module +--- calls vshard.storage.bucket_ref / vshard.storage.bucket_unref +--- on fast mode it does nothing. +--- default is fast mode. + +--- bucket_refw and bucket_unrefrw must be called in one transaction in order to prevent +--- safe_mode change during execution. + +local vshard = require('vshard') +local errors = require('errors') +local rebalance = require('crud.common.rebalance') + +local safe_methods +local fast_methods + +local M = { + BucketRefError = errors.new_class('bucket_ref_error', {capture_stack = false}) +} + +local function make_bucket_ref_err(bucket_id, vshard_ref_err) + local err = M.BucketRefError:new(M.BucketRefError:new( + "failed bucket_ref: %s, bucket_id: %s", + vshard_ref_err.name, + bucket_id + )) + err.bucket_ref_errs = { + { + bucket_id = bucket_id, + vshard_err = vshard_ref_err, + } + } + return err +end + +--- on module initialization safe_mode_status func must be set +--- it's rebalance.safe_mode_status +function M.safe_mode_status() + error('safe_mode_status not set') +end + +--- Slow bucket_refrw implementation that calls vshard.storage.bucket_refrw. +--- must be called with bucket_unrefrw in transaction +function M._bucket_refrw(bucket_id) + local ref_ok, vshard_ref_err = vshard.storage.bucket_refrw(bucket_id) + if not ref_ok then + return false, make_bucket_ref_err(bucket_id, vshard_ref_err) + end + + return true +end + +--- Slow bucket_unrefrw implementation that calls vshard.storage.bucket_unrefrw. +--- must be called with bucket_refrw in transaction +function M._bucket_unrefrw(bucket_id) + return vshard.storage.bucket_unrefrw(bucket_id) +end + +--- Slow bucket_refro implementation that calls vshard.storage.bucket_refro. +function M._bucket_refro(bucket_id) + local ref_ok, vshard_ref_err = vshard.storage.bucket_refro(bucket_id) + if not ref_ok then + return false, make_bucket_ref_err(bucket_id, vshard_ref_err) + end + + return true +end + +--- Slow bucket_unrefro implementation that calls vshard.storage.bucket_unrefro. +--- must be called in one transaction with bucket_refrw_many +function M._bucket_unrefro(bucket_id) + return vshard.storage.bucket_unrefro(bucket_id) +end + +--- Slow bucket_refrw_many that calls bucket_refrw for every bucket and aggregates errors +--- @param bucket_ids table +function M._bucket_refrw_many(bucket_ids) + local bucket_ref_errs = {} + local reffed_bucket_ids = {} + for bucket_id in pairs(bucket_ids) do + local ref_ok, bucket_refrw_err = safe_methods.bucket_refrw(bucket_id) + if not ref_ok then + + table.insert(bucket_ref_errs, bucket_refrw_err.bucket_ref_errs[1]) + goto continue + end + + reffed_bucket_ids[bucket_id] = true + ::continue:: + end + + if next(bucket_ref_errs) ~= nil then + local err = M.BucketRefError:new(M.BucketRefError:new("failed bucket_ref")) + err.bucket_ref_errs = bucket_ref_errs + safe_methods.bucket_unrefrw_many(reffed_bucket_ids) + return nil, err + end + + return true +end + +--- Slow bucket_unrefrw_many that calls vshard.storage.bucket_unrefrw for every bucket. +--- must be called in one transaction with bucket_refrw_many +--- will never happen in called in one transaction with bucket_refrw_many +--- @param bucket_ids table +function M._bucket_unrefrw_many(bucket_ids) + local unref_all_ok = true + local unref_last_err + for reffed_bucket_id in pairs(bucket_ids) do + local unref_ok, unref_err = safe_methods.bucket_unrefrw(reffed_bucket_id) + if not unref_ok then + unref_all_ok = nil + unref_last_err = unref_err + end + end + + if not unref_all_ok then + return nil, unref_last_err + end + return true +end + +--- _fast implements module logic for fast mode +function M._fast() + return true +end + +safe_methods = { + bucket_refrw = M._bucket_refrw, + bucket_unrefrw = M._bucket_unrefrw, + bucket_refro = M._bucket_refro, + bucket_unrefro = M._bucket_unrefro, + bucket_refrw_many = M._bucket_refrw_many, + bucket_unrefrw_many = M._bucket_unrefrw_many, +} + +fast_methods = { + bucket_refrw = M._fast, + bucket_unrefrw = M._fast, + bucket_refro = M._fast, + bucket_unrefro = M._fast, + bucket_refrw_many = M._fast, + bucket_unrefrw_many = M._fast, +} + +local function set_methods(methods) + for method_name, func in pairs(methods) do + M[method_name] = func + end +end + +local function set_safe_mode() + set_methods(safe_methods) +end + +local function set_fast_mode() + set_methods(fast_methods) +end + +local hooks_registered = false + +--- set safe mode func +--- from rebalance.safe_mode_status +function M.set_safe_mode_status(safe_mode_status) + M.safe_mode_status = safe_mode_status + + if safe_mode_status() then + set_safe_mode() + else + set_fast_mode() + end + + if not hooks_registered then + rebalance.register_safe_mode_enable_hook(set_safe_mode) + rebalance.register_safe_mode_disable_hook(set_fast_mode) + hooks_registered = true + end +end + +set_fast_mode() + +return M diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 83314391..02bb6beb 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -324,10 +324,8 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) local record_by_replicaset = batches[replicaset_id] or { replicaset = replicaset, tuples = {}, - bucket_ids = {}, } table.insert(record_by_replicaset.tuples, tuple) - record_by_replicaset.bucket_ids[sharding_data.bucket_id] = true if opts.operations ~= nil then record_by_replicaset.operations = record_by_replicaset.operations or {} @@ -337,14 +335,6 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) batches[replicaset_id] = record_by_replicaset end - for _, rbr in pairs(batches) do - local bucket_ids = {} - for bid, _ in pairs(rbr.bucket_ids) do - table.insert(bucket_ids, bid) - end - rbr.bucket_ids = bucket_ids - end - return { batches = batches, sharding_func_hash = sharding_func_hash, diff --git a/crud/delete.lua b/crud/delete.lua index 6f15b9af..4c78692d 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local DeleteError = errors.new_class('DeleteError', {capture_stack = false}) @@ -19,6 +20,7 @@ local CRUD_DELETE_FUNC_NAME = utils.get_storage_call(DELETE_FUNC_NAME) local function delete_on_storage(space_name, key, field_names, opts) dev_checks('string', '?', '?table', { + bucket_id = 'number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -42,14 +44,29 @@ local function delete_on_storage(space_name, key, field_names, opts) return nil, err end - -- add_space_schema_hash is false because - -- reloading space format on router can't avoid delete error on storage - return schema.wrap_box_space_func_result(space, 'delete', {key}, { - add_space_schema_hash = false, - field_names = field_names, - noreturn = opts.noreturn, - fetch_latest_metadata = opts.fetch_latest_metadata, - }) + local function make_delete() + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(opts.bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end + + -- add_space_schema_hash is false because + -- reloading space format on router can't avoid delete error on storage + local result = schema.wrap_box_space_func_result(space, 'delete', {key}, { + add_space_schema_hash = false, + field_names = field_names, + noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, + }) + local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(opts.bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result + end + + return box.atomic(make_delete) end delete.storage_api = {[DELETE_FUNC_NAME] = delete_on_storage} @@ -116,6 +133,7 @@ local function call_delete_on_router(vshard_router, space_name, key, opts) sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id) local delete_on_storage_opts = { + bucket_id = bucket_id_data.bucket_id, sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, diff --git a/crud/get.lua b/crud/get.lua index feca57c5..8f98c572 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local GetError = errors.new_class('GetError', {capture_stack = false}) @@ -19,6 +20,7 @@ local CRUD_GET_FUNC_NAME = utils.get_storage_call(GET_FUNC_NAME) local function get_on_storage(space_name, key, field_names, opts) dev_checks('string', '?', '?table', { + bucket_id = 'number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -41,13 +43,23 @@ local function get_on_storage(space_name, key, field_names, opts) return nil, err end + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refro(opts.bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end -- add_space_schema_hash is false because -- reloading space format on router can't avoid get error on storage - return schema.wrap_box_space_func_result(space, 'get', {key}, { + local result = schema.wrap_box_space_func_result(space, 'get', {key}, { add_space_schema_hash = false, field_names = field_names, fetch_latest_metadata = opts.fetch_latest_metadata, }) + local unref_ok, err_unref = bucket_ref_unref.bucket_unrefro(opts.bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result end get.storage_api = {[GET_FUNC_NAME] = get_on_storage} @@ -114,6 +126,7 @@ local function call_get_on_router(vshard_router, space_name, key, opts) sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id) local get_on_storage_opts = { + bucket_id = bucket_id_data.bucket_id, sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, diff --git a/crud/insert.lua b/crud/insert.lua index ebb3f865..9eba289a 100644 --- a/crud/insert.lua +++ b/crud/insert.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local InsertError = errors.new_class('InsertError', {capture_stack = false}) @@ -42,15 +43,33 @@ local function insert_on_storage(space_name, tuple, opts) return nil, err end - -- add_space_schema_hash is true only in case of insert_object - -- the only one case when reloading schema can avoid insert error - -- is flattening object on router - return schema.wrap_box_space_func_result(space, 'insert', {tuple}, { - add_space_schema_hash = opts.add_space_schema_hash, - field_names = opts.fields, - noreturn = opts.noreturn, - fetch_latest_metadata = opts.fetch_latest_metadata, - }) + local function make_insert() + local bucket_id = tuple[utils.get_bucket_id_fieldno(space)] + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id) + + if not ref_ok then + return nil, bucket_ref_err + end + + -- add_space_schema_hash is true only in case of insert_object + -- the only one case when reloading schema can avoid insert error + -- is flattening object on router + local result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, { + add_space_schema_hash = opts.add_space_schema_hash, + field_names = opts.fields, + noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, + }) + + local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result + end + + return box.atomic(make_insert) end insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage} diff --git a/crud/insert_many.lua b/crud/insert_many.lua index f4299fd6..2864bd8a 100644 --- a/crud/insert_many.lua +++ b/crud/insert_many.lua @@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter') local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') @@ -48,6 +49,16 @@ local function insert_many_on_storage(space_name, tuples, opts) return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples) end + local bucket_ids = {} + for _, tuple in ipairs(tuples) do + bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true + end + + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids) + if not ref_ok then + return nil, bucket_ref_err + end + local inserted_tuples = {} local errs = {} local replica_schema_version = nil @@ -84,7 +95,11 @@ local function insert_many_on_storage(space_name, tuples, opts) end if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) @@ -93,7 +108,11 @@ local function insert_many_on_storage(space_name, tuples, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end @@ -104,7 +123,11 @@ local function insert_many_on_storage(space_name, tuples, opts) if next(errs) ~= nil then if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) @@ -113,12 +136,20 @@ local function insert_many_on_storage(space_name, tuples, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, nil, replica_schema_version end diff --git a/crud/replace.lua b/crud/replace.lua index 5e36906f..e39d319d 100644 --- a/crud/replace.lua +++ b/crud/replace.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local ReplaceError = errors.new_class('ReplaceError', { capture_stack = false }) @@ -42,15 +43,29 @@ local function replace_on_storage(space_name, tuple, opts) return nil, err end - -- add_space_schema_hash is true only in case of replace_object - -- the only one case when reloading schema can avoid insert error - -- is flattening object on router - return schema.wrap_box_space_func_result(space, 'replace', {tuple}, { - add_space_schema_hash = opts.add_space_schema_hash, - field_names = opts.fields, - noreturn = opts.noreturn, - fetch_latest_metadata = opts.fetch_latest_metadata, - }) + local function make_replace() + local bucket_id = tuple[utils.get_bucket_id_fieldno(space)] + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end + -- add_space_schema_hash is true only in case of replace_object + -- the only one case when reloading schema can avoid insert error + -- is flattening object on router + local result = schema.wrap_box_space_func_result(space, 'replace', {tuple}, { + add_space_schema_hash = opts.add_space_schema_hash, + field_names = opts.fields, + noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, + }) + local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id) + if not unref_ok then + return nil, err_unref + end + return result + end + + return box.atomic(make_replace) end replace.storage_api = {[REPLACE_FUNC_NAME] = replace_on_storage} diff --git a/crud/replace_many.lua b/crud/replace_many.lua index d047a3b0..39eb90eb 100644 --- a/crud/replace_many.lua +++ b/crud/replace_many.lua @@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter') local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') @@ -52,6 +53,16 @@ local function replace_many_on_storage(space_name, tuples, opts) local errs = {} local replica_schema_version = nil + local bucket_ids = {} + for _, tuple in ipairs(tuples) do + bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true + end + + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids) + if not ref_ok then + return nil, bucket_ref_err + end + box.begin() for i, tuple in ipairs(tuples) do -- add_space_schema_hash is true only in case of replace_object_many @@ -87,17 +98,23 @@ local function replace_many_on_storage(space_name, tuples, opts) end if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) end - return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() - + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end end @@ -107,7 +124,11 @@ local function replace_many_on_storage(space_name, tuples, opts) if next(errs) ~= nil then if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(inserted_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, inserted_tuples) @@ -116,13 +137,20 @@ local function replace_many_on_storage(space_name, tuples, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() - + if not unref_ok then + return nil, bucket_unref_err + end return inserted_tuples, nil, replica_schema_version end diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 48dfe1e4..1fd75320 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -171,11 +171,7 @@ local function fetch_chunk(context, state) -- change context.func_args too, but it does not matter next_func_args[4].after_tuple = cursor.after_tuple - local mode = "read" - local bucket_ids = {} - local func_args_ext = utils.append_array( - { box.session.effective_user(), bucket_ids, mode, func_name }, - next_func_args) + local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, next_func_args) if context.readview then next_state = {future = context.future_replica.conn:call("_crud.call_on_storage", @@ -207,10 +203,7 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ local buf = buffer.ibuf() local net_box_opts = {is_async = true, buffer = buf, skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} - local bucket_ids = {} - local func_args_ext = utils.append_array( - { box.session.effective_user(), bucket_ids, mode, func_name }, - func_args) + local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) local future = replicaset[vshard_call_name](replicaset, "_crud.call_on_storage", func_args_ext, net_box_opts) @@ -286,13 +279,8 @@ local function new_readview(vshard_router, replicasets, readview_info, space, in local net_box_opts = {is_async = true, buffer = buf, skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} func_args[4].readview_id = replicaset_info.id - local mode = "read" - local bucket_ids = {} - local func_args_ext = utils.append_array( - { box.session.effective_user(), bucket_ids, mode, func_name }, - func_args) - local future = replica.conn:call("_crud.call_on_storage", - func_args_ext, net_box_opts) + local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) + local future = replica.conn:call("_crud.call_on_storage", func_args_ext, net_box_opts) -- Create a source. local context = { diff --git a/crud/storage.lua b/crud/storage.lua index c6356008..eccef0d8 100644 --- a/crud/storage.lua +++ b/crud/storage.lua @@ -23,6 +23,7 @@ local count = require('crud.count') local borders = require('crud.borders') local readview = require('crud.readview') local storage_info = require('crud.storage_info') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local storage = {} @@ -106,6 +107,7 @@ local function init_impl() end rebalance.init() + bucket_ref_unref.set_safe_mode_status(rebalance.safe_mode_status) for _, module in ipairs(modules_with_storage_api) do init_storage_call(user, module.storage_api) diff --git a/crud/update.lua b/crud/update.lua index 36ce242d..d6b646fc 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local UpdateError = errors.new_class('UpdateError', {capture_stack = false}) @@ -19,6 +20,7 @@ local CRUD_UPDATE_FUNC_NAME = utils.get_storage_call(UPDATE_FUNC_NAME) local function update_on_storage(space_name, key, operations, field_names, opts) dev_checks('string', '?', 'table', '?table', { + bucket_id = 'number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -42,39 +44,45 @@ local function update_on_storage(space_name, key, operations, field_names, opts) return nil, err end - -- add_space_schema_hash is false because - -- reloading space format on router can't avoid update error on storage - local res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { - add_space_schema_hash = false, - field_names = field_names, - noreturn = opts.noreturn, - fetch_latest_metadata = opts.fetch_latest_metadata, - }) - - if err ~= nil then - return nil, err - end - - if res.err == nil then - return res, nil - end + local function make_update() + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(opts.bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end - -- Relevant for Tarantool older than 2.8.1. - -- We can only add fields to end of the tuple. - -- If schema is updated and nullable fields are added, then we will get error. - -- Therefore, we need to add filling of intermediate nullable fields. - -- More details: https://github.com/tarantool/tarantool/issues/3378 - if utils.is_field_not_found(res.err.code) then - operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key)) - res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { + -- add_space_schema_hash is false because + -- reloading space format on router can't avoid update error on storage + local res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { add_space_schema_hash = false, field_names = field_names, noreturn = opts.noreturn, fetch_latest_metadata = opts.fetch_latest_metadata, }) + + if err == nil and res.err ~= nil and utils.is_field_not_found(res.err.code) then + -- Relevant for Tarantool older than 2.8.1. + -- We can only add fields to end of the tuple. + -- If schema is updated and nullable fields are added, then we will get error. + -- Therefore, we need to add filling of intermediate nullable fields. + -- More details: https://github.com/tarantool/tarantool/issues/3378 + operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key)) + res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, { + add_space_schema_hash = false, + field_names = field_names, + noreturn = opts.noreturn, + fetch_latest_metadata = opts.fetch_latest_metadata, + }) + end + + local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(opts.bucket_id) + if not unref_ok then + return nil, err_unref + end + + return res, err end - return res, err + return box.atomic(make_update) end update.storage_api = {[UPDATE_FUNC_NAME] = update_on_storage} @@ -148,6 +156,7 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id) local update_on_storage_opts = { + bucket_id = bucket_id_data.bucket_id, sharding_func_hash = bucket_id_data.sharding_func_hash, sharding_key_hash = sharding_key_hash, skip_sharding_hash_check = skip_sharding_hash_check, diff --git a/crud/upsert.lua b/crud/upsert.lua index 5be7bc4a..319ad09c 100644 --- a/crud/upsert.lua +++ b/crud/upsert.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local UpsertError = errors.new_class('UpsertError', { capture_stack = false}) @@ -41,13 +42,30 @@ local function upsert_on_storage(space_name, tuple, operations, opts) return nil, err end - -- add_space_schema_hash is true only in case of upsert_object - -- the only one case when reloading schema can avoid insert error - -- is flattening object on router - return schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, { - add_space_schema_hash = opts.add_space_schema_hash, - fetch_latest_metadata = opts.fetch_latest_metadata, - }) + local function make_upsert() + local bucket_id = tuple[utils.get_bucket_id_fieldno(space)] + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id) + if not ref_ok then + return nil, bucket_ref_err + end + + -- add_space_schema_hash is true only in case of upsert_object + -- the only one case when reloading schema can avoid insert error + -- is flattening object on router + local result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, { + add_space_schema_hash = opts.add_space_schema_hash, + fetch_latest_metadata = opts.fetch_latest_metadata, + }) + + local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id) + if not unref_ok then + return nil, err_unref + end + + return result + end + + return box.atomic(make_upsert) end upsert.storage_api = {[UPSERT_FUNC_NAME] = upsert_on_storage} diff --git a/crud/upsert_many.lua b/crud/upsert_many.lua index f030e778..b242f577 100644 --- a/crud/upsert_many.lua +++ b/crud/upsert_many.lua @@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref') local BatchUpsertIterator = require('crud.common.map_call_cases.batch_upsert_iter') local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor') @@ -47,6 +48,16 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples) end + local bucket_ids = {} + for _, tuple in ipairs(tuples) do + bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true + end + + local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids) + if not ref_ok then + return nil, bucket_ref_err + end + local processed_tuples = {} local errs = {} local replica_schema_version = nil @@ -81,7 +92,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) end if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(processed_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, processed_tuples) @@ -90,7 +105,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return nil, errs, replica_schema_version end @@ -101,7 +120,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) if next(errs) ~= nil then if opts.rollback_on_error == true then + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.rollback() + if not unref_ok then + return nil, bucket_unref_err + end if next(processed_tuples) then errs = batching_utils.complement_batching_errors(errs, batching_utils.rollback_on_error_msg, processed_tuples) @@ -110,12 +133,20 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts) return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return nil, errs, replica_schema_version end + local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids) box.commit() + if not unref_ok then + return nil, bucket_unref_err + end return nil, nil, replica_schema_version end diff --git a/test/integration/cartridge_reload_test.lua b/test/integration/cartridge_reload_test.lua index 5ddeeacf..130dffcf 100644 --- a/test/integration/cartridge_reload_test.lua +++ b/test/integration/cartridge_reload_test.lua @@ -113,6 +113,11 @@ function g.test_storage() t.skip_if(not helpers.is_cartridge_hotreload_supported(), "Cartridge roles reload is not supported") helpers.skip_old_tarantool_cartridge_hotreload() + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + require('crud.common.rebalance').safe_mode_disable() + ]]) + end) g.highload_fiber = fiber.new(highload_loop, 'B') diff --git a/test/integration/double_buckets_test.lua b/test/integration/double_buckets_test.lua index 2c3922cf..2e371ec1 100644 --- a/test/integration/double_buckets_test.lua +++ b/test/integration/double_buckets_test.lua @@ -57,11 +57,11 @@ end) --- Rebalance stalls if we move all buckets at once; use a small subset. local test_tuples = { {22, box.NULL, 'Alex', 34}, - -- {92, box.NULL, 'Artur', 29}, - -- {3, box.NULL, 'Anastasia', 22}, - -- {5, box.NULL, 'Sergey', 25}, - -- {9, box.NULL, 'Anna', 30}, - -- {71, box.NULL, 'Oksana', 29}, + {92, box.NULL, 'Artur', 29}, + {3, box.NULL, 'Anastasia', 22}, + {5, box.NULL, 'Sergey', 25}, + {9, box.NULL, 'Anna', 30}, + {71, box.NULL, 'Oksana', 29}, } local last_call = fiber.time() @@ -120,7 +120,6 @@ pgroup_duplicates.test_duplicates = function(g) 'test implemented only for 3.1 and greater' ) if g.params.backend == "config" then - t.xfail('not implemented yet') duplicate_operations[g.params.operation](g) local cfg = g.cluster:cfg() @@ -246,8 +245,8 @@ pgroup_not_applied.test_not_applied = function(g) ), 'test implemented only for 3.1 and greater' ) + t.skip_if(g.params.operation == 'get' and g.params.safe_mode == false, 'todo: rework get') if g.params.backend == "config" then - t.xfail('not implemented yet') local tuples, tuples_count = {}, 1000 for i = 1, tuples_count do tuples[i] = {i, box.NULL, 'John Fedor', 42} diff --git a/test/integration/privileges_test.lua b/test/integration/privileges_test.lua index 74816417..ec8f8da5 100644 --- a/test/integration/privileges_test.lua +++ b/test/integration/privileges_test.lua @@ -123,6 +123,9 @@ local function privilegies_test_base_init(g, access_operation_type) if access_operation_type and box.space.customers then box.schema.user.grant('testuser1', access_operation_type, 'space', 'customers') end + if box.space._bucket then + box.schema.user.grant('testuser1', 'read', 'space', '_bucket') + end box.session.su(user) end diff --git a/test/integration/select_readview_test.lua b/test/integration/select_readview_test.lua index 980f123e..fed95584 100644 --- a/test/integration/select_readview_test.lua +++ b/test/integration/select_readview_test.lua @@ -2353,6 +2353,11 @@ end) pgroup.test_select_switch_master = function(g) helpers.skip_not_cartridge_backend(g.params.backend) + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + require('crud.common.rebalance').safe_mode_disable() + ]]) + end) helpers.insert_objects(g, 'customers', { { diff --git a/test/unit/call_test.lua b/test/unit/call_test.lua index f4e21926..912fc692 100644 --- a/test/unit/call_test.lua +++ b/test/unit/call_test.lua @@ -242,6 +242,11 @@ pgroup.test_any_vshard_call = function(g) end pgroup.test_any_vshard_call_timeout = function(g) + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + require('crud.common.rebalance').safe_mode_disable() + ]]) + end) local timeout = 0.2 local results, err = g.router:eval([[ diff --git a/test/unit/privileges_test.lua b/test/unit/privileges_test.lua index 7a3b6036..51cec2c5 100644 --- a/test/unit/privileges_test.lua +++ b/test/unit/privileges_test.lua @@ -17,14 +17,13 @@ g.before_all(function() end) g.test_prepend_current_user_smoke = function() - local res = call.storage_api.call_on_storage( - box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"}) + local res = call.storage_api.call_on_storage(box.session.effective_user(), "unittestfunc", {"too", "foo"}) t.assert_equals(res, {"too", "foo"}) end g.test_non_existent_user = function() t.assert_error_msg_contains("User 'non_existent_user' is not found", - call.storage_api.call_on_storage, "non_existent_user", {}, "read", "unittestfunc") + call.storage_api.call_on_storage, "non_existent_user", "unittestfunc") end g.test_that_the_session_switches_back = function() @@ -35,7 +34,7 @@ g.test_that_the_session_switches_back = function() local reference_user = box.session.effective_user() t.assert_not_equals(reference_user, "unittestuser") - local res = call.storage_api.call_on_storage("unittestuser", {}, "read", "unittestfunc2") + local res = call.storage_api.call_on_storage("unittestuser", "unittestfunc2") t.assert_equals(res, "unittestuser") t.assert_equals(box.session.effective_user(), reference_user) end