Skip to content

Commit 59b185b

Browse files
LiuRuoyu01yuhaijun999
authored andcommitted
[feat][coordinator]Add restore autoincrement
1 parent b2a57ad commit 59b185b

File tree

5 files changed

+132
-0
lines changed

5 files changed

+132
-0
lines changed

src/coordinator/auto_increment_control.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "common/logging.h"
2626
#include "common/synchronization.h"
2727
#include "engine/snapshot.h"
28+
#include "fmt/format.h"
2829
#include "proto/error.pb.h"
2930
#include "server/server.h"
3031

@@ -255,6 +256,45 @@ void AutoIncrementControl::GetLeaderLocation(pb::common::Location& leader_server
255256
GetServerLocation(leader_raft_location, leader_server_location);
256257
}
257258

259+
butil::Status AutoIncrementControl::CreateOrUpdateAutoIncrement(
260+
int64_t table_id, int64_t start_id, pb::coordinator_internal::MetaIncrement& meta_increment) {
261+
DINGO_LOG(INFO) << table_id << " | " << start_id << " | ";
262+
int64_t source_start_id = 0;
263+
butil::Status ret = GetAutoIncrement(table_id, source_start_id);
264+
if (ret.ok()) {
265+
// update
266+
DINGO_LOG(INFO) << "[restore] update auto increment, table id: " << table_id << ", start id: " << start_id;
267+
if (start_id <= source_start_id) {
268+
DINGO_LOG(ERROR) << "table id: " << table_id << " : " << start_id << " <= " << source_start_id;
269+
return butil::Status(
270+
pb::error::Errno::EILLEGAL_PARAMTETERS,
271+
fmt::format("[restore] illegal parameters, start id :{} equal or smaller than source start id :{}", start_id,
272+
source_start_id));
273+
}
274+
auto* auto_increment = meta_increment.add_auto_increment();
275+
auto_increment->set_id(table_id);
276+
auto* increment = auto_increment->mutable_increment();
277+
increment->set_start_id(start_id);
278+
increment->set_source_start_id(source_start_id);
279+
increment->set_update_type(pb::coordinator_internal::AutoIncrementUpdateType::UPDATE_ONLY);
280+
auto_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE);
281+
282+
} else if (ret.error_code() == pb::error::Errno::EAUTO_INCREMENT_NOT_FOUND) {
283+
// create
284+
DINGO_LOG(INFO) << "[restore] create auto increment, table id: " << table_id << ", start id: " << start_id;
285+
auto* auto_increment = meta_increment.add_auto_increment();
286+
auto_increment->set_id(table_id);
287+
auto* increment = auto_increment->mutable_increment();
288+
increment->set_start_id(start_id);
289+
auto_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::CREATE);
290+
} else {
291+
DINGO_LOG(ERROR) << ret.error_cstr();
292+
return ret;
293+
}
294+
295+
return butil::Status::OK();
296+
}
297+
258298
void AutoIncrementControl::GetServerLocation(pb::common::Location& raft_location,
259299
pb::common::Location& server_location) {
260300
// find in cache

src/coordinator/auto_increment_control.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <string>
2222

2323
#include "butil/containers/flat_map.h"
24+
#include "butil/status.h"
2425
#include "common/meta_control.h"
2526
#include "engine/engine.h"
2627
#include "proto/coordinator_internal.pb.h"
@@ -73,6 +74,10 @@ class AutoIncrementControl : public MetaControl {
7374
pb::coordinator_internal::MetaIncrement &meta_increment);
7475
butil::Status DeleteAutoIncrement(int64_t table_id, pb::coordinator_internal::MetaIncrement &meta_increment);
7576

77+
// restore auto_increment_map_
78+
butil::Status CreateOrUpdateAutoIncrement(int64_t table_id, int64_t start_id,
79+
pb::coordinator_internal::MetaIncrement &meta_increment);
80+
7681
// Get raft leader's server location
7782
void GetLeaderLocation(pb::common::Location &leader_server_location) override;
7883

src/coordinator/coordinator_control_meta.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1789,6 +1789,20 @@ butil::Status CoordinatorControl::RestoreIndexMeta(int64_t schema_id, int64_t in
17891789
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "[br] index_id is already used by other index");
17901790
}
17911791

1792+
if (table_definition.auto_increment() > 0) {
1793+
auto status =
1794+
AutoIncrementControl::SyncSendCreateAutoIncrementInternal(index_id, table_definition.auto_increment());
1795+
if (!status.ok()) {
1796+
DINGO_LOG(ERROR) << fmt::format("send create auto increment internal error, code: {}, message: {} ",
1797+
status.error_code(), status.error_str())
1798+
<< ", table_definition=" << table_definition.ShortDebugString();
1799+
return butil::Status(pb::error::Errno::EAUTO_INCREMENT_WHILE_CREATING_TABLE,
1800+
fmt::format("[restore] send create auto increment internal error, code: {}, message: {}",
1801+
status.error_code(), status.error_str()));
1802+
}
1803+
DINGO_LOG(INFO) << "Restore AutoIncrement send create auto increment internal success";
1804+
}
1805+
17921806
// update index_name_map_safe_temp_
17931807
if (index_name_map_safe_temp_.PutIfAbsent(new_index_check_name, index_id) < 0) {
17941808
DINGO_LOG(INFO) << " [br] index_name" << table_definition.name() << " is exist, when insert index_id=" << index_id;

src/server/meta_service.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "butil/containers/flat_map.h"
2525
#include "butil/status.h"
2626
#include "common/constant.h"
27+
#include "common/context.h"
2728
#include "common/logging.h"
2829
#include "coordinator/auto_increment_control.h"
2930
#include "coordinator/coordinator_control.h"
@@ -4029,6 +4030,73 @@ void MetaServiceImpl::ImportIdEpochType(google::protobuf::RpcController *control
40294030
}
40304031
}
40314032

