Commit 24cef8a

Don't copy data to job
1 parent fae0543 commit 24cef8a

2 files changed

+79
-38
lines changed

docs/disk_hnsw_multithreaded_architecture.md

Lines changed: 41 additions & 16 deletions
@@ -2,28 +2,27 @@
22

33
## Overview
44

5-
This document describes the architectural changes introduced in the `dorer-disk-poc-add-delete-mt` branch compared to the original `disk-poc` branch. The focus is on multi-threading, synchronization, concurrency in writing to disk, and performance enhancements.
5+
This document describes the multi-threaded architecture of the HNSWDisk index, focusing on synchronization, concurrency in writing to disk, and performance enhancements.
66

7-
## Key Architectural Changes
7+
## Key Architectural Components
88

9-
### 1. Insertion Mode
9+
### 1. Lightweight Insert Jobs
1010

11-
**Previous single threaded approach:** Vectors were accumulated in batches before being written to disk, requiring complex coordination between threads.
12-
13-
14-
**Current approach:** Each insert job is self-contained and can write directly to disk upon completion, optimized for workloads where disk writes are cheap but neighbor searching (reads from disk) is the bottleneck.
11+
Each insert job is lightweight and only stores metadata (vectorId, elementMaxLevel). Vector data is looked up from shared storage when the job executes, minimizing memory usage when many jobs are queued.
1512

1613
```
1714
┌──────────────────────────────────────────────────────────────┐
1815
│ HNSWDiskSingleInsertJob │
1916
├──────────────────────────────────────────────────────────────┤
2017
│ - vectorId │
2118
│ - elementMaxLevel │
22-
│ - rawVectorData (copied into job - no external references) │
23-
│ - processedVectorData (quantized vector for distance calc) │
2419
└──────────────────────────────────────────────────────────────┘
2520
```
2621

22+
At execution time, jobs access vector data via:
23+
- **Raw vectors**: `shared_ptr` from `rawVectorsInRAM` (refcount increment, no copy)
24+
- **Processed vectors**: Direct access from `this->vectors` container
25+
2726
### 2. Segmented Neighbor Cache
2827

2928
To reduce lock contention in multi-threaded scenarios, the neighbor changes cache is partitioned into **64 independent segments**:
@@ -39,7 +38,7 @@ struct alignas(64) CacheSegment {
3938
};
4039
```
4140
Note:
42-
NUM_CACHE_SEGMENTS can be changes which will be cause better separation of the cache,
41+
NUM_CACHE_SEGMENTS can be changed which will cause better separation of the cache,
4342
but will require more RAM usage.
4443

4544
**Key benefits:**
@@ -146,10 +145,12 @@ addVector()
146145
│ └── executeGraphInsertionCore() [inline]
147146
148147
└── Multi-threaded path (with job queue):
149-
├── Create HNSWDiskSingleInsertJob (copies vector data)
148+
├── Create HNSWDiskSingleInsertJob (just vectorId + level, no vector copy)
150149
├── Submit via SubmitJobsToQueue callback
151150
└── Worker thread executes:
152151
└── executeSingleInsertJob()
152+
├── Get shared_ptr to raw vector from rawVectorsInRAM
153+
├── Get processed vector from this->vectors
153154
└── executeGraphInsertionCore()
154155
```
155156

@@ -159,12 +160,16 @@ addVector()
159160
struct HNSWDiskSingleInsertJob : public AsyncJob {
160161
idType vectorId;
161162
size_t elementMaxLevel;
162-
std::string rawVectorData; // Copied - no external references
163-
std::string processedVectorData; // Quantized data for distance calc
163+
// No vector data stored - looked up from index when job executes
164+
// This saves memory: 100M pending jobs don't need 100M vector copies
164165
};
165166
```
166167
167-
Copying vector data into the job eliminates race conditions with the caller's buffer.
168+
Jobs look up vector data at execution time:
169+
- **Raw vectors**: Accessed via `shared_ptr` from `rawVectorsInRAM` (just increments refcount, no copy)
170+
- **Processed vectors**: Accessed from `this->vectors` container
171+
172+
This eliminates memory duplication while maintaining thread safety through reference counting.
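
To make the saving concrete, the sketch below contrasts a job that carries its own vector copies with one that carries only an id and a level. The names `CopyingJob`, `LightweightJob`, and `queuedPayloadBytes`, and the `idType` alias, are assumptions for illustration and are not the library's actual types.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

using idType = std::uint32_t; // assumption: stand-in for the index's id type

// Hypothetical "before" shape: every queued job owns copies of both blobs.
struct CopyingJob {
    idType vectorId;
    std::size_t elementMaxLevel;
    std::string rawVectorData;
    std::string processedVectorData;
};

// Hypothetical "after" shape: metadata only, vector data is looked up later.
struct LightweightJob {
    idType vectorId;
    std::size_t elementMaxLevel;
};

// Bytes of vector payload held by a queue of copying jobs, on top of the
// job structs themselves. A queue of lightweight jobs holds none of this.
constexpr std::size_t queuedPayloadBytes(std::size_t jobs, std::size_t rawBytes,
                                         std::size_t processedBytes) {
    return jobs * (rawBytes + processedBytes);
}

static_assert(sizeof(LightweightJob) < sizeof(CopyingJob),
              "dropping the two string members shrinks the job");
```

Under assumed sizes of 512 bytes per raw vector (128 float32 values) and 128 bytes per quantized vector, 100M queued copying jobs would hold 64 GB of duplicated payload on a 64-bit build, while the same number of lightweight jobs would hold none.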
168173
169174
## Data Flow During Insert
170175
@@ -222,13 +227,33 @@ Nodes created in the current batch are tracked in `cacheSegment.newNodes`:
222227
- Avoids disk lookups for vectors that haven't been written yet
223228
- Cleared after successful flush to disk
224229

225-
### 4. Raw Vectors in RAM
230+
### 4. Raw Vectors in RAM with shared_ptr
231+
232+
Raw vectors are stored in `rawVectorsInRAM` using `std::shared_ptr<std::string>`:
233+
234+
```cpp
235+
std::unordered_map<idType, std::shared_ptr<std::string>> rawVectorsInRAM;
236+
```
226237

227-
Raw vectors are kept in `rawVectorsInRAM` until flushed to disk:
238+
**Benefits:**
228239
- Allows concurrent jobs to access vectors before disk write
229240
- Eliminates redundant disk reads during graph construction
241+
- **Zero-copy job execution**: Jobs increment refcount instead of copying entire vector
242+
- **Safe concurrent deletion**: If vector is erased from map while job is executing, the `shared_ptr` keeps data alive until job completes
230243
- Protected by `rawVectorsGuard` (shared_mutex)
231244

245+
**Execution flow:**
246+
```cpp
247+
// Job execution - no data copy, just refcount increment
248+
std::shared_ptr<std::string> localRawRef;
249+
{
250+
std::shared_lock<std::shared_mutex> lock(rawVectorsGuard);
251+
localRawRef = rawVectorsInRAM.at(job->vectorId); // refcount++ (at() never inserts, so it is safe under a shared lock)
252+
}
253+
// Lock released, but data stays alive via localRawRef
254+
// Use localRawRef->data() for graph insertion and disk write
255+
```
256+
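
The "safe concurrent deletion" property can be sketched in isolation. The class below is a hypothetical miniature of the raw-vector store, not the real `HNSWDiskIndex`; `RawVectorStore`, `put`, `erase`, and `acquire` are illustrative names. It looks entries up with `find()` rather than `operator[]`, because `operator[]` may insert a default entry, and an insertion is a write that a shared (reader) lock does not protect.

```cpp
#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

using idType = std::uint32_t; // assumption: stand-in for the index's id type

// Hypothetical miniature of rawVectorsInRAM guarded by a shared_mutex.
class RawVectorStore {
public:
    // Writer path: build the shared_ptr outside the lock, publish it inside.
    void put(idType id, std::string bytes) {
        auto ptr = std::make_shared<std::string>(std::move(bytes));
        std::unique_lock<std::shared_mutex> lock(guard_);
        vectors_[id] = std::move(ptr);
    }

    // Writer path: drops only the map's reference to the data.
    void erase(idType id) {
        std::unique_lock<std::shared_mutex> lock(guard_);
        vectors_.erase(id);
    }

    // Reader path: copies the shared_ptr (refcount increment, no data copy).
    // Returns nullptr when the id is absent.
    std::shared_ptr<std::string> acquire(idType id) const {
        std::shared_lock<std::shared_mutex> lock(guard_);
        auto it = vectors_.find(id);
        if (it == vectors_.end()) {
            return nullptr;
        }
        return it->second;
    }

private:
    mutable std::shared_mutex guard_;
    std::unordered_map<idType, std::shared_ptr<std::string>> vectors_;
};
```

A caller that holds the result of `acquire()` can keep reading the bytes after another thread calls `erase()` on the same id; the string is freed only when the last holder releases its reference.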
232257
## Thread Safety Summary
233258

234259
| Operation | Thread Safety | Notes |

src/VecSim/algorithms/hnsw/hnsw_disk.h

Lines changed: 38 additions & 22 deletions
@@ -13,6 +13,7 @@
1313
#include "VecSim/memory/vecsim_malloc.h"
1414
#include "VecSim/utils/vecsim_stl.h"
1515
#include "VecSim/utils/vec_utils.h"
16+
#include <optional>
1617
#include <vector>
1718
// #include "VecSim/containers/data_block.h"
1819
// #include "VecSim/containers/raw_data_container_interface.h"
@@ -143,17 +144,14 @@ class HNSWDiskIndex;
143144
struct HNSWDiskSingleInsertJob : public AsyncJob {
144145
idType vectorId;
145146
size_t elementMaxLevel;
146-
// Store vector data directly in the job (no external references)
147-
std::string rawVectorData; // Original float32 vector
148-
std::string processedVectorData; // Preprocessed/quantized vector for distance calculations
147+
// No vector data stored - looked up from index when job executes
148+
// This saves memory: 100M pending jobs don't need 100M vector copies
149149

150150
HNSWDiskSingleInsertJob(std::shared_ptr<VecSimAllocator> allocator, idType vectorId_,
151-
size_t elementMaxLevel_, std::string &&rawVector,
152-
std::string &&processedVector, JobCallback insertCb,
151+
size_t elementMaxLevel_, JobCallback insertCb,
153152
VecSimIndex *index_)
154153
: AsyncJob(allocator, HNSW_DISK_SINGLE_INSERT_JOB, insertCb, index_),
155-
vectorId(vectorId_), elementMaxLevel(elementMaxLevel_),
156-
rawVectorData(std::move(rawVector)), processedVectorData(std::move(processedVector)) {}
154+
vectorId(vectorId_), elementMaxLevel(elementMaxLevel_) {}
157155
};
158156

159157
//////////////////////////////////// HNSW index implementation ////////////////////////////////////
@@ -277,8 +275,9 @@ class HNSWDiskIndex : public VecSimIndexAbstract<DataType, DistType>
277275
vecsim_stl::vector<NeighborUpdate> stagedInsertNeighborUpdates;
278276

279277
// Temporary storage for raw vectors in RAM (until flush batch)
280-
// Maps idType -> raw vector data (stored as string for simplicity)
281-
std::unordered_map<idType, std::string> rawVectorsInRAM;
278+
// Maps idType -> raw vector data (using shared_ptr to avoid copying in job execution)
279+
// When a job executes, it just increments refcount instead of copying the entire vector
280+
std::unordered_map<idType, std::shared_ptr<std::string>> rawVectorsInRAM;
282281

283282

284283
/********************************** Multi-threading Support **********************************/
@@ -934,10 +933,12 @@ int HNSWDiskIndex<DataType, DistType>::addVector(
934933
// We need to store the original vector before preprocessing
935934
// NOTE: In batchless mode, we still use rawVectorsInRAM so other concurrent jobs can access
936935
// the raw vectors of vectors that haven't been written to disk yet
936+
// Using shared_ptr so job execution can just increment refcount instead of copying
937937
const char* raw_data = reinterpret_cast<const char*>(vector);
938+
auto rawVectorPtr = std::make_shared<std::string>(raw_data, this->inputBlobSize);
938939
{
939940
std::lock_guard<std::shared_mutex> lock(rawVectorsGuard);
940-
rawVectorsInRAM[newElementId] = std::string(raw_data, this->inputBlobSize);
941+
rawVectorsInRAM[newElementId] = rawVectorPtr;
941942
}
942943
// Preprocess the vector
943944
ProcessedBlobs processedBlobs = this->preprocess(vector);
@@ -1009,14 +1010,9 @@ int HNSWDiskIndex<DataType, DistType>::addVector(
10091010
// Check if we have a job queue for async processing
10101011
if (SubmitJobsToQueue != nullptr) {
10111012
// Multi-threaded: submit job for async processing
1012-
std::string rawVectorCopy(raw_data, this->inputBlobSize);
1013-
std::string processedVectorCopy(
1014-
reinterpret_cast<const char *>(processedBlobs.getStorageBlob()),
1015-
this->dataSize);
1016-
1013+
// No vector copies in job - job will look up from rawVectorsInRAM and this->vectors
10171014
auto *job = new (this->allocator) HNSWDiskSingleInsertJob(
1018-
this->allocator, newElementId, elementMaxLevel, std::move(rawVectorCopy),
1019-
std::move(processedVectorCopy),
1015+
this->allocator, newElementId, elementMaxLevel,
10201016
HNSWDiskIndex<DataType, DistType>::executeSingleInsertJobWrapper, this);
10211017

10221018
submitSingleJob(job);
@@ -1304,7 +1300,7 @@ bool HNSWDiskIndex<DataType, DistType>::getRawVector(idType id, void* output_buf
13041300
std::shared_lock<std::shared_mutex> lock(rawVectorsGuard);
13051301
auto it = rawVectorsInRAM.find(id);
13061302
if (it != rawVectorsInRAM.end()) {
1307-
const char* data_ptr = it->second.data();
1303+
const char* data_ptr = it->second->data();
13081304
std::memcpy(output_buffer, data_ptr, this->inputBlobSize);
13091305
return true;
13101306
}
@@ -1353,7 +1349,7 @@ bool HNSWDiskIndex<DataType, DistType>::getRawVectorInternal(idType id, void* ou
13531349
std::shared_lock<std::shared_mutex> lock(rawVectorsGuard);
13541350
auto it = rawVectorsInRAM.find(id);
13551351
if (it != rawVectorsInRAM.end()) {
1356-
const char* data_ptr = it->second.data();
1352+
const char* data_ptr = it->second->data();
13571353
std::memcpy(output_buffer, data_ptr, this->inputBlobSize);
13581354
return true;
13591355
}
@@ -3146,7 +3142,28 @@ void HNSWDiskIndex<DataType, DistType>::executeSingleInsertJob(HNSWDiskSingleIns
31463142
return;
31473143
}
31483144

3149-
// Get current entry point
3145+
// Get shared_ptr to raw vector from rawVectorsInRAM (just increments refcount, no copy)
3146+
// This keeps the data alive even if erased from map before job finishes
3147+
std::shared_ptr<std::string> localRawRef;
3148+
{
3149+
std::shared_lock<std::shared_mutex> lock(rawVectorsGuard);
3150+
auto it = rawVectorsInRAM.find(job->vectorId);
3151+
if (it == rawVectorsInRAM.end()) {
3152+
// Vector was already erased (e.g., deleted before job executed)
3153+
delete job;
3154+
return;
3155+
}
3156+
localRawRef = it->second; // Just increments refcount, no data copy
3157+
}
3158+
3159+
// Get processed vector from vectors container
3160+
const void *processedVector;
3161+
{
3162+
std::shared_lock<std::shared_mutex> lock(vectorsGuard);
3163+
processedVector = this->vectors->getElement(job->vectorId);
3164+
}
3165+
3166+
// Get current entry point and max level
31503167
idType currentEntryPoint;
31513168
size_t currentMaxLevel;
31523169
{
@@ -3157,8 +3174,7 @@ void HNSWDiskIndex<DataType, DistType>::executeSingleInsertJob(HNSWDiskSingleIns
31573174

31583175
// Use unified core function (batching controlled by diskWriteBatchThreshold)
31593176
executeGraphInsertionCore(job->vectorId, job->elementMaxLevel, currentEntryPoint,
3160-
currentMaxLevel, job->rawVectorData.data(),
3161-
job->processedVectorData.data());
3177+
currentMaxLevel, localRawRef->data(), processedVector);
31623178

31633179
delete job;
31643180
}
