Skip to content

Commit f62af29

Browse files
LiuRuoyu01yuhaijun999
authored andcommitted
[fix][coordinator]Fixup restore meta
1 parent b4147ec commit f62af29

File tree

3 files changed

+346
-7
lines changed

3 files changed

+346
-7
lines changed

src/client_v2/meta.cc

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2842,8 +2842,6 @@ void RunImportMeta(const ImportMetaOptions &opt) {
28422842
if (Helper::SetUp(opt.coor_url) < 0) {
28432843
exit(-1);
28442844
}
2845-
dingodb::pb::meta::ImportMetaRequest request;
2846-
dingodb::pb::meta::ImportMetaResponse response;
28472845

28482846
std::map<std::string, std::string> internal_coordinator_sdk_meta_kvs;
28492847

@@ -2885,11 +2883,51 @@ void RunImportMeta(const ImportMetaOptions &opt) {
28852883
DINGO_LOG(ERROR) << s;
28862884
return;
28872885
}
2888-
request.mutable_meta_all()->CopyFrom(meta_all);
2889-
auto status = CoordinatorInteraction::GetInstance().GetCoorinatorInteractionMeta()->SendRequest("ImportMeta", request,
2890-
response);
2891-
DINGO_LOG(INFO) << "SendRequest status=" << status;
2892-
DINGO_LOG(INFO) << response.DebugString();
2886+
2887+
{
2888+
dingodb::pb::meta::CreateTenantsRequest request;
2889+
dingodb::pb::meta::CreateTenantsResponse response;
2890+
request.mutable_request_info()->set_request_id(1);
2891+
request.mutable_tenants()->CopyFrom(meta_all.tenants());
2892+
2893+
auto status = CoordinatorInteraction::GetInstance().GetCoorinatorInteractionMeta()->SendRequest("CreateTenants",
2894+
request, response);
2895+
if (!status.ok()) {
2896+
std::cout << "CreateTenants failed , status :" << status.error_str() << std::endl;
2897+
return;
2898+
}
2899+
std::cout << "CreateTenants success" << std::endl;
2900+
}
2901+
2902+
{
2903+
dingodb::pb::meta::CreateSchemasRequest request;
2904+
dingodb::pb::meta::CreateSchemasResponse response;
2905+
request.mutable_request_info()->set_request_id(2);
2906+
request.mutable_schemas()->insert(meta_all.schemas().begin(), meta_all.schemas().end());
2907+
auto status = CoordinatorInteraction::GetInstance().GetCoorinatorInteractionMeta()->SendRequest("CreateSchemas",
2908+
request, response);
2909+
if (!status.ok()) {
2910+
std::cout << "CreateSchemas failed , status :" << status.error_str() << std::endl;
2911+
return;
2912+
}
2913+
std::cout << "CreateSchemas success" << std::endl;
2914+
}
2915+
{
2916+
dingodb::pb::meta::CreateIndexMetasRequest request;
2917+
dingodb::pb::meta::CreateIndexMetasResponse response;
2918+
2919+
request.mutable_request_info()->set_request_id(3);
2920+
request.mutable_tables_and_indexes()->insert(meta_all.tables_and_indexes().begin(),
2921+
meta_all.tables_and_indexes().end());
2922+
2923+
auto status = CoordinatorInteraction::GetInstance().GetCoorinatorInteractionMeta()->SendRequest("CreateIndexMetas",
2924+
request, response);
2925+
if (!status.ok()) {
2926+
std::cout << "CreateIndexMetas failed , status :" << status.error_str() << std::endl;
2927+
return;
2928+
}
2929+
std::cout << "CreateIndexMetas success" << std::endl;
2930+
}
28932931
}
28942932

28952933
void SetUpExportMeta(CLI::App &app) {

src/server/meta_service.cc

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <cstdint>
1818
#include <map>
1919
#include <memory>
20+
#include <string>
2021
#include <utility>
2122
#include <vector>
2223

@@ -4344,4 +4345,295 @@ void MetaServiceImpl::GetTenants(google::protobuf::RpcController *controller,
43444345
}
43454346
}
43464347