4033+
void DoCreateOrUpdateAutoIncrements(google::protobuf::RpcController *controller,
4034+
const pb::meta::CreateOrUpdateAutoIncrementsRequest *request,
4035+
pb::meta::CreateOrUpdateAutoIncrementsResponse *response, TrackClosure *done,
4036+
std::shared_ptr<AutoIncrementControl> auto_increment_control,
4037+
std::shared_ptr<Engine> raft_engine) {
4038+
brpc::ClosureGuard done_guard(done);
4039+
4040+
DINGO_LOG(INFO) << request->ShortDebugString();
4041+
4042+
pb::coordinator_internal::MetaIncrement meta_increment;
4043+
4044+
for (const auto &table_increment : request->table_increment_group().table_increments()) {
4045+
if (!auto_increment_control->IsLeader()) {
4046+
return auto_increment_control->RedirectResponse(response);
4047+
}
4048+
auto table_id = table_increment.table_id();
4049+
auto start_id = table_increment.start_id();
4050+
auto ret = auto_increment_control->CreateOrUpdateAutoIncrement(table_id, start_id, meta_increment);
4051+
if (!ret.ok()) {
4052+
DINGO_LOG(ERROR) << "failed, " << table_id << "|" << start_id << " , error_msg : " << ret.error_str();
4053+
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret.error_code()));
4054+
response->mutable_error()->set_errmsg(ret.error_str());
4055+
return;
4056+
}
4057+
}
4058+
std::shared_ptr<Context> ctx =
4059+
std::make_shared<Context>(static_cast<brpc::Controller *>(controller), nullptr, response);
4060+
ctx->SetRegionId(Constant::kAutoIncrementRegionId);
4061+
ctx->SetTracker(done->Tracker());
4062+
4063+
// this is a async operation will be block by closure
4064+
auto ret2 = raft_engine->Write(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment));
4065+
if (!ret2.ok()) {
4066+
DINGO_LOG(ERROR) << "failed, " << " | " << ret2.error_str();
4067+
ServiceHelper::SetError(response->mutable_error(), ret2.error_code(), ret2.error_str());
4068+
4069+
if (ret2.error_code() == pb::error::Errno::ERAFT_NOTLEADER) {
4070+
auto_increment_control->RedirectResponse(response);
4071+
}
4072+
return;
4073+
}
4074+
4075+
DINGO_LOG(INFO) << "CreateOrUpdateAutoIncrements Success. ";
4076+
}
4077+
4078+
void MetaServiceImpl::CreateOrUpdateAutoIncrements(google::protobuf::RpcController *controller,
4079+
const pb::meta::CreateOrUpdateAutoIncrementsRequest *request,
4080+
pb::meta::CreateOrUpdateAutoIncrementsResponse *response,
4081+
google::protobuf::Closure *done) {
4082+
brpc::ClosureGuard done_guard(done);
4083+
if (!auto_increment_control_->IsLeader()) {
4084+
return RedirectResponse(response);
4085+
}
4086+
DINGO_LOG(INFO) << request->ShortDebugString();
4087+
4088+
// Run in queue.
4089+
auto *svr_done = new CoordinatorServiceClosure(__func__, done_guard.release(), request, response);
4090+
auto task = std::make_shared<ServiceTask>([this, controller, request, response, svr_done]() {
4091+
DoCreateOrUpdateAutoIncrements(controller, request, response, svr_done, auto_increment_control_, engine_);
4092+
});
4093+
bool ret = worker_set_->ExecuteRR(task);
4094+
if (!ret) {
4095+
brpc::ClosureGuard done_guard(svr_done);
4096+
ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL, "Commit execute queue failed");
4097+
}
4098+
}
4099+
40324100
void DoCreateTenant(google::protobuf::RpcController * /*controller*/, const pb::meta::CreateTenantRequest *request,
40334101
pb::meta::CreateTenantResponse *response, TrackClosure *done,
40344102
std::shared_ptr<CoordinatorControl> coordinator_control, std::shared_ptr<Engine> /*raft_engine*/) {

src/server/meta_service.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ class MetaServiceImpl : public pb::meta::MetaService {
229229
void ImportIdEpochType(google::protobuf::RpcController* controller, const pb::meta::ImportIdEpochTypeRequest* request,
230230
pb::meta::ImportIdEpochTypeResponse* response, google::protobuf::Closure* done) override;
231231

232+
void CreateOrUpdateAutoIncrements(google::protobuf::RpcController* controller,
233+
const pb::meta::CreateOrUpdateAutoIncrementsRequest* request,
234+
pb::meta::CreateOrUpdateAutoIncrementsResponse* response,
235+
google::protobuf::Closure* done) override;
236+
232237
void SetWorkSet(WorkerSetPtr worker_set) { worker_set_ = worker_set; }
233238

234239
// table and index definition convertor

0 commit comments

Comments
 (0)