Skip to content

Commit 7d8f8b1

Browse files
rfsalievalonre24meiravgri
authored
[MOD-12668] Implement asynchronous SVS GC execution using SVSMultiThreadJob (#840)
* Implement asynchronous SVS GC execution using SVSMultiThreadJob * Add runGCParallel test * [MOD-12668] Add micro benchmark for SVS GC with threads (#844) * add micro benchmark for GC with threads * fix assert Co-authored-by: meiravgri <[email protected]> --------- Co-authored-by: meiravgri <[email protected]> * Fix micro benchmark for SVS GC with threads --------- Co-authored-by: alon <[email protected]> Co-authored-by: alonre24 <[email protected]> Co-authored-by: meiravgri <[email protected]>
1 parent bac48fb commit 7d8f8b1

File tree

5 files changed

+132
-11
lines changed

5 files changed

+132
-11
lines changed

src/VecSim/algorithms/svs/svs_tiered.h

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
225225
// Used to prevent scheduling multiple index update jobs at the same time.
226226
// Since the update job does a batch update, the job queue should hold just 1 such job at any moment.
227227
std::atomic_flag indexUpdateScheduled = ATOMIC_FLAG_INIT;
228+
// Used to prevent scheduling multiple index GC jobs at the same time.
229+
std::atomic_flag indexGCScheduled = ATOMIC_FLAG_INIT;
228230
// Used to prevent running multiple index update jobs in parallel.
229231
// Even if update jobs scheduled sequentially, they can be started in parallel.
230232
mutable std::mutex updateJobMutex;
@@ -549,6 +551,39 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
549551
index->updateSVSIndex(availableThreads);
550552
}
551553

554+
/**
555+
* @brief Run SVS index GC in a thread-safe manner.
556+
*
557+
* This static wrapper function performs the following actions:
558+
* - Acquires a lock on the index's mainIndexGuard to ensure thread safety during the GC
559+
* - Configures the number of threads for the underlying SVS index GC operation.
560+
* - Calls runGC() on the backend SVS index to perform the actual garbage collection.
561+
* - Clears the indexGCScheduled flag to allow future scheduling.
562+
*
563+
* @param idx Pointer to the VecSimIndex (a TieredSVSIndex) whose SVS backend is garbage-collected.
564+
* @param availableThreads The number of threads available for the GC operation. Current
565+
* thread is used as well, so the minimal value is 1.
566+
* @note no need to implement extra non-static method, as GC logic is simple enough to be done
567+
* here.
568+
*/
569+
static void SVSIndexGCWrapper(VecSimIndex *idx, size_t availableThreads) {
570+
assert(availableThreads > 0);
571+
auto index = static_cast<TieredSVSIndex<DataType> *>(idx);
572+
assert(index);
573+
574+
std::lock_guard lock{index->mainIndexGuard};
575+
// Release the scheduled flag to allow scheduling again
576+
index->indexGCScheduled.clear();
577+
578+
// Do SVS index GC
579+
index->backendIndex->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
580+
"running asynchronous GC for tiered SVS index");
581+
auto svs_index = index->GetSVSIndex();
582+
svs_index->setNumThreads(std::min(availableThreads, index->backendIndex->indexSize()));
583+
// VecSimIndexAbstract::runGC() is protected
584+
static_cast<VecSimIndexInterface *>(index->backendIndex)->runGC();
585+
}
586+
552587
#ifdef BUILD_TESTS
553588
public:
554589
#endif
@@ -565,6 +600,19 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
565600
this->submitJobs(jobs);
566601
}
567602

603+
void scheduleSVSIndexGC() {
604+
// do not schedule if scheduled already
605+
if (indexGCScheduled.test_and_set()) {
606+
return;
607+
}
608+
609+
auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
610+
auto jobs = SVSMultiThreadJob::createJobs(
611+
this->allocator, SVS_GC_JOB, SVSIndexGCWrapper, this, total_threads,
612+
std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs);
613+
this->submitJobs(jobs);
614+
}
615+
568616
private:
569617
static void applySwapsToLabelsArray(std::vector<size_t> &labels,
570618
const std::vector<swap_record> &swaps) {
@@ -962,10 +1010,8 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
9621010

9631011
void runGC() override {
9641012
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
965-
"running asynchronous GC for tiered SVS index");
966-
std::lock_guard lock{this->mainIndexGuard};
967-
// VecSimIndexAbstract::runGC() is protected
968-
static_cast<VecSimIndexInterface *>(this->backendIndex)->runGC();
1013+
"scheduling asynchronous GC for tiered SVS index");
1014+
scheduleSVSIndexGC();
9691015
}
9701016

