Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 140 additions & 47 deletions tests/unit/test_hnsw_parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,64 @@ class HNSWTestParallel : public ::testing::Test {
}
}
}

/**
* Serialize the connections of the index for debugging purposes.
* Example output:
* index connections: {
* Entry Point Label: 3
*
* Node 0:
* Level 0 neighbors:
* 1, 2, 3, 4, 5,
* Node 1:
* Level 0 neighbors:
* 0, 2, 3,
* Node 2:
* Level 0 neighbors:
* 0, 1, 3,
* Node 3:
* Level 0 neighbors:
* 0, 1, 2, 4, 5,
* Level 1 neighbors:
* 4,
* Node 4:
* Level 0 neighbors:
* 0, 3, 5,
* Level 1 neighbors:
* 3,
* Node 5:
* Level 0 neighbors:
* 0, 3, 4,
* }
*/
std::string serializeIndexConnections(VecSimIndex *index) const {
std::string res("index connections: {");
auto *hnsw_index = CastToHNSW(index);

if (index->indexSize() > 0) {
res += "\nEntry Point Label: ";
res += std::to_string(hnsw_index->getEntryPointLabel()) + "\n";
}
for (idType id = 0; id < index->indexSize(); id++) {
labelType label = hnsw_index->getExternalLabel(id);
if (label == SIZE_MAX)
continue; // The ID is not in the index
int **neighbors_output;
VecSimDebug_GetElementNeighborsInHNSWGraph(index, label, &neighbors_output);
res += "\nNode " + std::to_string(label) + ":";
for (size_t l = 0; neighbors_output[l]; l++) {
res += "\n\tLevel " + std::to_string(l) + " neighbors:\n\t\t";
auto &neighbours = neighbors_output[l];
auto neighbours_count = neighbours[0];
for (size_t j = 1; j <= neighbours_count; j++) {
res += std::to_string(neighbours[j]) + ", ";
}
}
VecSimDebug_ReleaseElementNeighborsInHNSWGraph(neighbors_output);
}
return res + "}";
}
};