4348+
void DoCreateTenants(google::protobuf::RpcController * /*controller*/, const pb::meta::CreateTenantsRequest *request,
4349+
pb::meta::CreateTenantsResponse *response, TrackClosure *done,
4350+
std::shared_ptr<CoordinatorControl> coordinator_control, std::shared_ptr<Engine> raft_engine) {
4351+
brpc::ClosureGuard done_guard(done);
4352+
4353+
if (!coordinator_control->IsLeader()) {
4354+
return coordinator_control->RedirectResponse(response);
4355+
}
4356+
4357+
DINGO_LOG(INFO) << request->ShortDebugString();
4358+
4359+
pb::coordinator_internal::MetaIncrement meta_increment;
4360+
for (auto tenant : request->tenants()) {
4361+
auto ret = coordinator_control->CreateTenant(tenant, meta_increment);
4362+
if (!ret.ok()) {
4363+
DINGO_LOG(ERROR) << "CreateTenants failed in meta_service, error code=" << ret;
4364+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret.error_code()));
4365+
response->mutable_error()->set_errmsg(ret.error_str());
4366+
return;
4367+
}
4368+
}
4369+
4370+
std::shared_ptr<Context> ctx = std::make_shared<Context>();
4371+
ctx->SetRegionId(Constant::kMetaRegionId);
4372+
ctx->SetTracker(done->Tracker());
4373+
4374+
// this is a async operation will be block by closure
4375+
auto ret1 = raft_engine->Write(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment));
4376+
if (!ret1.ok()) {
4377+
DINGO_LOG(ERROR) << "CreateTenants failed in meta_service, error code=" << ret1.error_code()
4378+
<< ", error str=" << ret1.error_str();
4379+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret1.error_code()));
4380+
response->mutable_error()->set_errmsg(ret1.error_str());
4381+
return;
4382+
}
4383+
4384+
DINGO_LOG(INFO) << "CreateTenants Success. response: " << response->ShortDebugString();
4385+
}
4386+
4387+
void MetaServiceImpl::CreateTenants(google::protobuf::RpcController *controller,
4388+
const pb::meta::CreateTenantsRequest *request,
4389+
pb::meta::CreateTenantsResponse *response, google::protobuf::Closure *done) {
4390+
brpc::ClosureGuard done_guard(done);
4391+
4392+
if (!coordinator_control_->IsLeader()) {
4393+
return RedirectResponse(response);
4394+
}
4395+
4396+
DINGO_LOG(INFO) << request->ShortDebugString();
4397+
4398+
// Run in queue.
4399+
auto *svr_done = new CoordinatorServiceClosure(__func__, done_guard.release(), request, response);
4400+
auto task = std::make_shared<ServiceTask>([this, controller, request, response, svr_done]() {
4401+
DoCreateTenants(controller, request, response, svr_done, coordinator_control_, engine_);
4402+
});
4403+
bool ret = worker_set_->ExecuteRR(task);
4404+
if (!ret) {
4405+
brpc::ClosureGuard done_guard(svr_done);
4406+
ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL, "Commit execute queue failed");
4407+
}
4408+
}
4409+
4410+
void DoCreateSchemas(google::protobuf::RpcController * /*controller*/, const pb::meta::CreateSchemasRequest *request,
4411+
pb::meta::CreateSchemasResponse *response, TrackClosure *done,
4412+
std::shared_ptr<CoordinatorControl> coordinator_control, std::shared_ptr<Engine> raft_engine) {
4413+
brpc::ClosureGuard done_guard(done);
4414+
4415+
if (!coordinator_control->IsLeader()) {
4416+
return coordinator_control->RedirectResponse(response);
4417+
}
4418+
4419+
DINGO_LOG(INFO) << request->ShortDebugString();
4420+
4421+
pb::coordinator_internal::MetaIncrement meta_increment;
4422+
for (const auto &[id, schemas] : request->schemas()) {
4423+
for (const pb::meta::Schema &schema : schemas.schemas()) {
4424+
if (id == 0 && (schema.name() == "ROOT" || schema.name() == "META" || schema.name() == "DINGO" ||
4425+
schema.name() == "MYSQL" || schema.name() == "INFORMATION_SCHEMA")) {
4426+
DINGO_LOG(INFO) << "Already auto creat, Skip create schema: " << schema.name();
4427+
} else {
4428+
int64_t schema_id = schema.id().entity_id();
4429+
auto ret = coordinator_control->CreateSchema(schema.tenant_id(), schema.name(), schema_id, meta_increment);
4430+
if (!ret.ok()) {
4431+
DINGO_LOG(ERROR) << "CreateSchema failed in meta_service, error code=" << ret;
4432+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret.error_code()));
4433+
response->mutable_error()->set_errmsg("restore schema : " + ret.error_str());
4434+
return;
4435+
}
4436+
if (schema.id().entity_id() != schema_id) {
4437+
DINGO_LOG(ERROR) << "schema_id is not equal to schema.id().entity_id()";
4438+
response->mutable_error()->set_errcode(pb::error::Errno::EILLEGAL_PARAMTETERS);
4439+
response->mutable_error()->set_errmsg("restore schema : schema_id is not equal to schema.id().entity_id()");
4440+
return;
4441+
}
4442+
}
4443+
}
4444+
}
4445+
4446+
std::shared_ptr<Context> ctx = std::make_shared<Context>();
4447+
ctx->SetRegionId(Constant::kMetaRegionId);
4448+
ctx->SetTracker(done->Tracker());
4449+
4450+
// this is a async operation will be block by closure
4451+
auto ret1 = raft_engine->Write(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment));
4452+
if (!ret1.ok()) {
4453+
DINGO_LOG(ERROR) << "CreateSchemas failed in meta_service, error code=" << ret1.error_code()
4454+
<< ", error str=" << ret1.error_str();
4455+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret1.error_code()));
4456+
response->mutable_error()->set_errmsg(ret1.error_str());
4457+
return;
4458+
}
4459+
4460+
DINGO_LOG(INFO) << "CreateSchemas Success. response: " << response->ShortDebugString();
4461+
}
4462+
4463+
void MetaServiceImpl::CreateSchemas(google::protobuf::RpcController *controller,
4464+
const pb::meta::CreateSchemasRequest *request,
4465+
pb::meta::CreateSchemasResponse *response, google::protobuf::Closure *done) {
4466+
brpc::ClosureGuard done_guard(done);
4467+
4468+
if (!this->coordinator_control_->IsLeader()) {
4469+
return RedirectResponse(response);
4470+
}
4471+
4472+
DINGO_LOG(INFO) << request->ShortDebugString();
4473+
4474+
// Run in queue.
4475+
auto *svr_done = new CoordinatorServiceClosure(__func__, done_guard.release(), request, response);
4476+
auto task = std::make_shared<ServiceTask>([this, controller, request, response, svr_done]() {
4477+
DoCreateSchemas(controller, request, response, svr_done, coordinator_control_, engine_);
4478+
});
4479+
bool ret = worker_set_->ExecuteRR(task);
4480+
if (!ret) {
4481+
brpc::ClosureGuard done_guard(svr_done);
4482+
ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL, "Commit execute queue failed");
4483+
}
4484+
}
4485+
4486+
void DoCreateIndexMetas(google::protobuf::RpcController * /*controller*/,
4487+
const pb::meta::CreateIndexMetasRequest *request, pb::meta::CreateIndexMetasResponse *response,
4488+
TrackClosure *done, std::shared_ptr<CoordinatorControl> coordinator_control,
4489+
std::shared_ptr<Engine> raft_engine) {
4490+
brpc::ClosureGuard done_guard(done);
4491+
4492+
if (!coordinator_control->IsLeader()) {
4493+
return coordinator_control->RedirectResponse(response);
4494+
}
4495+
4496+
DINGO_LOG(INFO) << request->ShortDebugString();
4497+
4498+
pb::coordinator_internal::MetaIncrement meta_increment;
4499+
std::vector<pb::meta::Tenant> tenants;
4500+
4501+
// get tenants
4502+
{
4503+
auto ret1 = coordinator_control->GetAllTenants(tenants);
4504+
if (!ret1.ok()) {
4505+
DINGO_LOG(ERROR) << "GetAllTenants failed in meta_service, error code=" << ret1;
4506+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret1.error_code()));
4507+
response->mutable_error()->set_errmsg(ret1.error_str());
4508+
return;
4509+
}
4510+
}
4511+
4512+
// get schema_dingo_common_id to schema
4513+
std::map<std::string, pb::meta::Schema> common_id_to_schema;
4514+
{
4515+
for (const auto &tenant : tenants) {
4516+
std::vector<pb::meta::Schema> sub_schemas;
4517+
auto ret2 = coordinator_control->GetSchemas(tenant.id(), sub_schemas);
4518+
if (!ret2.ok()) {
4519+
DINGO_LOG(ERROR) << "GetSchemas failed in meta_service, error code=" << ret2;
4520+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret2.error_code()));
4521+
response->mutable_error()->set_errmsg(ret2.error_str());
4522+
return;
4523+
}
4524+
for (const auto &schema : sub_schemas) {
4525+
std::string schema_dingo_common_id = fmt::format("{}|{}|{}", static_cast<int32_t>(schema.id().entity_type()),
4526+
schema.id().parent_entity_id(), schema.id().entity_id());
4527+
if (common_id_to_schema.find(schema_dingo_common_id) != common_id_to_schema.end()) {
4528+
DINGO_LOG(ERROR) << "schema_dingo_common_id is duplicate : " << schema_dingo_common_id;
4529+
response->mutable_error()->set_errcode(pb::error::Errno::EILLEGAL_PARAMTETERS);
4530+
response->mutable_error()->set_errmsg("Restore indexes meta : schema_dingo_common_id is duplicate");
4531+
return;
4532+
} else {
4533+
common_id_to_schema[schema_dingo_common_id] = schema;
4534+
}
4535+
}
4536+
}
4537+
}
4538+
4539+
for (const auto &[schema_dingo_common_id, tables_and_indexes] : request->tables_and_indexes()) {
4540+
if (common_id_to_schema.find(schema_dingo_common_id) == common_id_to_schema.end()) {
4541+
DINGO_LOG(ERROR) << "schema_dingo_common_id is not exist : " << schema_dingo_common_id;
4542+
response->mutable_error()->set_errcode(pb::error::Errno::EILLEGAL_PARAMTETERS);
4543+
response->mutable_error()->set_errmsg("Restore indexes meta : schema_dingo_common_id is not exist");
4544+
return;
4545+
}
4546+
4547+
if (tables_and_indexes.tables_size() > FLAGS_max_table_definition_count_in_create_tables) {
4548+
DINGO_LOG(ERROR) << "table definition_with_ids_size is too big, size=" << tables_and_indexes.tables_size()
4549+
<< ", max=" << FLAGS_max_table_definition_count_in_create_tables;
4550+
response->mutable_error()->set_errcode(pb::error::Errno::EILLEGAL_PARAMTETERS);
4551+
response->mutable_error()->set_errmsg("Restore indexes meta : tables size is too large.");
4552+
return;
4553+
}
4554+
4555+
int64_t schema_id;
4556+
// decode schema id
4557+
{
4558+
size_t pos = schema_dingo_common_id.rfind('|');
4559+
if (pos == std::string::npos) {
4560+
std::string err_msg = fmt::format("decode schema id error, common schema id {}", schema_dingo_common_id);
4561+
DINGO_LOG(ERROR) << err_msg;
4562+
response->mutable_error()->set_errcode(pb::error::Errno::EILLEGAL_PARAMTETERS);
4563+
response->mutable_error()->set_errmsg(err_msg);
4564+
return;
4565+
}
4566+
4567+
std::string entity_id_str = schema_dingo_common_id.substr(pos + 1);
4568+
schema_id = std::stoll(entity_id_str);
4569+
DINGO_LOG(INFO) << "decode schema id success, schema_id=" << schema_id
4570+
<< " original schema_dingo_common_id=" << schema_dingo_common_id;
4571+
}
4572+
4573+
// restore index meta
4574+
for (const auto &index_with_id : tables_and_indexes.indexes()) {
4575+
const pb::meta::DingoCommonId &index_common_id = index_with_id.index_id();
4576+
4577+
pb::meta::TableDefinitionWithId table_definition_with_id;
4578+
MetaServiceImpl::IndexDefinitionToTableDefinition(index_with_id.index_definition(),
4579+
*(table_definition_with_id.mutable_table_definition()));
4580+
4581+
const auto &definition = table_definition_with_id.table_definition();
4582+
4583+
if (index_common_id.entity_type() == pb::meta::EntityType::ENTITY_TYPE_INDEX) {
4584+
auto ret3 =
4585+
coordinator_control->RestoreIndexMeta(schema_id, index_common_id.entity_id(), definition, meta_increment);
4586+
if (!ret3.ok()) {
4587+
DINGO_LOG(ERROR) << "CreateIndex failed in meta_service, error code=" << ret3;
4588+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret3.error_code()));
4589+
response->mutable_error()->set_errmsg("Restore indexes meta : " + ret3.error_str());
4590+
return;
4591+
}
4592+
DINGO_LOG(INFO) << "type: " << index_common_id.entity_type() << ", index_id=" << index_common_id.entity_id();
4593+
} else {
4594+
DINGO_LOG(ERROR) << "entity type is illegal : " << index_common_id.entity_type();
4595+
response->mutable_error()->set_errcode(pb::error::Errno::EILLEGAL_PARAMTETERS);
4596+
response->mutable_error()->set_errmsg("Restore indexes meta : entity type is illegal");
4597+
return;
4598+
}
4599+
}
4600+
}
4601+
4602+
std::shared_ptr<Context> ctx = std::make_shared<Context>();
4603+
ctx->SetRegionId(Constant::kMetaRegionId);
4604+
ctx->SetTracker(done->Tracker());
4605+
4606+
// this is a async operation will be block by closure
4607+
auto ret4 = raft_engine->Write(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment));
4608+
if (!ret4.ok()) {
4609+
DINGO_LOG(ERROR) << "CreateIndexMetas failed in meta_service, error code=" << ret4.error_code()
4610+
<< ", error str=" << ret4.error_str();
4611+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret4.error_code()));
4612+
response->mutable_error()->set_errmsg(ret4.error_str());
4613+
return;
4614+
}
4615+
}
4616+
4617+
void MetaServiceImpl::CreateIndexMetas(google::protobuf::RpcController *controller,
4618+
const pb::meta::CreateIndexMetasRequest *request,
4619+
pb::meta::CreateIndexMetasResponse *response, google::protobuf::Closure *done) {
4620+
brpc::ClosureGuard done_guard(done);
4621+
if (!this->coordinator_control_->IsLeader()) {
4622+
return RedirectResponse(response);
4623+
}
4624+
4625+
DINGO_LOG(INFO) << request->ShortDebugString();
4626+
4627+
// Run in queue.
4628+
auto *svr_done = new CoordinatorServiceClosure(__func__, done_guard.release(), request, response);
4629+
auto task = std::make_shared<ServiceTask>([this, controller, request, response, svr_done]() {
4630+
DoCreateIndexMetas(controller, request, response, svr_done, coordinator_control_, engine_);
4631+
});
4632+
bool ret = worker_set_->ExecuteRR(task);
4633+
if (!ret) {
4634+
brpc::ClosureGuard done_guard(svr_done);
4635+
ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL, "Commit execute queue failed");
4636+
}
4637+
}
4638+
43474639
} // namespace dingodb

src/server/meta_service.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,15 @@ class MetaServiceImpl : public pb::meta::MetaService {
234234
pb::meta::CreateOrUpdateAutoIncrementsResponse* response,
235235
google::protobuf::Closure* done) override;
236236

237+
void CreateTenants(google::protobuf::RpcController* controller, const pb::meta::CreateTenantsRequest* request,
238+
pb::meta::CreateTenantsResponse* response, google::protobuf::Closure* done) override;
239+
240+
void CreateSchemas(google::protobuf::RpcController* controller, const pb::meta::CreateSchemasRequest* request,
241+
pb::meta::CreateSchemasResponse* response, google::protobuf::Closure* done) override;
242+
243+
void CreateIndexMetas(google::protobuf::RpcController* controller, const pb::meta::CreateIndexMetasRequest* request,
244+
pb::meta::CreateIndexMetasResponse* response, google::protobuf::Closure* done) override;
245+
237246
void SetWorkSet(WorkerSetPtr worker_set) { worker_set_ = worker_set; }
238247

239248
// table and index definition convertor

0 commit comments

Comments
 (0)