diff --git a/cloud/blockstore/libs/storage/service/service_actor.h b/cloud/blockstore/libs/storage/service/service_actor.h index 4010d77c247..70207733c29 100644 --- a/cloud/blockstore/libs/storage/service/service_actor.h +++ b/cloud/blockstore/libs/storage/service/service_actor.h @@ -267,6 +267,10 @@ class TServiceActor final TRequestInfoPtr requestInfo, TString input); + TResultOrError CreateDiskRegistryEnsureStateIntegrityActor( + TRequestInfoPtr requestInfo, + TString input); + TResultOrError CreateReplaceDeviceActionActor( TRequestInfoPtr requestInfo, TString input); diff --git a/cloud/blockstore/libs/storage/service/service_actor_actions.cpp b/cloud/blockstore/libs/storage/service/service_actor_actions.cpp index 02eb61d5737..6ab526e7077 100644 --- a/cloud/blockstore/libs/storage/service/service_actor_actions.cpp +++ b/cloud/blockstore/libs/storage/service/service_actor_actions.cpp @@ -49,6 +49,7 @@ void TServiceActor::HandleExecuteAction( {"describevolume", &TServiceActor::CreateDescribeVolumeActionActor }, {"diskregistrychangestate", &TServiceActor::CreateDiskRegistryChangeStateActor }, {"drainnode", &TServiceActor::CreateDrainNodeActionActor }, + {"ensurediskregistrystateintegrity", &TServiceActor::CreateDiskRegistryEnsureStateIntegrityActor }, {"finishfilldisk", &TServiceActor::CreateFinishFillDiskActionActor }, {"getcompactionstatus", &TServiceActor::CreateGetCompactionStatusActionActor }, {"getdependentdisks", &TServiceActor::CreateGetDependentDisksActionActor }, diff --git a/cloud/blockstore/libs/storage/service/service_actor_actions_disk_registry_ensure_state_integrity.cpp b/cloud/blockstore/libs/storage/service/service_actor_actions_disk_registry_ensure_state_integrity.cpp new file mode 100644 index 00000000000..396b53309c4 --- /dev/null +++ b/cloud/blockstore/libs/storage/service/service_actor_actions_disk_registry_ensure_state_integrity.cpp @@ -0,0 +1,150 @@ +#include "service_actor.h" + +#include +#include +#include + +#include +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER) + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TEnsureDiskRegistryStateIntegrityActor final + : public TActorBootstrapped +{ +private: + const TRequestInfoPtr RequestInfo; + const TString Input; + + NProto::TError Error; + +public: + TEnsureDiskRegistryStateIntegrityActor( + TRequestInfoPtr requestInfo, + TString input); + + void Bootstrap(const TActorContext& ctx); + +private: + void ReplyAndDie( + const TActorContext& ctx, + std::unique_ptr response); + +private: + STFUNC(StateEnsureDiskRegistryStateIntegrity); + + void HandleEnsureDiskRegistryStateIntegrityResponse( + const TEvDiskRegistry::TEvEnsureDiskRegistryStateIntegrityResponse:: + TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TEnsureDiskRegistryStateIntegrityActor::TEnsureDiskRegistryStateIntegrityActor( + TRequestInfoPtr requestInfo, + TString input) + : RequestInfo(std::move(requestInfo)) + , Input(std::move(input)) +{} + +void TEnsureDiskRegistryStateIntegrityActor::Bootstrap(const TActorContext& ctx) +{ + auto request = std::make_unique< + TEvDiskRegistry::TEvEnsureDiskRegistryStateIntegrityRequest>(); + + if (!google::protobuf::util::JsonStringToMessage(Input, &request->Record) + .ok()) + { + Error = MakeError(E_ARGUMENT, "Failed to parse input"); + ReplyAndDie( + ctx, + std::make_unique()); + return; + } + + Become(&TThis::StateEnsureDiskRegistryStateIntegrity); + + NCloud::Send(ctx, MakeDiskRegistryProxyServiceId(), std::move(request)); +} + +void TEnsureDiskRegistryStateIntegrityActor::ReplyAndDie( + const TActorContext& ctx, + std::unique_ptr response) +{ + LWTRACK( + ResponseSent_Service, + RequestInfo->CallContext->LWOrbit, + "ExecuteAction_ensurediskregistrystateintegrity", + RequestInfo->CallContext->RequestId); + + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TEnsureDiskRegistryStateIntegrityActor:: + HandleEnsureDiskRegistryStateIntegrityResponse( + const TEvDiskRegistry::TEvEnsureDiskRegistryStateIntegrityResponse:: + TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + TString output; + google::protobuf::util::MessageToJsonString(msg->Record, &output); + auto response = std::make_unique(); + response->Record.SetOutput(output); + ReplyAndDie(ctx, std::move(response)); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC( + TEnsureDiskRegistryStateIntegrityActor:: + StateEnsureDiskRegistryStateIntegrity) +{ + switch (ev->GetTypeRewrite()) { + HFunc( + TEvDiskRegistry::TEvEnsureDiskRegistryStateIntegrityResponse, + HandleEnsureDiskRegistryStateIntegrityResponse); + + default: + HandleUnexpectedEvent( + ev, + TBlockStoreComponents::SERVICE, + __PRETTY_FUNCTION__); + break; + } +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +TResultOrError +TServiceActor::CreateDiskRegistryEnsureStateIntegrityActor( + TRequestInfoPtr requestInfo, + TString input) +{ + return {std::make_unique( + std::move(requestInfo), + std::move(input))}; +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/service/ya.make b/cloud/blockstore/libs/storage/service/ya.make index 441ebea6048..8d9ca9c73f4 100644 --- a/cloud/blockstore/libs/storage/service/ya.make +++ b/cloud/blockstore/libs/storage/service/ya.make @@ -18,6 +18,7 @@ SRCS( service_actor_actions_describe_blocks.cpp service_actor_actions_describe.cpp service_actor_actions_disk_registry_change_state.cpp + service_actor_actions_disk_registry_ensure_state_integrity.cpp service_actor_actions_drain_node.cpp service_actor_actions_finish_fill_disk.cpp service_actor_actions_flush_profile_log.cpp diff --git a/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h b/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h index d1c5862c371..410e1e9aa8e 100644 --- a/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h +++ b/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h @@ -158,6 +158,10 @@ class TDiskRegistryProxyMock final TEvDiskRegistry::TEvGetClusterCapacityRequest, HandleGetClusterCapacity); + HFunc( + TEvDiskRegistry::TEvEnsureDiskRegistryStateIntegrityRequest, + HandleEnsureDiskRegistryStateIntegrity); + IgnoreFunc(NKikimr::TEvLocal::TEvTabletMetrics); default: @@ -1100,6 +1104,16 @@ class TDiskRegistryProxyMock final *response->Record.AddCapacity() = std::move(capacityInfo); NCloud::Reply(ctx, *ev, std::move(response)); } + + void HandleEnsureDiskRegistryStateIntegrity( + const TEvDiskRegistry::TEvEnsureDiskRegistryStateIntegrityRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + NCloud::Reply( + ctx, + *ev, + std::make_unique()); + } }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/tests/ensure_disk_registry_state_integrity/test.py b/cloud/blockstore/tests/ensure_disk_registry_state_integrity/test.py new file mode 100644 index 00000000000..29e4bd237f5 --- /dev/null +++ b/cloud/blockstore/tests/ensure_disk_registry_state_integrity/test.py @@ -0,0 +1,240 @@ +import logging +import os +import pytest +import time + +from cloud.blockstore.tests.python.lib.client import NbsClient +from cloud.blockstore.tests.python.lib.test_client import CreateTestClient + +from cloud.blockstore.public.sdk.python.protos import \ + STORAGE_MEDIA_SSD_NONREPLICATED, \ + STORAGE_MEDIA_SSD_MIRROR3 + +from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \ + generate_disk_agent_txt +from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \ + start_disk_agent + +import yatest.common as yatest_common + +from contrib.ydb.tests.library.harness.kikimr_runner import \ + get_unique_path_for_current_test, ensure_path_exists + +DEVICE_SIZE = 1024 ** 3 # 1 GiB +DEVICES_PER_PATH = 6 + +KNOWN_DEVICE_POOLS = { + "KnownDevicePools": [ + {"Kind": "DEVICE_POOL_KIND_DEFAULT", "AllocationUnit": DEVICE_SIZE}, + ]} + +DISK_STATE_MIGRATION_MESSAGE = "data migration in progress, slight performance decrease may be experienced" + + +@pytest.fixture(name='ydb') +def start_ydb_cluster(): + + ydb_cluster = start_ydb() + + yield ydb_cluster + + ydb_cluster.stop() + + +@pytest.fixture(name='nbs') +def start_nbs_daemon(ydb): + + cfg = NbsConfigurator(ydb) + cfg.generate_default_nbs_configs() + + cfg.files["storage"].DisableLocalService = False + cfg.files["storage"].NonReplicatedDontSuspendDevices = True + cfg.files["storage"].AllocationUnitNonReplicatedSSD = 1 + cfg.files["storage"].AllocationUnitMirror3SSD = 1 + cfg.files["storage"].NonReplicatedAgentMinTimeout = 10000 # 10s + cfg.files["storage"].NonReplicatedAgentMaxTimeout = 10000 # 10s + + daemon = start_nbs(cfg) + + client = CreateTestClient(f"localhost:{daemon.port}") + client.execute_DiskRegistrySetWritableState(State=True) + client.update_disk_registry_config(KNOWN_DEVICE_POOLS) + + yield daemon + + daemon.stop() + + +@pytest.fixture(name='agent_ids') +def create_agent_ids(): + return ['agent-1', 'agent-2', 'agent-3'] + + +def _get_agent_data_path(agent_id, data_path): + return os.path.join(data_path, agent_id, "dev", "disk", "by-partlabel") + + +@pytest.fixture(name='data_path') +def create_data_path(agent_ids): + + data = get_unique_path_for_current_test( + output_path=yatest_common.output_path(), + sub_folder="data") + + for agent_id in agent_ids: + ensure_path_exists(_get_agent_data_path(agent_id, data)) + + return data + + +@pytest.fixture(autouse=True) +def create_device_files(data_path, agent_ids): + + for agent_id in agent_ids: + p = _get_agent_data_path(agent_id, data_path) + with open(os.path.join(p, 'NVMENBS01'), 'wb') as f: + os.truncate(f.fileno(), DEVICES_PER_PATH * (DEVICE_SIZE + 4096)) + + +def _create_disk_agent_configurator(ydb, agent_id, data_path): + assert agent_id is not None + + configurator = NbsConfigurator(ydb, node_type='disk-agent') + configurator.generate_default_nbs_configs() + + disk_agent_config = generate_disk_agent_txt( + agent_id=agent_id, + device_erase_method='DEVICE_ERASE_METHOD_NONE', # speed up tests + storage_discovery_config={ + "PathConfigs": [{ + "PathRegExp": f"{data_path}/NVMENBS([0-9]+)", + "PoolConfigs": [{ + "MinSize": 4096 + DEVICE_SIZE, + "Layout": { + "DeviceSize": DEVICE_SIZE, + "DevicePadding": 4096, + "HeaderSize": 4096 + } + }]} + ]}) + + caches = os.path.join( + get_unique_path_for_current_test( + output_path=yatest_common.output_path(), + sub_folder="caches"), + agent_id) + ensure_path_exists(caches) + + disk_agent_config.CachedConfigPath = os.path.join(caches, 'config.txt') + disk_agent_config.DisableBrokenDevices = True + + configurator.files["disk-agent"] = disk_agent_config + configurator.files["location"].Rack = 'RACK:' + agent_id + + return configurator + + +@pytest.fixture(name='disk_agent_configurators') +def create_disk_agent_configurators(ydb, agent_ids, data_path): + configurators = [] + + for agent_id in agent_ids: + p = _get_agent_data_path(agent_id, data_path) + configurators.append(_create_disk_agent_configurator(ydb, agent_id, p)) + + return configurators + + +def test_statuses(nbs, agent_ids, disk_agent_configurators): + + logger = logging.getLogger("client") + logger.setLevel(logging.DEBUG) + + client = CreateTestClient(f"localhost:{nbs.port}", log=logger) + nbs_client = NbsClient(nbs.port) + + # run agents + assert nbs_client.ensure_disk_registry_state_integrity() == {} + + agents = [] + for agent_id, configurator in zip(agent_ids, disk_agent_configurators): + agents.append(start_disk_agent(configurator, name=agent_id)) + + for agent_id, agent in zip(agent_ids, agents): + agent.wait_for_registration() + client.add_host(agent_id) + + client.wait_for_devices_to_be_cleared() + + assert nbs_client.ensure_disk_registry_state_integrity() == {} + + # create volumes + + client.create_volume( + disk_id="m3", + block_size=4096, + blocks_count=3*DEVICE_SIZE//4096, + storage_media_kind=STORAGE_MEDIA_SSD_MIRROR3) + + for i, agent_id in enumerate(agent_ids): + client.create_volume( + disk_id=f"vol{i + 1}", + block_size=4096, + blocks_count=1*DEVICE_SIZE//4096, + storage_media_kind=STORAGE_MEDIA_SSD_NONREPLICATED, + agent_ids=[agent_id]) + + assert nbs_client.ensure_disk_registry_state_integrity() == {} + # remove agent-2 + client.remove_host(agent_ids[1]) + + vol3_device = client.stat_volume("vol3")["Volume"].Devices[0] + + client.remove_device(vol3_device.AgentId, vol3_device.DeviceName) + + client.execute_DiskRegistryChangeState( + Message="test", + ChangeDeviceState={ + "DeviceUUID": vol3_device.DeviceUUID, + "State": 2, # DEVICE_STATE_ERROR + }) + + assert nbs_client.ensure_disk_registry_state_integrity() == {} + # kill agent-1 + agents[0].kill() + + def wait_agent1_state(desired_state): + for _ in range(120): + bkp = client.backup_disk_registry_state() + bkp['Agents'].sort(key=lambda a: a['AgentId']) + assert bkp['Agents'][0]['AgentId'] == agent_ids[0] + if bkp['Agents'][0].get('State') == desired_state: + return + time.sleep(1) + pytest.fail(f"{agent_ids[0]} is not in {desired_state} state") + + # wait until agent-1 becomes unavailable + wait_agent1_state('AGENT_STATE_UNAVAILABLE') + + assert nbs_client.ensure_disk_registry_state_integrity() == {} + + # restart agent-1 + + agents[0] = start_disk_agent(disk_agent_configurators[0], name=agent_ids[0]) + agents[0].wait_for_registration() + + # wait until agent-1 returns from the unavailable state + wait_agent1_state('AGENT_STATE_WARNING') + + # fix agent-1 + client.execute_DiskRegistryChangeState( + Message="test", + ChangeAgentState={ + "AgentId": agent_ids[0], + "State": 0, # AGENT_STATE_ONLINE + }) + + assert nbs_client.ensure_disk_registry_state_integrity() == {} + + for agent in agents: + agent.stop() diff --git a/cloud/blockstore/tests/ensure_disk_registry_state_integrity/ya.make b/cloud/blockstore/tests/ensure_disk_registry_state_integrity/ya.make new file mode 100644 index 00000000000..6186ab57cc3 --- /dev/null +++ b/cloud/blockstore/tests/ensure_disk_registry_state_integrity/ya.make @@ -0,0 +1,18 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc) + +TEST_SRCS(test.py) + +DEPENDS( + cloud/blockstore/apps/client + cloud/blockstore/apps/disk_agent + cloud/blockstore/apps/server + contrib/ydb/apps/ydbd +) + +PEERDIR( + cloud/blockstore/tests/python/lib +) + +END() diff --git a/cloud/blockstore/tests/python/lib/client.py b/cloud/blockstore/tests/python/lib/client.py index 1357b0fde5d..dde3f44f357 100644 --- a/cloud/blockstore/tests/python/lib/client.py +++ b/cloud/blockstore/tests/python/lib/client.py @@ -216,3 +216,10 @@ def change_agent_state(self, agent_id, state, timeout=300): resp = self.__execute_action("diskregistrychangestate", req, timeout) return json.loads(resp) + + def ensure_disk_registry_state_integrity(self, timeout=300): + req = {} + + resp = self.__execute_action("ensurediskregistrystateintegrity", req, timeout) + + return json.loads(resp) diff --git a/cloud/blockstore/tests/ya.make b/cloud/blockstore/tests/ya.make index e3410a02ed4..94f2a43cac1 100644 --- a/cloud/blockstore/tests/ya.make +++ b/cloud/blockstore/tests/ya.make @@ -13,6 +13,7 @@ RECURSE( disk_agent_config disk_states default_encryption + ensure_disk_registry_state_integrity external_endpoint e2e-tests e2e-tests-vhost