// DataTypeSet, TEST_DATA_T and TEST_DIST_T are defined in test_utils.h
Expand Down Expand Up @@ -112,9 +170,11 @@ TYPED_TEST(HNSWTestParallel, parallelSearchKnn) {
// label and the query val (query_val, query_val-1, query_val+1, query_val-2,
// query_val+2, ...) The score is the L2 distance between the vectors that correspond
// the ids.
size_t diff_id = (id > query_val) ? (id - query_val) : (query_val - id);
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = query_val + (sign * int((res_index + 1) / 2));
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(index, query, k, verify_res);
successful_searches++;
Expand All @@ -128,6 +188,7 @@ TYPED_TEST(HNSWTestParallel, parallelSearchKnn) {
for (size_t i = 0; i < n_threads; i++) {
thread_objs[i].join();
}
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(index);
ASSERT_EQ(successful_searches, n_threads);

// Validate that every thread executed a single job.
Expand Down Expand Up @@ -173,10 +234,12 @@ TYPED_TEST(HNSWTestParallel, parallelSearchKNNMulti) {
TEST_DATA_T query_val = 50 + myID;
TEST_DATA_T query[dim];
GenerateVector<TEST_DATA_T>(query, dim, query_val);
auto verify_res = [&](size_t id, double score, size_t res_index) {
size_t diff_id = (id > query_val) ? (id - query_val) : (query_val - id);
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * ((res_index + 1) / 2) * ((res_index + 1) / 2)));
auto verify_res = [=](size_t id, double score, size_t res_index) {
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = query_val + (sign * int((res_index + 1) / 2));
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(index, query, k, verify_res);
successful_searches++;
Expand All @@ -189,6 +252,7 @@ TYPED_TEST(HNSWTestParallel, parallelSearchKNNMulti) {
for (size_t i = 0; i < n_threads; i++) {
thread_objs[i].join();
}
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(index);
ASSERT_EQ(successful_searches, n_threads);
// Validate that every thread executed a single job.
ASSERT_EQ(*std::min_element(completed_tasks.begin(), completed_tasks.end()), 1);
Expand Down Expand Up @@ -234,28 +298,33 @@ TYPED_TEST(HNSWTestParallel, parallelSearchCombined) {
// label and the query val (query_val, query_val-1, query_val+1, query_val-2,
// query_val+2, ...) The score is the L2 distance between the vectors that correspond
// the ids.
size_t diff_id = std::abs(id - query_val);
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = query_val + (sign * int((res_index + 1) / 2));
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(index, query, k, verify_res);
successful_searches++;
};

auto parallel_range_search = [&](int myID) {
completed_tasks[myID]++;
TEST_DATA_T pivot_id = 100 + myID;
TEST_DATA_T pivot_id = 100.01 + myID;
TEST_DATA_T query[dim];
GenerateVector<TEST_DATA_T>(query, dim, pivot_id);
auto verify_res_by_score = [&](size_t id, double score, size_t res_index) {
size_t diff_id = std::abs(id - pivot_id);
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, dim * (diff_id * diff_id));
int sign = (res_index % 2 == 0) ? -1 : 1;
size_t expected_id = pivot_id + (sign * int((res_index + 1) / 2));
double factor = ((res_index + 1) / 2) - sign * 0.01;
double expected_score = dim * factor * factor;
ASSERT_EQ(id, expected_id);
ASSERT_NEAR(score, expected_score, 0.01);
};
uint expected_num_results = 11;
// To get 11 results in the range [pivot_id-5, pivot_id+5], set the radius as the L2 score
// in the boundaries.
double radius = (double)dim * pow((double)expected_num_results / 2, 2);
double radius = dim * expected_num_results * expected_num_results / 4.0;
runRangeQueryTest(index, query, radius, verify_res_by_score, expected_num_results,
BY_SCORE);
successful_searches++;
Expand All @@ -281,7 +350,7 @@ TYPED_TEST(HNSWTestParallel, parallelSearchCombined) {
expected_ids[i] = (n - iteration_num * n_res - i - 1);
}
auto verify_res = [&](size_t id, double score, size_t res_index) {
ASSERT_TRUE(expected_ids[res_index] == id);
ASSERT_EQ(expected_ids[res_index], id);
};
runBatchIteratorSearchTest(batchIterator, n_res, verify_res);
iteration_num++;
Expand All @@ -305,6 +374,8 @@ TYPED_TEST(HNSWTestParallel, parallelSearchCombined) {
for (size_t i = 0; i < n_threads; i++) {
thread_objs[i].join();
}
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(index);

ASSERT_EQ(successful_searches, n_threads);
// Validate that every thread executed a single job.
ASSERT_EQ(*std::min_element(completed_tasks.begin(), completed_tasks.end()), 1);
Expand Down Expand Up @@ -332,7 +403,7 @@ TYPED_TEST(HNSWTestParallel, parallelInsert) {
.M = 16,
.efConstruction = 200};

VecSimIndex *parallel_index = this->CreateNewIndex(params);
VecSimIndex *index = this->CreateNewIndex(params);
size_t n_threads = 10;

// Save the number of tasks done by thread i in the i-th entry.
Expand All @@ -341,7 +412,7 @@ TYPED_TEST(HNSWTestParallel, parallelInsert) {
auto parallel_insert = [&](int myID) {
for (labelType label = myID; label < n; label += n_threads) {
completed_tasks[myID]++;
GenerateAndAddVector<TEST_DATA_T>(parallel_index, dim, label, label);
GenerateAndAddVector<TEST_DATA_T>(index, dim, label, label);
}
};
std::thread thread_objs[n_threads];
Expand All @@ -351,24 +422,28 @@ TYPED_TEST(HNSWTestParallel, parallelInsert) {
for (size_t i = 0; i < n_threads; i++) {
thread_objs[i].join();
}
ASSERT_EQ(VecSimIndex_IndexSize(parallel_index), n);
ASSERT_EQ(VecSimIndex_IndexSize(index), n);
// Validate that every thread executed n/n_threads jobs.
ASSERT_EQ(*std::min_element(completed_tasks.begin(), completed_tasks.end()), n / n_threads);
ASSERT_EQ(*std::max_element(completed_tasks.begin(), completed_tasks.end()),
ceil((double)n / n_threads));

TEST_DATA_T query[dim];
GenerateVector<TEST_DATA_T>(query, dim, (TEST_DATA_T)n / 2);
TEST_DATA_T query_val = (TEST_DATA_T)n / 2;
GenerateVector<TEST_DATA_T>(query, dim, query_val);
auto verify_res = [&](size_t id, double score, size_t res_index) {
// We expect to get the results with increasing order of the distance between the res
// label and the query val (n/2, n/2-1, n/2+1, n/2-2, n/2+2, ...) The score is the L2
// distance between the vectors that correspond the ids.
size_t diff_id = std::abs(int(id - n / 2));
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = query_val + (sign * int((res_index + 1) / 2));
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(parallel_index, query, k, verify_res);
VecSimIndex_Free(parallel_index);
runTopKSearchTest(index, query, k, verify_res);
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(index);
VecSimIndex_Free(index);
}

TYPED_TEST(HNSWTestParallel, parallelInsertMulti) {
Expand All @@ -384,15 +459,15 @@ TYPED_TEST(HNSWTestParallel, parallelInsertMulti) {
.M = 16,
.efConstruction = 200};

VecSimIndex *parallel_index = this->CreateNewIndex(params, true);
VecSimIndex *index = this->CreateNewIndex(params, true);
size_t n_threads = 10;

// Save the number of tasks done by thread i in the i-th entry.
std::vector<size_t> completed_tasks(n_threads, 0);
auto parallel_insert = [&](int myID) {
for (size_t i = myID; i < n; i += n_threads) {
completed_tasks[myID]++;
GenerateAndAddVector<TEST_DATA_T>(parallel_index, dim, i % n_labels, i);
GenerateAndAddVector<TEST_DATA_T>(index, dim, i % n_labels, i);
}
};
std::thread thread_objs[n_threads];
Expand All @@ -402,25 +477,29 @@ TYPED_TEST(HNSWTestParallel, parallelInsertMulti) {
for (size_t i = 0; i < n_threads; i++) {
thread_objs[i].join();
}
ASSERT_EQ(VecSimIndex_IndexSize(parallel_index), n);
ASSERT_EQ(VecSimIndex_IndexSize(index), n);
// Validate that every thread executed n/n_threads jobs.
ASSERT_EQ(*std::min_element(completed_tasks.begin(), completed_tasks.end()), n / n_threads);
ASSERT_EQ(*std::max_element(completed_tasks.begin(), completed_tasks.end()),
ceil((double)n / n_threads));

TEST_DATA_T query[dim];
TEST_DATA_T query_val = (TEST_DATA_T)n / 2 + 10;
size_t query_val = n / 2 + 10;
size_t pivot_val = (size_t)query_val % n_labels;
GenerateVector<TEST_DATA_T>(query, dim, (TEST_DATA_T)query_val);
auto verify_res = [&](size_t id, double score, size_t res_index) {
// We expect to get the results with increasing order of the distance between the res
// label and query_val%n_labels (that is ids 10, 9, 11, ... for the current arguments).
// The score is the L2 distance between the vectors that correspond the ids.
size_t diff_id = std::abs(int(id - (size_t)query_val % n_labels));
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = pivot_val + (sign * int((res_index + 1) / 2));
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(parallel_index, query, k, verify_res);
VecSimIndex_Free(parallel_index);
runTopKSearchTest(index, query, k, verify_res);
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(index);
VecSimIndex_Free(index);
}

TYPED_TEST(HNSWTestParallel, parallelInsertSearch) {
Expand Down Expand Up @@ -461,9 +540,11 @@ TYPED_TEST(HNSWTestParallel, parallelInsertSearch) {
// We expect to get the results with increasing order of the distance between the
// res label and the query val (n/4, n/4-1, n/4+1, n/4-2, n/4+2, ...) The score is
// the L2 distance between the vectors that correspond the ids.
size_t diff_id = std::abs(int(id - query_val));
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = query_val + (sign * int((res_index + 1) / 2));
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(parallel_index, query, k, verify_res);
successful_searches++;
Expand Down Expand Up @@ -508,6 +589,8 @@ TYPED_TEST(HNSWTestParallel, parallelInsertSearch) {
1);
ASSERT_EQ(*std::max_element(completed_tasks.begin() + n_threads / 2, completed_tasks.end()),
1);
ASSERT_FALSE(testing::Test::HasFatalFailure())
<< this->serializeIndexConnections(hnsw_index);
VecSimIndex_Free(parallel_index);
}
}
Expand Down Expand Up @@ -629,9 +712,12 @@ TYPED_TEST(HNSWTestParallel, parallelRepairSearch) {
// res label and the query val and only odd labels (query_val-1, query_val+1,
// query_val-3, query_val+3, ...) The score is the L2 distance between the vectors that
// correspond the ids.
size_t diff_id = std::abs(int(id - query_val));
ASSERT_EQ(diff_id, res_index + (1 - res_index % 2));
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? -1 : 1;
int next_odd = res_index | 1;
size_t expected_id = query_val + (sign * next_odd);
double expected_score = dim * next_odd * next_odd;
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
do {
runTopKSearchTest(hnsw_index, query, k, verify_res);
Expand All @@ -658,6 +744,7 @@ TYPED_TEST(HNSWTestParallel, parallelRepairSearch) {
thread_objs[i].join();
}

ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(hnsw_index);
// Check index integrity, also make sure that no node is pointing to a deleted node.
auto report = hnsw_index->checkIntegrity();
ASSERT_TRUE(report.valid_state);
Expand Down Expand Up @@ -755,11 +842,15 @@ TYPED_TEST(HNSWTestParallel, parallelRepairInsert) {
// We expect to get the results with increasing order of the distance between the
// res label and the query val (3n/4, 3n/4 - 2, 3n/4 + 2, 3n/4 - 4 3n/4 + 4, ...) The score
// is the L2 distance between the vectors that correspond the ids.
size_t diff_id = std::abs(int(id - query_val));
ASSERT_EQ(diff_id, res_index % 2 ? res_index + 1 : res_index);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
int next_even = 2 * ((res_index + 1) / 2); // equals (res_index % 2 ? res_index + 1 : res_index)
size_t expected_id = query_val + (sign * next_even);
double expected_score = dim * next_even * next_even;
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(hnsw_index, query, k, verify_res_even);
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(hnsw_index);

// Around n/4 we should have all vectors (even and odd).
query_val = n / 4;
Expand All @@ -768,10 +859,12 @@ TYPED_TEST(HNSWTestParallel, parallelRepairInsert) {
// We expect to get the results with increasing order of the distance between the
// res label and the query val (n/4, n/4 - 1, n/4 + 1, n/4 - 2 n/4 + 2, ...) The score
// is the L2 distance between the vectors that correspond the ids.
size_t diff_id = std::abs(int(id - query_val));
ASSERT_EQ(diff_id, (res_index + 1) / 2);
ASSERT_EQ(score, (dim * (diff_id * diff_id)));
int sign = (res_index % 2 == 0) ? 1 : -1;
size_t expected_id = query_val + sign * int((res_index + 1) / 2);
double expected_score = dim * ((res_index + 1) / 2) * ((res_index + 1) / 2);
ASSERT_EQ(id, expected_id);
ASSERT_DOUBLE_EQ(score, expected_score);
};
runTopKSearchTest(hnsw_index, query, k, verify_res);
VecSimIndex_Free(hnsw_index);
ASSERT_FALSE(testing::Test::HasFatalFailure()) << this->serializeIndexConnections(hnsw_index);
}
Loading