diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result index 8a6ebf9f..7d0dd4c0 100644 --- a/test/rebalancer/rebalancer.result +++ b/test/rebalancer/rebalancer.result @@ -149,7 +149,7 @@ test_run:switch('box_1_a') vshard.storage.rebalancer_enable() --- ... -wait_rebalancer_state("Rebalance routes are sent", test_run) +wait_rebalancer_state("The following rebalancer routes were sent", test_run) --- ... wait_rebalancer_state('The cluster is balanced ok', test_run) @@ -239,7 +239,7 @@ cfg.rebalancer_disbalance_threshold = 0.01 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) --- ... -wait_rebalancer_state('Rebalance routes are sent', test_run) +wait_rebalancer_state('The following rebalancer routes were sent', test_run) --- ... wait_rebalancer_state('The cluster is balanced ok', test_run) @@ -318,12 +318,13 @@ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}}) --- - [150, 'receiving'] ... -wait_rebalancer_state("Some buckets are not active", test_run) ---- -... +-- We should not check the certain status of buckets (e.g. receiving) because +-- in rare cases we accidentally can get the wrong one. For example we wait for +-- "receiving" status, but get "garbage" due to some previous rebalancer error. +wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \ + 'REBALANCER_INVALID_STATE', test_run) \ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}}) --- -- [150, 'active'] ... vshard.storage.sync() --- diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua index ec7ebcf2..7f443a9b 100644 --- a/test/rebalancer/rebalancer.test.lua +++ b/test/rebalancer/rebalancer.test.lua @@ -78,7 +78,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true) test_run:switch('box_1_a') vshard.storage.rebalancer_enable() -wait_rebalancer_state("Rebalance routes are sent", test_run) +wait_rebalancer_state("The following rebalancer routes were sent", test_run) wait_rebalancer_state('The cluster is balanced ok', test_run) _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) @@ -118,7 +118,7 @@ _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) -- Return 1%. cfg.rebalancer_disbalance_threshold = 0.01 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) -wait_rebalancer_state('Rebalance routes are sent', test_run) +wait_rebalancer_state('The following rebalancer routes were sent', test_run) wait_rebalancer_state('The cluster is balanced ok', test_run) _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) _bucket.index.status:min({vshard.consts.BUCKET.ACTIVE}) @@ -156,7 +156,11 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, false) test_run:switch('box_1_a') vshard.storage.rebalancer_enable() _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}}) -wait_rebalancer_state("Some buckets are not active", test_run) +-- We should not check the certain status of buckets (e.g. receiving) because +-- in rare cases we accidentally can get the wrong one. For example we wait for +-- "receiving" status, but get "garbage" due to some previous rebalancer error. +wait_rebalancer_state('Error during downloading rebalancer states:.*' .. 
\ + 'REBALANCER_INVALID_STATE', test_run) \ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}}) vshard.storage.sync() diff --git a/test/rebalancer/stress_add_remove_several_rs.result b/test/rebalancer/stress_add_remove_several_rs.result index 6a9b0ffb..194c99c4 100644 --- a/test/rebalancer/stress_add_remove_several_rs.result +++ b/test/rebalancer/stress_add_remove_several_rs.result @@ -175,7 +175,7 @@ add_replicaset() vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) --- ... -wait_rebalancer_state('Rebalance routes are sent', test_run) +wait_rebalancer_state('The following rebalancer routes were sent', test_run) --- ... -- Now, add a second replicaset. @@ -422,7 +422,7 @@ remove_second_replicaset_first_stage() vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) --- ... -wait_rebalancer_state('Rebalance routes are sent', test_run) +wait_rebalancer_state('The following rebalancer routes were sent', test_run) --- ... -- Rebalancing has been started - now remove second replicaset. diff --git a/test/rebalancer/stress_add_remove_several_rs.test.lua b/test/rebalancer/stress_add_remove_several_rs.test.lua index f62400f2..7c1ae3bc 100644 --- a/test/rebalancer/stress_add_remove_several_rs.test.lua +++ b/test/rebalancer/stress_add_remove_several_rs.test.lua @@ -71,7 +71,7 @@ fiber.sleep(0.5) test_run:switch('box_1_a') add_replicaset() vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) -wait_rebalancer_state('Rebalance routes are sent', test_run) +wait_rebalancer_state('The following rebalancer routes were sent', test_run) -- Now, add a second replicaset. @@ -153,7 +153,7 @@ fiber.sleep(0.5) test_run:switch('box_1_a') remove_second_replicaset_first_stage() vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) -wait_rebalancer_state('Rebalance routes are sent', test_run) +wait_rebalancer_state('The following rebalancer routes were sent', test_run) -- Rebalancing has been started - now remove second replicaset. remove_replicaset_first_stage() vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a) diff --git a/test/storage-luatest/storage_1_1_1_test.lua b/test/storage-luatest/storage_1_1_1_test.lua index 694baa79..b250dcff 100644 --- a/test/storage-luatest/storage_1_1_1_test.lua +++ b/test/storage-luatest/storage_1_1_1_test.lua @@ -50,7 +50,18 @@ test_group.before_all(function(g) vtest.cluster_new(g, global_cfg) vtest.cluster_bootstrap(g, global_cfg) - vtest.cluster_rebalancer_disable(g) + vtest.cluster_wait_vclock_all(g) + + vtest.cluster_exec_each_master(g, function() + box.schema.create_space('test_space') + box.space.test_space:format({ + {name = 'pk', type = 'unsigned'}, + {name = 'bucket_id', type = 'unsigned'}, + }) + box.space.test_space:create_index('primary', {parts = {'pk'}}) + box.space.test_space:create_index( + 'bucket_id', {parts = {'bucket_id'}, unique = false}) + end) end) test_group.after_all(function(g) @@ -101,3 +112,136 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g) ilt.assert_equals(box.space._bucket:get(bid), nil) end, {bid}) end + +local function start_partial_bucket_move(src_storage, dest_storage, bucket_id) + src_storage:exec(function(bucket_id, replicaset_id) + local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id) + t.assert_not(res) + t.assert(err) + t.helpers.retrying({}, function() + -- The bucket on src_storage must be in "sending" state. The + -- recovery service on src_storage should not erase this bucket. 
+ t.assert_equals(box.space._bucket:select(bucket_id)[1].status, + 'sending') + end) + end, {bucket_id, dest_storage:replicaset_uuid()}) + + dest_storage:exec(function(bucket_id) + t.helpers.retrying({}, function() + -- The recovery service on dest_storage should clear this bucket. + t.assert_equals(box.space._bucket:select(bucket_id), {}) + end) + end, {bucket_id}) +end + +local function wait_for_bucket_is_transferred(src_storage, dest_storage, + bucket_id) + src_storage:exec(function(bucket_id) + t.helpers.retrying({}, function() + t.assert_equals(box.space._bucket:select(bucket_id), {}) + end) + end, {bucket_id}) + dest_storage:exec(function(bucket_id) + t.helpers.retrying({}, function() + t.assert_equals(box.space._bucket:select(bucket_id)[1].status, + 'active') + end) + end, {bucket_id}) +end + +local function move_bucket(src_storage, dest_storage, bucket_id) + src_storage:exec(function(bucket_id, replicaset_id) + local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id) + t.assert(res and not err, 'Error during transferring bucket') + end, {bucket_id, dest_storage:replicaset_uuid()}) + wait_for_bucket_is_transferred(src_storage, dest_storage, bucket_id) +end + +-- +-- Reduce spam of "Finish bucket recovery step" logs and add logging of +-- recovered buckets in recovery service (gh-212). +-- +test_group.test_no_logs_while_unsuccess_recovery = function(g) + g.replica_2_a:exec(function() + ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true + rawset(_G, 'old_call', ivshard.storage._call) + ivshard.storage._call = function(service_name, ...) + if service_name == 'recovery_bucket_stat' then + return error('TimedOut') + end + return _G.old_call(service_name, ...) + end + end) + local hanged_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a) + start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_1) + local hanged_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a) + start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_2) + t.helpers.retrying({}, function() + g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end) + t.assert(g.replica_1_a:grep_log('Error during recovery of bucket')) + end) + t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0')) + g.replica_2_a:exec(function() + ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false + ivshard.storage._call = _G.old_call + end) + t.helpers.retrying({timeout = 60}, function() + g.replica_2_a:exec(function() + ivshard.storage.garbage_collector_wakeup() + ivshard.storage.recovery_wakeup() + end) + g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end) + -- In some rare cases the recovery service can recover buckets one + -- by one. As a result we get multiple "Finish bucket recovery" and + -- "Recovery buckets" logs with different bucket ids and buckets' + -- count. That is why we should grep general logs without buckets' + -- count and bucket ids to avoid flakiness. + t.assert(g.replica_1_a:grep_log('Finish bucket recovery step')) + t.assert(g.replica_1_a:grep_log('Recovered buckets')) + end) + wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a, + hanged_bucket_id_1) + wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a, + hanged_bucket_id_2) +end + +-- +-- Add logging of routes in rebalancer service (gh-212). 
+-- +test_group.test_rebalancer_routes_logging = function(g) + local moved_bucket_from_2 = vtest.storage_first_bucket(g.replica_2_a) + move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket_from_2) + local moved_bucket_from_3 = vtest.storage_first_bucket(g.replica_3_a) + move_bucket(g.replica_3_a, g.replica_1_a, moved_bucket_from_3) + t.helpers.retrying({timeout = 60}, function() + g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end) + t.assert(g.replica_1_a:grep_log('Apply rebalancer routes with 1 ' .. + 'workers')) + end) + local rebalancer_routes_msg = string.format( + "{\"%s\":{\"%s\":1}}", g.replica_1_a:replicaset_uuid(), + g.replica_2_a:replicaset_uuid()) + t.helpers.retrying({}, function() + t.assert(g.replica_1_a:grep_log(rebalancer_routes_msg)) + g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end) + g.replica_1_a:grep_log('The cluster is balanced ok.') + end) +end + +-- +-- Add replicaset.id into rebalancer_request_state errors (gh-212). +-- +test_group.test_no_log_spam_when_buckets_no_active = function(g) + local moved_bucket = vtest.storage_first_bucket(g.replica_1_a) + move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket) + vtest.storage_stop(g.replica_2_a) + local err_log = string.format('Error during downloading rebalancer ' .. + 'states:.*"replicaset_id":"%s"', + g.replica_2_a:replicaset_uuid()) + t.helpers.retrying({timeout = 60}, function() + g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end) + t.assert(g.replica_1_a:grep_log(err_log)) + end) + vtest.storage_start(g.replica_2_a, global_cfg) + move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket) +end diff --git a/test/unit/rebalancer.result b/test/unit/rebalancer.result index d312a411..7cb73553 100644 --- a/test/unit/rebalancer.result +++ b/test/unit/rebalancer.result @@ -290,9 +290,11 @@ build_routes(replicasets) vshard.storage.internal.is_master = true --- ... -get_state = vshard.storage._rebalancer_request_state ---- -... +get_state = function() \ + local res, err = vshard.storage._rebalancer_request_state() \ + if res == nil then err.trace = nil end \ + return res, err \ +end \ _bucket = box.schema.create_space('_bucket') --- ... @@ -318,6 +320,7 @@ get_state() --- - bucket_active_count: 2 bucket_pinned_count: 0 +- null ... _bucket:replace{1, consts.BUCKET.RECEIVING} --- @@ -325,6 +328,13 @@ _bucket:replace{1, consts.BUCKET.RECEIVING} ... get_state() --- +- null +- buckets_state: receiving + code: 42 + replica_id: _ + type: ShardingError + message: Replica _ has receiving buckets during rebalancing + name: REBALANCER_INVALID_STATE ... vshard.storage.internal.is_master = false --- diff --git a/test/unit/rebalancer.test.lua b/test/unit/rebalancer.test.lua index e6d54b81..4a6d556c 100644 --- a/test/unit/rebalancer.test.lua +++ b/test/unit/rebalancer.test.lua @@ -76,7 +76,11 @@ build_routes(replicasets) -- Test rebalancer local state. 
-- vshard.storage.internal.is_master = true -get_state = vshard.storage._rebalancer_request_state +get_state = function() \ + local res, err = vshard.storage._rebalancer_request_state() \ + if res == nil then err.trace = nil end \ + return res, err \ +end \ _bucket = box.schema.create_space('_bucket') pk = _bucket:create_index('pk') status = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false}) diff --git a/vshard/error.lua b/vshard/error.lua index 0892758c..27a59b0d 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -207,6 +207,11 @@ local error_message_template = { msg = 'Mismatch server name: expected "%s", but got "%s"', args = {'expected_name', 'actual_name'}, }, + [42] = { + name = 'REBALANCER_INVALID_STATE', + msg = 'Replica %s has %s buckets during rebalancing', + args = {'replica_id', 'buckets_state'} + } } -- diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 97642250..cb1b3d43 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -5,6 +5,7 @@ local lmsgpack = require('msgpack') local netbox = require('net.box') -- for net.box:self() local trigger = require('internal.trigger') local ffi = require('ffi') +local json_encode = require('json').encode local yaml_encode = require('yaml').encode local fiber_clock = lfiber.clock local fiber_yield = lfiber.yield @@ -922,6 +923,12 @@ local function recovery_local_bucket_is_active(local_bucket, remote_bucket) return status == BSENT or status == BGARBAGE end +local function save_recovered(dict, id, status) + local ids = dict[status] or {} + table.insert(ids, id) + dict[status] = ids +end + -- -- Check status of each transferring bucket. Resolve status where -- possible. @@ -932,6 +939,7 @@ local function recovery_step_by_type(type, limiter) local recovered = 0 local total = 0 local start_format = 'Starting %s buckets recovery step' + local recovered_buckets = {} for _, bucket in _bucket.index.status:pairs(type) do lfiber.testcancel() total = total + 1 @@ -992,12 +1000,15 @@ local function recovery_step_by_type(type, limiter) if recovery_local_bucket_is_sent(bucket, remote_bucket) then _bucket:update({bucket_id}, {{'=', 2, BSENT}}) recovered = recovered + 1 + save_recovered(recovered_buckets, bucket_id, BSENT) elseif recovery_local_bucket_is_garbage(bucket, remote_bucket) then _bucket:update({bucket_id}, {{'=', 2, BGARBAGE}}) recovered = recovered + 1 + save_recovered(recovered_buckets, bucket_id, BGARBAGE) elseif recovery_local_bucket_is_active(bucket, remote_bucket) then _bucket:replace({bucket_id, BACTIVE}) recovered = recovered + 1 + save_recovered(recovered_buckets, bucket_id, BACTIVE) elseif is_step_empty then log.info('Bucket %s is %s local and %s on replicaset %s, waiting', bucket_id, bucket.status, remote_bucket.status, peer_id) @@ -1005,9 +1016,10 @@ local function recovery_step_by_type(type, limiter) is_step_empty = false ::continue:: end - if not is_step_empty then + if recovered > 0 then log.info('Finish bucket recovery step, %d %s buckets are recovered '.. - 'among %d', recovered, type, total) + 'among %d. 
Recovered buckets: %s', recovered, type, total, + json_encode(recovered_buckets)) end return total, recovered end @@ -2800,6 +2812,7 @@ local function rebalancer_download_states() replicaset, 'vshard.storage.rebalancer_request_state', {}, {timeout = consts.REBALANCER_GET_STATE_TIMEOUT}) if state == nil then + err.replicaset_id = replicaset.id return nil, err end local bucket_count = state.bucket_active_count + @@ -2845,12 +2858,10 @@ local function rebalancer_service_f(service, limiter) return end if not status or replicasets == nil then - local err = status and total_bucket_active_count or replicasets - if err then - limiter:log_error(err, service:set_status_error( - 'Error during downloading rebalancer states: %s', err)) - end - log.info('Some buckets are not active, retry rebalancing later') + local err = total_bucket_active_count + limiter:log_error(err, service:set_status_error( + 'Error during downloading rebalancer states: %s, ' .. + 'retry rebalancing later', err)) service:set_activity('idling') lfiber.testcancel() lfiber.sleep(consts.REBALANCER_WORK_INTERVAL) @@ -2903,8 +2914,9 @@ local function rebalancer_service_f(service, limiter) goto continue end end - log.info('Rebalance routes are sent. Schedule next wakeup after '.. - '%f seconds', consts.REBALANCER_WORK_INTERVAL) + log.info('The following rebalancer routes were sent: %s. ' .. + 'Schedule next wakeup after %f seconds', json_encode(routes), + consts.REBALANCER_WORK_INTERVAL) service:set_activity('idling') lfiber.testcancel() lfiber.sleep(consts.REBALANCER_WORK_INTERVAL) @@ -2938,18 +2950,16 @@ local function rebalancer_request_state() return nil, err end if not M.is_rebalancer_active or rebalancing_is_in_progress() then - return + return nil, lerror.make('Rebalancer is not active or is in progress') end local _bucket = box.space._bucket local status_index = _bucket.index.status - if #status_index:select({BSENDING}, {limit = 1}) > 0 then - return - end - if #status_index:select({BRECEIVING}, {limit = 1}) > 0 then - return - end - if #status_index:select({BGARBAGE}, {limit = 1}) > 0 then - return + local repl_id = M.this_replica and M.this_replica.id or '_' + for _, status in pairs({BSENDING, BRECEIVING, BGARBAGE}) do + if #status_index:select({status}, {limit = 1}) > 0 then + return nil, lerror.vshard(lerror.code.REBALANCER_INVALID_STATE, + repl_id, status) + end end return { bucket_active_count = status_index:count({BACTIVE}),
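-- A minimal illustrative sketch (not part of the patch above), assuming vshard
-- is installed: it shows how the new REBALANCER_INVALID_STATE error added to
-- vshard/error.lua is constructed and which fields the tests in this patch
-- expect on it. The replica id and bucket status values here are hypothetical.
local lerror = require('vshard.error')
local err = lerror.vshard(lerror.code.REBALANCER_INVALID_STATE,
                          'replica_1_a', 'receiving')
-- Per the unit test expectations above:
-- err.type == 'ShardingError', err.code == 42,
-- err.name == 'REBALANCER_INVALID_STATE',
-- err.replica_id == 'replica_1_a', err.buckets_state == 'receiving',
-- err.message == 'Replica replica_1_a has receiving buckets during rebalancing'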