Skip to content

Commit a91794c

Browse files
yuhaijun999ketor
authored andcommitted
[fix][index] Fix DINGODB-2346 issue. Fixed the crash problem when
calling try load atomic variables in index process. The reason is that asynchronous thread is used to access the destructor object members. Change to shared_ptr mode for passing. Other similar problems have also been fixed.
1 parent 0d4c576 commit a91794c

File tree

4 files changed

+24
-11
lines changed

4 files changed

+24
-11
lines changed

src/diskann/diskann_item.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ DiskANNItem::~DiskANNItem() {
8989
#endif
9090
}
9191

92+
std::shared_ptr<DiskANNItem> DiskANNItem::GetSelf() { return shared_from_this(); }
93+
9294
butil::Status DiskANNItem::Import(std::shared_ptr<Context> ctx, const std::vector<pb::common::Vector>& vectors,
9395
const std::vector<int64_t>& vector_ids, bool has_more,
9496
bool /*force_to_load_data_if_exist*/, int64_t already_send_vector_count, int64_t ts,
@@ -876,8 +878,9 @@ butil::Status DiskANNItem::DoSyncBuild(std::shared_ptr<Context> ctx, bool force_
876878
}
877879

878880
butil::Status DiskANNItem::DoAsyncBuild(std::shared_ptr<Context> ctx, bool force_to_build, DiskANNCoreState old_state) {
879-
auto lambda_call = [this, force_to_build, old_state, ctx]() {
880-
this->DoBuildInternal(ctx, force_to_build, old_state);
881+
std::shared_ptr<DiskANNItem> self = GetSelf();
882+
auto lambda_call = [self, force_to_build, old_state, ctx]() {
883+
self->DoBuildInternal(ctx, force_to_build, old_state);
881884
};
882885

883886
#if defined(ENABLE_DISKANN_ITEM_PTHREAD)
@@ -966,7 +969,8 @@ butil::Status DiskANNItem::DoSyncLoad(std::shared_ptr<Context> ctx, const pb::co
966969

967970
butil::Status DiskANNItem::DoAsyncLoad(std::shared_ptr<Context> ctx, const pb::common::LoadDiskAnnParam& load_param,
968971
DiskANNCoreState old_state) {
969-
auto lambda_call = [this, &load_param, old_state, ctx]() { this->DoLoadInternal(ctx, load_param, old_state); };
972+
std::shared_ptr<DiskANNItem> self = GetSelf();
973+
auto lambda_call = [self, &load_param, old_state, ctx]() { self->DoLoadInternal(ctx, load_param, old_state); };
970974

971975
#if defined(ENABLE_DISKANN_ITEM_PTHREAD)
972976
std::thread th(lambda_call);
@@ -1032,7 +1036,8 @@ butil::Status DiskANNItem::DoSyncTryLoad(std::shared_ptr<Context> ctx, const pb:
10321036

10331037
butil::Status DiskANNItem::DoAsyncTryLoad(std::shared_ptr<Context> ctx, const pb::common::LoadDiskAnnParam& load_param,
10341038
DiskANNCoreState old_state) {
1035-
auto lambda_call = [this, &load_param, old_state, ctx]() { this->DoTryLoadInternal(ctx, load_param, old_state); };
1039+
std::shared_ptr<DiskANNItem> self = GetSelf();
1040+
auto lambda_call = [self, &load_param, old_state, ctx]() { self->DoTryLoadInternal(ctx, load_param, old_state); };
10361041

10371042
#if defined(ENABLE_DISKANN_ITEM_PTHREAD)
10381043
std::thread th(lambda_call);

src/diskann/diskann_item.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace dingodb {
4040

4141
// #undef ENABLE_DISKANN_ID_MAPPING
4242

43-
class DiskANNItem {
43+
class DiskANNItem : public std::enable_shared_from_this<DiskANNItem> {
4444
public:
4545
explicit DiskANNItem(std::shared_ptr<Context> ctx, int64_t vector_index_id,
4646
const pb::common::VectorIndexParameter& vector_index_parameter, u_int32_t num_threads,
@@ -53,6 +53,8 @@ class DiskANNItem {
5353
DiskANNItem(DiskANNItem&& rhs) = delete;
5454
DiskANNItem& operator=(DiskANNItem&& rhs) = delete;
5555

56+
std::shared_ptr<DiskANNItem> GetSelf();
57+
5658
butil::Status Import(std::shared_ptr<Context> ctx, const std::vector<pb::common::Vector>& vectors,
5759
const std::vector<int64_t>& vector_ids, bool has_more, bool force_to_load_data_if_exist,
5860
int64_t already_send_vector_count, int64_t ts, int64_t tso,

src/vector/vector_index_diskann.cc

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,10 @@ butil::Status VectorIndexDiskANN::Build(const pb::common::Range& region_range, m
360360

361361
if (!FLAGS_diskann_build_sync_internal) {
362362
internal_state = pb::common::DiskANNCoreState::BUILDING;
363-
auto task = std::make_shared<ServiceTask>([this, region_range, reader, parameter, ts]() {
363+
std::shared_ptr<VectorIndexDiskANN> self = GetSelf();
364+
auto task = std::make_shared<ServiceTask>([self, region_range, reader, parameter, ts]() {
364365
pb::common::DiskANNCoreState state;
365-
auto status = DoBuild(region_range, reader, parameter, ts, state);
366+
auto status = self->DoBuild(region_range, reader, parameter, ts, state);
366367
(void)status;
367368
});
368369

@@ -502,14 +503,15 @@ butil::Status VectorIndexDiskANN::Load(const pb::common::VectorLoadParameter& pa
502503

503504
if (!FLAGS_diskann_load_sync_internal) {
504505
internal_state = pb::common::DiskANNCoreState::LOADING;
505-
auto task = std::make_shared<ServiceTask>([this, internal_parameter]() {
506+
std::shared_ptr<VectorIndexDiskANN> self = GetSelf();
507+
auto task = std::make_shared<ServiceTask>([self, internal_parameter]() {
506508
pb::common::DiskANNCoreState state;
507509
butil::Status status;
508510
// load index rpc
509511
if (internal_parameter.diskann().direct_load_without_build()) {
510-
status = SendVectorTryLoadRequestWrapper(internal_parameter, state);
512+
status = self->SendVectorTryLoadRequestWrapper(internal_parameter, state);
511513
} else {
512-
status = SendVectorLoadRequestWrapper(internal_parameter, state);
514+
status = self->SendVectorLoadRequestWrapper(internal_parameter, state);
513515
}
514516
if (!status.ok()) {
515517
LOG(ERROR) << "[" << __PRETTY_FUNCTION__ << "] " << status.error_cstr();
@@ -688,6 +690,8 @@ butil::Status VectorIndexDiskANN::Dump(bool dump_all, std::vector<std::string>&
688690
}
689691
}
690692

693+
std::shared_ptr<VectorIndexDiskANN> VectorIndexDiskANN::GetSelf() { return shared_from_this(); }
694+
691695
butil::Status VectorIndexDiskANN::Save(const std::string& /*path*/) {
692696
return butil::Status(pb::error::Errno::EVECTOR_NOT_SUPPORT, "not support in DiskANN!!!");
693697
}

src/vector/vector_index_diskann.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace dingodb {
3636

3737
#undef TEST_VECTOR_INDEX_DISKANN_MOCK
3838

39-
class VectorIndexDiskANN : public VectorIndex {
39+
class VectorIndexDiskANN : public VectorIndex, public std::enable_shared_from_this<VectorIndexDiskANN> {
4040
public:
4141
explicit VectorIndexDiskANN(int64_t id, const pb::common::VectorIndexParameter& vector_index_parameter,
4242
const pb::common::RegionEpoch& epoch, const pb::common::Range& range,
@@ -51,6 +51,8 @@ class VectorIndexDiskANN : public VectorIndex {
5151

5252
static void Init();
5353

54+
std::shared_ptr<VectorIndexDiskANN> GetSelf();
55+
5456
butil::Status Save(const std::string& path) override;
5557
butil::Status Load(const std::string& path) override;
5658

0 commit comments

Comments
 (0)