Skip to content

Commit 5ad562e

Browse files
yuhaijun999ketor
authored and committed
[fix][index] Fix the bug where diskann build PQ data exceeds 2^31
squares, and optimize the error reporting time. It is only found on the index side.
1 parent a91794c commit 5ad562e

File tree

10 files changed

+205
-2
lines changed

10 files changed

+205
-2
lines changed

dingo-store-proto

src/common/constant.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ class Constant {
243243
// do not change this parameter or it will crash.
244244
inline static const int64_t kDiskannMinCount = 2;
245245
// max count of vectors for diskann
246-
inline static const int64_t kDiskannMaxCount = std::numeric_limits<uint32_t>::max();
246+
inline static const int64_t kDiskannMaxCount = std::numeric_limits<int32_t>::max();
247247
inline static const std::string kDiskannStore = "store";
248248
inline static const std::string kDiskannPathConfigName = "path";
249249
inline static const std::string kDiskannNumThreadsConfigName = "num_threads";

src/diskann/diskann_item.cc

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,56 @@ butil::Status DiskANNItem::SetNoData(std::shared_ptr<Context> ctx) {
694694
return butil::Status::OK();
695695
}
696696

697+
// Records on this item that the import was rejected because the vector count
// exceeds Constant::kDiskannMaxCount.
//
// Under the write lock:
//  - if the item is already kImporting with EDISKANN_IMPORT_COUNT_TOO_MANY
//    recorded, the call is a no-op and returns OK;
//  - if some other error is already recorded in last_error_, that error is
//    returned unchanged;
//  - if the state is anything other than kUnknown, returns
//    EDISKANN_IMPORT_STATE_WRONG;
//  - otherwise sets state_ to kImporting, stores
//    EDISKANN_IMPORT_COUNT_TOO_MANY in last_error_, and returns OK.
//
// In every case ctx receives last_error_ and state_ through the scope-exit
// lambda registered below.
butil::Status DiskANNItem::SetImportTooMany(std::shared_ptr<Context> ctx) {
698+
// NOTE(review): old_state is assigned below but never read in this function.
DiskANNCoreState old_state;
699+
butil::Status status;
700+
// NOTE(review): this reuses the "nodata" latency recorder; confirm whether a
// dedicated recorder for this call was intended.
BvarLatencyGuard bvar_guard(&g_diskann_server_nodata_latency);
701+
RWLockWriteGuard guard(&rw_lock_);
702+
703+
SetSide(ctx);
704+
bool is_error_occurred = false;
705+
old_state = state_.load();
706+
707+
// Captures is_error_occurred and status by reference, so it observes the
// values they hold when the lambda finally runs, not the values at this point.
auto lambda_set_state_function = [this, &is_error_occurred, &status, ctx]() {
708+
// Only overwrite the stored error when a branch below flagged one.
if (is_error_occurred) {
709+
last_error_ = status;
710+
error_local_side_ = local_side_;
711+
error_remote_side_ = remote_side_;
712+
}
713+
714+
// Publish the item's stored error and state to the caller's context.
ctx->SetStatus(last_error_);
715+
ctx->SetDiskANNCoreStateX(state_);
716+
};
717+
718+
// Presumably runs the lambda when this scope is left (macro defined elsewhere).
ON_SCOPE_EXIT(lambda_set_state_function);
719+
// Already marked as too-many while importing: a repeated call is ignored.
if (state_.load() == DiskANNCoreState::kImporting &&
720+
last_error_.error_code() == pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY) {
721+
std::string s = fmt::format("diskann is set import too many.(kImporting). ignore.");
722+
DINGO_LOG(INFO) << s;
723+
status = butil::Status(pb::error::Errno::OK, s);
724+
return status;
725+
}
726+
727+
// A different error is already recorded: keep it and hand it back.
if (!last_error_.ok()) {
728+
is_error_occurred = true;
729+
DINGO_LOG(ERROR) << "already error occurred, ignore set import too many. return." << last_error_.error_cstr();
730+
status = last_error_;
731+
return status;
732+
}
733+
734+
// Marking is only accepted from kUnknown.
// NOTE(review): is_error_occurred is not set in this branch, so the returned
// status is not stored in last_error_ by the scope-exit lambda.
if (state_.load() != DiskANNCoreState::kUnknown) {
735+
std::string s = fmt::format("diskann item state wrong. {}", FormatParameter());
736+
DINGO_LOG(ERROR) << s;
737+
status = butil::Status(pb::error::Errno::EDISKANN_IMPORT_STATE_WRONG, s);
738+
return status;
739+
}
740+
741+
// Success path: the too-many condition is stored as last_error_ (and thus
// reported through ctx), while the call itself returns OK.
state_ = DiskANNCoreState::kImporting;
742+
std::string s = fmt::format("diskann set import too many. > {}", Constant::kDiskannMaxCount);
743+
last_error_ = butil::Status(pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY, s);
744+
return butil::Status::OK();
745+
}
746+
697747
bool DiskANNItem::IsBuildedFilesExist(int64_t vector_index_id, pb::common::MetricType metric_type) {
698748
std::string data_bin_path = fmt::format("{}/{}/{}/{}", base_dir, normal_name, vector_index_id, input_name);
699749
#if defined(ENABLE_DISKANN_ID_MAPPING)

src/diskann/diskann_item.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class DiskANNItem : public std::enable_shared_from_this<DiskANNItem> {
7171
std::string Dump(std::shared_ptr<Context> ctx);
7272
butil::Status Count(std::shared_ptr<Context> ctx, int64_t& count); // NOLINT
7373
butil::Status SetNoData(std::shared_ptr<Context> ctx);
74+
// Marks the item as rejected for exceeding the maximum vector count; the
// resulting error and state are written to ctx.
butil::Status SetImportTooMany(std::shared_ptr<Context> ctx);
75+
7476

7577
static void SetImportTimeout(int64_t timeout_s) { DiskANNItem::import_timeout_s = timeout_s; }
7678
static void SetBaseDir(const std::string& base) { DiskANNItem::base_dir = base; }

src/diskann/diskann_service_handle.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,30 @@ butil::Status DiskAnnServiceHandle::VectorSetNoData(std::shared_ptr<Context> ctx
331331
return status;
332332
}
333333

334+
// Looks up the DiskANN item for vector_index_id and marks it as "import too
// many", then copies the status and core state stored in ctx into the
// VectorSetImportTooManyResponse attached to ctx.
//
// Returns EINDEX_NOT_FOUND when no item exists for the id (the response is
// not touched in that case); otherwise returns whatever
// DiskANNItem::SetImportTooMany returned.
butil::Status DiskAnnServiceHandle::VectorSetImportTooMany(std::shared_ptr<Context> ctx, int64_t vector_index_id) {
335+
butil::Status status;
336+
auto item = item_manager.Find(vector_index_id);
337+
if (item == nullptr) {
338+
std::string s = fmt::format("vector_index_id : {} not exists", vector_index_id);
339+
DINGO_LOG(ERROR) << s;
340+
status = butil::Status(pb::error::EINDEX_NOT_FOUND, s);
341+
return status;
342+
}
343+
344+
// A failure is logged but does not stop the response from being filled in.
status = item->SetImportTooMany(ctx);
345+
if (!status.ok()) {
346+
DINGO_LOG(ERROR) << status.error_cstr();
347+
}
348+
349+
// Reference dynamic_cast: throws std::bad_cast if ctx carries a response of
// a different message type.
pb::diskann::VectorSetImportTooManyResponse& response =
350+
(dynamic_cast<pb::diskann::VectorSetImportTooManyResponse&>(*ctx->Response()));
351+
352+
// last_error/state come from ctx (filled in by the item), not from the local
// status variable.
ServiceHelper::SetError(response.mutable_last_error(), ctx->Status().error_code(), ctx->Status().error_str());
353+
response.set_state(DiskANNUtils::DiskANNCoreStateToPb(ctx->DiskANNCoreStateX()));
354+
355+
return status;
356+
}
357+
334358
butil::Status DiskAnnServiceHandle::VectorDump(std::shared_ptr<Context> ctx, int64_t vector_index_id) {
335359
butil::Status status;
336360
auto item = item_manager.Find(vector_index_id);

src/diskann/diskann_service_handle.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class DiskAnnServiceHandle {
5959

6060
static butil::Status VectorSetNoData(std::shared_ptr<Context> ctx, int64_t vector_index_id);
6161

62+
// Marks the item identified by vector_index_id as "import too many" and fills
// the response held by ctx with the item's last error and state.
static butil::Status VectorSetImportTooMany(std::shared_ptr<Context> ctx, int64_t vector_index_id);
63+
6264
static butil::Status VectorDump(std::shared_ptr<Context> ctx, int64_t vector_index_id);
6365

6466
static butil::Status VectorDumpAll(std::shared_ptr<Context> ctx);

src/server/diskann_service.cc

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,60 @@ void DiskAnnServiceImpl::VectorSetNoData(google::protobuf::RpcController* contro
692692
}
693693
}
694694

695+
// Checks the request before it is handled: the vector index id must be
// strictly positive. Returns EILLEGAL_PARAMTETERS otherwise, OK on success.
static butil::Status ValidateVectorSetImportTooMany(
696+
const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request) {
697+
if (request->vector_index_id() <= 0) {
698+
std::string s = fmt::format("Invalid vector index id : {}", request->vector_index_id());
699+
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s);
700+
}
701+
702+
return butil::Status::OK();
703+
}
704+
705+
// Worker-side body of the VectorSetImportTooMany RPC: validates the request,
// builds a Context around it, and delegates to
// DiskAnnServiceHandle::VectorSetImportTooMany. Any non-OK status is written
// to response->error(). `done` is held by a ClosureGuard for the whole call.
static void DoVectorSetImportTooMany(std::shared_ptr<DiskAnnServiceHandle> handle,
706+
google::protobuf::RpcController* controller,
707+
const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request,
708+
::dingodb::pb::diskann::VectorSetImportTooManyResponse* response,
709+
TrackClosure* done) {
710+
brpc::Controller* cntl = (brpc::Controller*)controller;
711+
// Presumably runs `done` when this function returns (brpc ClosureGuard).
brpc::ClosureGuard done_guard(done);
712+
auto tracker = done->Tracker();
713+
tracker->SetServiceQueueWaitTime();
714+
715+
butil::Status status = ValidateVectorSetImportTooMany(request);
716+
if (!status.ok()) {
717+
ServiceHelper::SetError(response->mutable_error(), status.error_code(), status.error_str());
718+
return;
719+
}
720+
721+
// The ternary condition is the literal `true`, so nullptr is always passed
// and done_guard.release() is never evaluated: the guard keeps the closure.
auto ctx = std::make_shared<Context>(cntl, true ? nullptr : done_guard.release(), request, response);
722+
ctx->SetTracker(tracker);
723+
724+
status = handle->VectorSetImportTooMany(ctx, request->vector_index_id());
725+
if (!status.ok()) {
726+
ServiceHelper::SetError(response->mutable_error(), status.error_code(), status.error_str());
727+
}
728+
}
729+
730+
// RPC entry point. Wraps `done` in a NoContextServiceClosure and queues the
// actual work (DoVectorSetImportTooMany) on the misc worker set. If the task
// cannot be queued, answers immediately with EREQUEST_FULL.
void DiskAnnServiceImpl::VectorSetImportTooMany(google::protobuf::RpcController* controller,
731+
const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request,
732+
::dingodb::pb::diskann::VectorSetImportTooManyResponse* response,
733+
::google::protobuf::Closure* done) {
734+
// Handed to a ClosureGuard either in DoVectorSetImportTooMany or in the
// queue-full branch below.
auto* svr_done = new NoContextServiceClosure(__func__, done, request, response);
735+
736+
// Run in queue.
737+
// The task captures the raw controller/request/response pointers by value;
// presumably they stay valid until svr_done runs -- confirm against the RPC
// framework's lifetime rules.
auto task = std::make_shared<ServiceTask>([this, controller, request, response, svr_done]() {
738+
DoVectorSetImportTooMany(handle_, controller, request, response, svr_done);
739+
});
740+
741+
bool ret = DiskANNItemRuntime::GetMiscWorkerSet()->Execute(task);
742+
if (!ret) {
743+
// Task was not accepted: take over the closure here and report the error.
brpc::ClosureGuard done_guard(svr_done);
744+
ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL,
745+
"WorkerSet queue is full, please wait and retry");
746+
}
747+
}
748+
695749
static butil::Status ValidateVectorDump(const ::dingodb::pb::diskann::VectorDumpRequest* request) {
696750
if (request->vector_index_id() <= 0) {
697751
std::string s = fmt::format("Invalid vector index id : {}", request->vector_index_id());

src/server/diskann_service.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ class DiskAnnServiceImpl : public pb::diskann::DiskAnnService {
7474
::dingodb::pb::diskann::VectorSetNoDataResponse* response,
7575
::google::protobuf::Closure* done) override;
7676

77+
// RPC handler: marks the requested vector index as "import too many".
void VectorSetImportTooMany(google::protobuf::RpcController* controller,
78+
const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request,
79+
::dingodb::pb::diskann::VectorSetImportTooManyResponse* response,
80+
::google::protobuf::Closure* done) override;
81+
7782
void VectorDump(google::protobuf::RpcController* controller, const ::dingodb::pb::diskann::VectorDumpRequest* request,
7883
::dingodb::pb::diskann::VectorDumpResponse* response, ::google::protobuf::Closure* done) override;
7984

src/vector/vector_index_diskann.cc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,20 @@ butil::Status VectorIndexDiskANN::DoBuild(const pb::common::Range& region_range,
772772
return butil::Status::OK();
773773
}
774774

775+
// count too many, set error to diskann.
776+
if (region_count > Constant::kDiskannMaxCount) {
777+
std::string s = fmt::format("region : {} vector current count {} is more than max count {}. set error to diskann",
778+
Id(), region_count, Constant::kDiskannMaxCount);
779+
DINGO_LOG(WARNING) << s;
780+
status = SendVectorSetImportTooManyRequestWrapper();
781+
if (!status.ok()) {
782+
DINGO_LOG(ERROR) << status.error_cstr();
783+
return status;
784+
}
785+
state = pb::common::DiskANNCoreState::IMPORTING;
786+
return butil::Status(pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY, s);
787+
}
788+
775789
auto iter = reader->NewIterator(Constant::kVectorDataCF, ts, options);
776790

777791
if (!iter) {
@@ -1224,6 +1238,55 @@ butil::Status VectorIndexDiskANN::SendVectorSetNoDataRequestWrapper() {
12241238
return butil::Status::OK();
12251239
}
12261240

1241+
// Sends `request` to the diskann server as DiskAnnService.VectorSetImportTooMany
// and stores the reply in `response`.
//
// Returns EINTERNAL if the channel to diskann_server_addr cannot be
// initialised, the SendRequest error code (with an expanded message) if the
// call fails, and OK otherwise. The content of `response` is not inspected here.
butil::Status VectorIndexDiskANN::SendVectorSetImportTooManyRequest(const google::protobuf::Message& request,
1242+
google::protobuf::Message& response) {
1243+
butil::Status status;
1244+
if (!InitChannel(diskann_server_addr)) {
1245+
std::string s = fmt::format("Init channel failed, addr : {}", diskann_server_addr);
1246+
DINGO_LOG(ERROR) << s;
1247+
return butil::Status(pb::error::Errno::EINTERNAL, s);
1248+
}
1249+
1250+
// set-import-too-many rpc
1251+
status = SendRequest("DiskAnnService", "VectorSetImportTooMany", request, response);
1252+
if (!status.ok()) {
1253+
std::string s = fmt::format("VectorSetImportTooMany request failed, errcode: {} errmsg: {}",
1254+
pb::error::Errno_Name(status.error_code()), status.error_cstr());
1255+
DINGO_LOG(ERROR) << s;
1256+
return butil::Status(status.error_code(), s);
1257+
}
1258+
1259+
return butil::Status::OK();
1260+
}
1261+
// Builds a VectorSetImportTooManyRequest for this index (Id()), sends it, and
// checks the reply.
//
// Returns a non-OK status when the send itself fails or when the response's
// error field carries a non-OK errcode; returns OK otherwise.
butil::Status VectorIndexDiskANN::SendVectorSetImportTooManyRequestWrapper() {
1262+
butil::Status status;
1263+
1264+
// set-import-too-many rpc
1265+
pb::diskann::VectorSetImportTooManyRequest vector_set_import_too_many_request;
1266+
pb::diskann::VectorSetImportTooManyResponse vector_set_import_too_many_response;
1267+
1268+
vector_set_import_too_many_request.set_vector_index_id(Id());
1269+
status = SendVectorSetImportTooManyRequest(vector_set_import_too_many_request, vector_set_import_too_many_response);
1270+
if (!status.ok()) {
1271+
std::string s = fmt::format("VectorSetImportTooMany request failed, errcode: {} errmsg: {}",
1272+
pb::error::Errno_Name(status.error_code()), status.error_cstr());
1273+
DINGO_LOG(ERROR) << s;
1274+
return butil::Status(status.error_code(), s);
1275+
}
1276+
1277+
// The call went through; now check the error reported inside the response.
if (vector_set_import_too_many_response.error().errcode() != pb::error::Errno::OK) {
1278+
std::string s =
1279+
fmt::format("VectorSetImportTooMany response error, errcode: {} errmsg: {} state: {}",
1280+
pb::error::Errno_Name(vector_set_import_too_many_response.error().errcode()),
1281+
vector_set_import_too_many_response.error().errmsg(),
1282+
pb::common::DiskANNCoreState_Name(vector_set_import_too_many_response.state())); // state
1283+
DINGO_LOG(ERROR) << s;
1284+
return butil::Status(vector_set_import_too_many_response.error().errcode(), s);
1285+
}
1286+
1287+
return butil::Status::OK();
1288+
}
1289+
12271290
butil::Status VectorIndexDiskANN::SendVectorBuildRequest(const google::protobuf::Message& request,
12281291
google::protobuf::Message& response) {
12291292
butil::Status status;

src/vector/vector_index_diskann.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ class VectorIndexDiskANN : public VectorIndex, public std::enable_shared_from_th
133133
butil::Status SendVectorSetNoDataRequest(const google::protobuf::Message& request,
134134
google::protobuf::Message& response);
135135
butil::Status SendVectorSetNoDataRequestWrapper();
136+
// Sends a VectorSetImportTooMany RPC to the diskann server; `response`
// receives the reply.
butil::Status SendVectorSetImportTooManyRequest(const google::protobuf::Message& request,
137+
google::protobuf::Message& response);
138+
// Builds and sends the VectorSetImportTooMany request for this index and
// turns a transport failure or an error in the response into a non-OK status.
butil::Status SendVectorSetImportTooManyRequestWrapper();
136139
butil::Status SendVectorBuildRequest(const google::protobuf::Message& request, google::protobuf::Message& response);
137140
butil::Status SendVectorBuildRequestWrapper(const pb::common::VectorBuildParameter& parameter,
138141
pb::common::DiskANNCoreState& state);

0 commit comments

Comments
 (0)