Skip to content

Commit 3e006ea

Browse files
authored
Replace deleted elements at addition (#418)
* Replace deleted elements at insertion * Add multithread stress tests * Add timeout to jobs in actions * Add locks by label * Remove python 3.6 tests as it is not available in Ubuntu 22.04 * Fix multithread update of elements * Update readme and refactoring
1 parent 983cea9 commit 3e006ea

15 files changed

+884
-81
lines changed

.github/workflows/build.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
strategy:
99
matrix:
1010
os: [ubuntu-latest, windows-latest]
11-
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
11+
python-version: ["3.7", "3.8", "3.9", "3.10"]
1212
steps:
1313
- uses: actions/checkout@v3
1414
- uses: actions/setup-python@v4
@@ -19,6 +19,7 @@ jobs:
1919
run: python -m pip install .
2020

2121
- name: Test
22+
timeout-minutes: 15
2223
run: python -m unittest discover -v --start-directory python_bindings/tests --pattern "*_test*.py"
2324

2425
test_cpp:
@@ -52,13 +53,16 @@ jobs:
5253
shell: bash
5354

5455
- name: Test
56+
timeout-minutes: 15
5557
run: |
5658
cd build
5759
if [ "$RUNNER_OS" == "Windows" ]; then
5860
cp ./Release/* ./
5961
fi
6062
./searchKnnCloserFirst_test
6163
./searchKnnWithFilter_test
64+
./multiThreadLoad_test
65+
./multiThread_replace_test
6266
./test_updates
6367
./test_updates update
6468
shell: bash

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ var/
99
.idea/
1010
.vscode/
1111
.vs/
12+
**.DS_Store

CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
2525
add_executable(searchKnnWithFilter_test examples/searchKnnWithFilter_test.cpp)
2626
target_link_libraries(searchKnnWithFilter_test hnswlib)
2727

28+
add_executable(multiThreadLoad_test examples/multiThreadLoad_test.cpp)
29+
target_link_libraries(multiThreadLoad_test hnswlib)
30+
31+
add_executable(multiThread_replace_test examples/multiThread_replace_test.cpp)
32+
target_link_libraries(multiThread_replace_test hnswlib)
33+
2834
add_executable(main main.cpp sift_1b.cpp)
2935
target_link_libraries(main hnswlib)
3036
endif()

README.md

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,33 +54,38 @@ For other spaces use the nmslib library https://github.com/nmslib/nmslib.
5454
* `hnswlib.Index(space, dim)` creates a non-initialized index an HNSW in space `space` with integer dimension `dim`.
5555

5656
`hnswlib.Index` methods:
57-
* `init_index(max_elements, M = 16, ef_construction = 200, random_seed = 100)` initializes the index from with no elements.
57+
* `init_index(max_elements, M = 16, ef_construction = 200, random_seed = 100, allow_replace_deleted = False)` initializes the index from with no elements.
5858
* `max_elements` defines the maximum number of elements that can be stored in the structure(can be increased/shrunk).
5959
* `ef_construction` defines a construction time/accuracy trade-off (see [ALGO_PARAMS.md](ALGO_PARAMS.md)).
6060
* `M` defines tha maximum number of outgoing connections in the graph ([ALGO_PARAMS.md](ALGO_PARAMS.md)).
61+
* `allow_replace_deleted` enables replacing of deleted elements with new added ones.
6162

62-
* `add_items(data, ids, num_threads = -1)` - inserts the `data`(numpy array of vectors, shape:`N*dim`) into the structure.
63+
* `add_items(data, ids, num_threads = -1, replace_deleted = False)` - inserts the `data`(numpy array of vectors, shape:`N*dim`) into the structure.
6364
* `num_threads` sets the number of cpu threads to use (-1 means use default).
6465
* `ids` are optional N-size numpy array of integer labels for all elements in `data`.
6566
- If index already has the elements with the same labels, their features will be updated. Note that update procedure is slower than insertion of a new element, but more memory- and query-efficient.
67+
* `replace_deleted` replaces deleted elements. Note it allows to save memory.
68+
- to use it `init_index` should be called with `allow_replace_deleted=True`
6669
* Thread-safe with other `add_items` calls, but not with `knn_query`.
6770

6871
* `mark_deleted(label)` - marks the element as deleted, so it will be omitted from search results. Throws an exception if it is already deleted.
69-
*
72+
7073
* `unmark_deleted(label)` - unmarks the element as deleted, so it will be not be omitted from search results.
7174

7275
* `resize_index(new_size)` - changes the maximum capacity of the index. Not thread safe with `add_items` and `knn_query`.
7376

7477
* `set_ef(ef)` - sets the query time accuracy/speed trade-off, defined by the `ef` parameter (
7578
[ALGO_PARAMS.md](ALGO_PARAMS.md)). Note that the parameter is currently not saved along with the index, so you need to set it manually after loading.
7679

77-
* `knn_query(data, k = 1, num_threads = -1)` make a batch query for `k` closest elements for each element of the
80+
* `knn_query(data, k = 1, num_threads = -1, filter = None)` make a batch query for `k` closest elements for each element of the
7881
* `data` (shape:`N*dim`). Returns a numpy array of (shape:`N*k`).
7982
* `num_threads` sets the number of cpu threads to use (-1 means use default).
83+
* `filter` filters elements by its labels, returns elements with allowed ids
8084
* Thread-safe with other `knn_query` calls, but not with `add_items`.
8185

82-
* `load_index(path_to_index, max_elements = 0)` loads the index from persistence to the uninitialized index.
86+
* `load_index(path_to_index, max_elements = 0, allow_replace_deleted = False)` loads the index from persistence to the uninitialized index.
8387
* `max_elements`(optional) resets the maximum number of elements in the structure.
88+
* `allow_replace_deleted` specifies whether the index being loaded has enabled replacing of deleted elements.
8489

8590
* `save_index(path_to_index)` saves the index from persistence.
8691

@@ -142,7 +147,7 @@ p.add_items(data, ids)
142147
# Controlling the recall by setting ef:
143148
p.set_ef(50) # ef should always be > k
144149

145-
# Query dataset, k - number of closest elements (returns 2 numpy arrays)
150+
# Query dataset, k - number of the closest elements (returns 2 numpy arrays)
146151
labels, distances = p.knn_query(data, k = 1)
147152

148153
# Index objects support pickling
@@ -155,7 +160,6 @@ print(f"Parameters passed to constructor: space={p_copy.space}, dim={p_copy.dim
155160
print(f"Index construction: M={p_copy.M}, ef_construction={p_copy.ef_construction}")
156161
print(f"Index size is {p_copy.element_count} and index capacity is {p_copy.max_elements}")
157162
print(f"Search speed/quality trade-off parameter: ef={p_copy.ef}")
158-
159163
```
160164

161165
An example with updates after serialization/deserialization:
@@ -196,7 +200,6 @@ p.set_ef(10)
196200
# By default using all available cores
197201
p.set_num_threads(4)
198202

199-
200203
print("Adding first batch of %d elements" % (len(data1)))
201204
p.add_items(data1)
202205

@@ -226,6 +229,104 @@ labels, distances = p.knn_query(data, k=1)
226229
print("Recall for two batches:", np.mean(labels.reshape(-1) == np.arange(len(data))), "\n")
227230
```
228231

232+
An example with a filter:
233+
```python
234+
import hnswlib
235+
import numpy as np
236+
237+
dim = 16
238+
num_elements = 10000
239+
240+
# Generating sample data
241+
data = np.float32(np.random.random((num_elements, dim)))
242+
243+
# Declaring index
244+
hnsw_index = hnswlib.Index(space='l2', dim=dim) # possible options are l2, cosine or ip
245+
246+
# Initiating index
247+
# max_elements - the maximum number of elements, should be known beforehand
248+
# (probably will be made optional in the future)
249+
#
250+
# ef_construction - controls index search speed/build speed tradeoff
251+
# M - is tightly connected with internal dimensionality of the data
252+
# strongly affects the memory consumption
253+
254+
hnsw_index.init_index(max_elements=num_elements, ef_construction=100, M=16)
255+
256+
# Controlling the recall by setting ef:
257+
# higher ef leads to better accuracy, but slower search
258+
hnsw_index.set_ef(10)
259+
260+
# Set number of threads used during batch search/construction
261+
# By default using all available cores
262+
hnsw_index.set_num_threads(4)
263+
264+
print("Adding %d elements" % (len(data)))
265+
# Added elements will have consecutive ids
266+
hnsw_index.add_items(data, ids=np.arange(num_elements))
267+
268+
print("Querying only even elements")
269+
# Define filter function that allows only even ids
270+
filter_function = lambda idx: idx%2 == 0
271+
# Query the elements for themselves and search only for even elements:
272+
labels, distances = hnsw_index.knn_query(data, k=1, filter=filter_function)
273+
# labels contain only elements with even id
274+
```
275+
276+
An example with replacing of deleted elements:
277+
```python
278+
import hnswlib
279+
import numpy as np
280+
281+
dim = 16
282+
num_elements = 1_000
283+
max_num_elements = 2 * num_elements
284+
285+
# Generating sample data
286+
labels1 = np.arange(0, num_elements)
287+
data1 = np.float32(np.random.random((num_elements, dim))) # batch 1
288+
labels2 = np.arange(num_elements, 2 * num_elements)
289+
data2 = np.float32(np.random.random((num_elements, dim))) # batch 2
290+
labels3 = np.arange(2 * num_elements, 3 * num_elements)
291+
data3 = np.float32(np.random.random((num_elements, dim))) # batch 3
292+
293+
# Declaring index
294+
hnsw_index = hnswlib.Index(space='l2', dim=dim)
295+
296+
# Initiating index
297+
# max_elements - the maximum number of elements, should be known beforehand
298+
# (probably will be made optional in the future)
299+
#
300+
# ef_construction - controls index search speed/build speed tradeoff
301+
# M - is tightly connected with internal dimensionality of the data
302+
# strongly affects the memory consumption
303+
304+
# Enable replacing of deleted elements
305+
hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True)
306+
307+
# Controlling the recall by setting ef:
308+
# higher ef leads to better accuracy, but slower search
309+
hnsw_index.set_ef(10)
310+
311+
# Set number of threads used during batch search/construction
312+
# By default using all available cores
313+
hnsw_index.set_num_threads(4)
314+
315+
# Add batch 1 and 2 data
316+
hnsw_index.add_items(data1, labels1)
317+
hnsw_index.add_items(data2, labels2) # Note: maximum number of elements is reached
318+
319+
# Delete data of batch 2
320+
for label in labels2:
321+
hnsw_index.mark_deleted(label)
322+
323+
# Replace deleted elements
324+
# Maximum number of elements is reached therefore we cannot add new items,
325+
# but we can replace the deleted ones by using replace_deleted=True
326+
hnsw_index.add_items(data3, labels3, replace_deleted=True)
327+
# hnsw_index contains the data of batch 1 and batch 3 only
328+
```
329+
229330
### Bindings installation
230331

231332
You can install from sources:

examples/multiThreadLoad_test.cpp

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#include "../hnswlib/hnswlib.h"
2+
#include <thread>
3+
#include <chrono>
4+
5+
6+
int main() {
7+
std::cout << "Running multithread load test" << std::endl;
8+
int d = 16;
9+
int max_elements = 1000;
10+
11+
std::mt19937 rng;
12+
rng.seed(47);
13+
std::uniform_real_distribution<> distrib_real;
14+
15+
hnswlib::L2Space space(d);
16+
hnswlib::HierarchicalNSW<float>* alg_hnsw = new hnswlib::HierarchicalNSW<float>(&space, 2 * max_elements);
17+
18+
std::cout << "Building index" << std::endl;
19+
int num_threads = 40;
20+
int num_labels = 10;
21+
22+
int num_iterations = 10;
23+
int start_label = 0;
24+
25+
// run threads that will add elements to the index
26+
// about 7 threads (the number depends on num_threads and num_labels)
27+
// will add/update element with the same label simultaneously
28+
while (true) {
29+
// add elements by batches
30+
std::uniform_int_distribution<> distrib_int(start_label, start_label + num_labels - 1);
31+
std::vector<std::thread> threads;
32+
for (size_t thread_id = 0; thread_id < num_threads; thread_id++) {
33+
threads.push_back(
34+
std::thread(
35+
[&] {
36+
for (int iter = 0; iter < num_iterations; iter++) {
37+
std::vector<float> data(d);
38+
hnswlib::labeltype label = distrib_int(rng);
39+
for (int i = 0; i < d; i++) {
40+
data[i] = distrib_real(rng);
41+
}
42+
alg_hnsw->addPoint(data.data(), label);
43+
}
44+
}
45+
)
46+
);
47+
}
48+
for (auto &thread : threads) {
49+
thread.join();
50+
}
51+
if (alg_hnsw->cur_element_count > max_elements - num_labels) {
52+
break;
53+
}
54+
start_label += num_labels;
55+
}
56+
57+
// insert remaining elements if needed
58+
for (hnswlib::labeltype label = 0; label < max_elements; label++) {
59+
auto search = alg_hnsw->label_lookup_.find(label);
60+
if (search == alg_hnsw->label_lookup_.end()) {
61+
std::cout << "Adding " << label << std::endl;
62+
std::vector<float> data(d);
63+
for (int i = 0; i < d; i++) {
64+
data[i] = distrib_real(rng);
65+
}
66+
alg_hnsw->addPoint(data.data(), label);
67+
}
68+
}
69+
70+
std::cout << "Index is created" << std::endl;
71+
72+
bool stop_threads = false;
73+
std::vector<std::thread> threads;
74+
75+
// create threads that will do markDeleted and unmarkDeleted of random elements
76+
// each thread works with specific range of labels
77+
std::cout << "Starting markDeleted and unmarkDeleted threads" << std::endl;
78+
num_threads = 20;
79+
int chunk_size = max_elements / num_threads;
80+
for (size_t thread_id = 0; thread_id < num_threads; thread_id++) {
81+
threads.push_back(
82+
std::thread(
83+
[&, thread_id] {
84+
std::uniform_int_distribution<> distrib_int(0, chunk_size - 1);
85+
int start_id = thread_id * chunk_size;
86+
std::vector<bool> marked_deleted(chunk_size);
87+
while (!stop_threads) {
88+
int id = distrib_int(rng);
89+
hnswlib::labeltype label = start_id + id;
90+
if (marked_deleted[id]) {
91+
alg_hnsw->unmarkDelete(label);
92+
marked_deleted[id] = false;
93+
} else {
94+
alg_hnsw->markDelete(label);
95+
marked_deleted[id] = true;
96+
}
97+
}
98+
}
99+
)
100+
);
101+
}
102+
103+
// create threads that will add and update random elements
104+
std::cout << "Starting add and update elements threads" << std::endl;
105+
num_threads = 20;
106+
std::uniform_int_distribution<> distrib_int_add(max_elements, 2 * max_elements - 1);
107+
for (size_t thread_id = 0; thread_id < num_threads; thread_id++) {
108+
threads.push_back(
109+
std::thread(
110+
[&] {
111+
std::vector<float> data(d);
112+
while (!stop_threads) {
113+
hnswlib::labeltype label = distrib_int_add(rng);
114+
for (int i = 0; i < d; i++) {
115+
data[i] = distrib_real(rng);
116+
}
117+
alg_hnsw->addPoint(data.data(), label);
118+
std::vector<float> data = alg_hnsw->getDataByLabel<float>(label);
119+
float max_val = *max_element(data.begin(), data.end());
120+
// never happens but prevents compiler from deleting unused code
121+
if (max_val > 10) {
122+
throw std::runtime_error("Unexpected value in data");
123+
}
124+
}
125+
}
126+
)
127+
);
128+
}
129+
130+
std::cout << "Sleep and continue operations with index" << std::endl;
131+
int sleep_ms = 60 * 1000;
132+
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
133+
stop_threads = true;
134+
for (auto &thread : threads) {
135+
thread.join();
136+
}
137+
138+
std::cout << "Finish" << std::endl;
139+
return 0;
140+
}

0 commit comments

Comments
 (0)