9711017
void acquireSharedLocks() override {

src/VecSim/vec_sim_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ typedef enum {
243243
HNSW_SEARCH_JOB,
244244
HNSW_SWAP_JOB,
245245
SVS_BATCH_UPDATE_JOB,
246+
SVS_GC_JOB,
246247
INVALID_JOB // to indicate that finding a JobType >= INVALID_JOB is an error
247248
} JobType;
248249

tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimSVS, BM_FUNC_NAME(BM_RunGC), DATA_TYPE_INDE
1919
BENCHMARK_REGISTER_F(BM_VecSimSVS, BM_FUNC_NAME(BM_RunGC))
2020
->Unit(benchmark::kMillisecond)
2121
->Iterations(1)
22-
->Arg(50)
23-
->Arg(100)
24-
->Arg(250)
25-
->Arg(500)
26-
->ArgName("num_deletions");
22+
->ArgsProduct({{50, 100, 500}, {1, 4}})
23+
->ArgNames({"num_deletions", "thread_count"});
2724

2825
// AddLabel one by one
2926
BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimSVS, BM_FUNC_NAME(BM_AddLabelOneByOne), DATA_TYPE_INDEX_T)

tests/benchmark/bm_vecsim_svs.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,13 @@ template <typename index_type_t>
407407
void BM_VecSimSVS<index_type_t>::RunGC(benchmark::State &st) {
408408

409409
size_t num_deletions = st.range(0);
410-
auto mock_thread_pool = tieredIndexMock(1);
411-
ASSERT_EQ(mock_thread_pool.thread_pool_size, 1);
410+
auto mock_thread_pool = tieredIndexMock(st.range(1));
411+
ASSERT_EQ(mock_thread_pool.thread_pool_size, st.range(1));
412412
auto *tiered_index = CreateTieredSVSIndexFromFile(mock_thread_pool, 1);
413413

414+
// start threadpool
415+
mock_thread_pool.init_threads();
416+
414417
// Delete vectors, not yet triggering consolidation.
415418
for (size_t i = 0; i < num_deletions; ++i) {
416419
int ret = VecSimIndex_DeleteVector(tiered_index, i);
@@ -421,6 +424,7 @@ void BM_VecSimSVS<index_type_t>::RunGC(benchmark::State &st) {
421424
ASSERT_EQ(info.svsInfo.numberOfMarkedDeletedNodes, num_deletions);
422425
for (auto _ : st) {
423426
VecSimTieredIndex_GC(tiered_index);
427+
mock_thread_pool.thread_pool_wait();
424428
};
425429
// num deleted should be 0
426430
info = VecSimIndex_DebugInfo(tiered_index);

tests/unit/test_svs_tiered.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3126,8 +3126,19 @@ TYPED_TEST(SVSTieredIndexTestBasic, runGCAPI) {
31263126
ASSERT_EQ(tiered_index->GetSVSIndex()->indexStorageSize(), n);
31273127
auto size_before_gc = tiered_index->getAllocationSize();
31283128

3129+
auto jobs_before_gc = mock_thread_pool.jobQ.size();
31293130
// Run the GC API call, expect that we will clean up the SVS index.
31303131
VecSimTieredIndex_GC(tiered_index);
3132+
// Expected that GC jobs were added to the queue.
3133+
ASSERT_EQ(mock_thread_pool.jobQ.size(), jobs_before_gc + mock_thread_pool.thread_pool_size);
3134+
// Run GC a second time while the first GC jobs are still pending.
3135+
VecSimTieredIndex_GC(tiered_index);
3136+
// Expected that no new GC jobs were added to the queue.
3137+
ASSERT_EQ(mock_thread_pool.jobQ.size(), jobs_before_gc + mock_thread_pool.thread_pool_size);
3138+
// Wait for any pending jobs to complete, since SVS GC is done via a job.
3139+
mock_thread_pool.init_threads();
3140+
mock_thread_pool.thread_pool_join();
3141+
// Validate sizes after GC.
31313142
ASSERT_EQ(tiered_index->indexSize(), n - threshold);
31323143
ASSERT_EQ(tiered_index->GetBackendIndex()->indexSize(), n - threshold);
31333144
ASSERT_EQ(tiered_index->GetSVSIndex()->indexStorageSize(), n - threshold);
@@ -3138,6 +3149,68 @@ TYPED_TEST(SVSTieredIndexTestBasic, runGCAPI) {
31383149
EXPECT_EQ(tiered_index->statisticInfo().numberOfMarkedDeleted, 0);
31393150
}
31403151

3152+
TYPED_TEST(SVSTieredIndexTestBasic, runGCParallel) {
3153+
// Create TieredSVS index instance with a mock queue.
3154+
size_t dim = 4;
3155+
size_t threshold = 1024;
3156+
const size_t n = threshold * 4;
3157+
SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2};
3158+
VecSimParams svs_params = CreateParams(params);
3159+
auto mock_thread_pool = tieredIndexMock();
3160+
3161+
// Force-trigger the first update job for the first 64 vectors.
3162+
auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 64);
3163+
ASSERT_INDEX(tiered_index);
3164+
auto allocator = tiered_index->getAllocator();
3165+
3166+
// Insert n vectors directly to SVS.
3167+
std::srand(10); // create pseudo random generator with any arbitrary seed.
3168+
for (size_t i = 0; i < n; i++) {
3169+
TEST_DATA_T vector[dim];
3170+
for (size_t j = 0; j < dim; j++) {
3171+
vector[j] = std::rand() / (TEST_DATA_T)RAND_MAX;
3172+
}
3173+
VecSimIndex_AddVector(tiered_index->GetBackendIndex(), vector, i);
3174+
}
3175+
3176+
ASSERT_EQ(tiered_index->indexSize(), n);
3177+
ASSERT_EQ(tiered_index->GetBackendIndex()->indexSize(), n);
3178+
3179+
// Initialize the thread pool to start processing jobs.
3180+
mock_thread_pool.init_threads();
3181+
3182+
// Run the mess of add, delete, GC
3183+
for (size_t i = 0; i < threshold; i++) {
3184+
// Run GC once every 64 iterations.
3185+
if (i % 64 == 0) {
3186+
VecSimTieredIndex_GC(tiered_index);
3187+
}
3188+
// Add a new vector
3189+
TEST_DATA_T vector[dim];
3190+
for (size_t j = 0; j < dim; j++) {
3191+
vector[j] = std::rand() / (TEST_DATA_T)RAND_MAX;
3192+
}
3193+
VecSimIndex_AddVector(tiered_index, vector, n + i);
3194+
// Delete an existing vector
3195+
tiered_index->deleteVector(i + threshold);
3196+
}
3197+
// Final GC after all operations.
3198+
VecSimTieredIndex_GC(tiered_index);
3199+
// Wait for any pending jobs to complete, since SVS GC is done via a job.
3200+
mock_thread_pool.thread_pool_join();
3201+
3202+
// Validate sizes after GC.
3203+
auto tiered_size = tiered_index->indexSize();
3204+
auto flat_size = tiered_index->GetFlatIndex()->indexSize();
3205+
auto backend_size = tiered_index->GetBackendIndex()->indexSize();
3206+
ASSERT_EQ(tiered_size, n);
3207+
ASSERT_EQ(tiered_size, backend_size + flat_size);
3208+
// Expect that GC cleaned all the deleted vectors.
3209+
ASSERT_EQ(tiered_index->GetSVSIndex()->indexStorageSize(), backend_size);
3210+
ASSERT_EQ(tiered_index->GetSVSIndex()->getNumMarkedDeleted(), 0);
3211+
EXPECT_EQ(tiered_index->statisticInfo().numberOfMarkedDeleted, 0);
3212+
}
3213+
31413214
TYPED_TEST(SVSTieredIndexTestBasic, switchDeleteModes) {
31423215
// Create TieredSVS index instance with a mock queue.
31433216
size_t dim = 16;

0 commit comments

Comments
 (0)