Skip to content

Commit c2378c8

Browse files
committed
enable auto-stiching strategy again with proper locking
1 parent b6e58f5 commit c2378c8

File tree

1 file changed

+75
-16
lines changed

1 file changed

+75
-16
lines changed

pkg/search/search.go

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,6 @@ const (
534534
type strategyDeltaMutation struct {
535535
seq uint64
536536
id string
537-
vec []float32
538537
add bool
539538
}
540539

@@ -1794,6 +1793,9 @@ func (s *Service) IndexNode(node *storage.Node) error {
17941793
s.indexMu.Lock()
17951794
err := s.indexNodeLocked(node, false)
17961795
s.indexMu.Unlock()
1796+
if err == nil && !s.buildInProgress.Load() {
1797+
s.scheduleStrategyTransitionCheck()
1798+
}
17971799
return err
17981800
}
17991801

@@ -1810,7 +1812,7 @@ func (s *Service) addVectorLocked(id string, vec []float32) error {
18101812
err = s.vectorIndex.Add(id, vec)
18111813
}
18121814
if err == nil {
1813-
s.appendStrategyDelta(id, vec, true)
1815+
s.appendStrategyDelta(id, true)
18141816
}
18151817
return err
18161818
}
@@ -1880,7 +1882,7 @@ func (s *Service) removeVectorLocked(id string) {
18801882
if s.vectorFileStore != nil {
18811883
s.vectorFileStore.Remove(id)
18821884
}
1883-
s.appendStrategyDelta(id, nil, false)
1885+
s.appendStrategyDelta(id, false)
18841886
}
18851887

18861888
// ensureBuildVectorFileStore creates vectorFileStore when building with vectorIndexPath so vectors go to disk.
@@ -2206,6 +2208,9 @@ func (s *Service) RemoveNode(nodeID storage.NodeID) error {
22062208
s.indexMu.Lock()
22072209
s.removeNodeLocked(string(nodeID))
22082210
s.indexMu.Unlock()
2211+
if !s.buildInProgress.Load() {
2212+
s.scheduleStrategyTransitionCheck()
2213+
}
22092214

22102215
return nil
22112216
}
@@ -3264,7 +3269,26 @@ func (s *Service) snapshotStrategyInputs() (int, *VectorIndex, *VectorFileStore)
32643269
}
32653270

32663271
func (s *Service) scheduleStrategyTransitionCheck() {
3267-
return
3272+
if s.buildInProgress.Load() {
3273+
return
3274+
}
3275+
vectorCount, _, _ := s.snapshotStrategyInputs()
3276+
current := s.currentPipelineStrategy()
3277+
if current == strategyModeUnknown {
3278+
return
3279+
}
3280+
desired := s.desiredRuntimeStrategy(vectorCount)
3281+
if desired == current {
3282+
return
3283+
}
3284+
if (current == strategyModeBruteCPU || current == strategyModeBruteGPU) &&
3285+
(desired == strategyModeBruteCPU || desired == strategyModeBruteGPU) {
3286+
if s.switchBruteStrategy(desired) {
3287+
log.Printf("🔍 Runtime strategy switch: %s -> %s (N=%d)", current.String(), desired.String(), vectorCount)
3288+
}
3289+
return
3290+
}
3291+
s.scheduleDebouncedStrategyTransition(desired)
32683292
}
32693293

32703294
func (s *Service) scheduleDebouncedStrategyTransition(target strategyMode) {
@@ -3329,12 +3353,12 @@ func (s *Service) runStrategyTransition(target strategyMode) {
33293353
}
33303354
}
33313355

3332-
s.replayTransitionDeltas(targetHNSW, target, 0)
3356+
s.replayTransitionDeltas(targetHNSW, target, vi, vfs, 0)
33333357
s.indexMu.Lock()
3334-
lastSeq := s.replayTransitionDeltas(targetHNSW, target, 0)
3358+
lastSeq := s.replayTransitionDeltas(targetHNSW, target, vi, vfs, 0)
33353359
s.applyTransitionSwapLocked(target, targetHNSW, vi, vfs)
33363360
for {
3337-
nextSeq := s.replayTransitionDeltas(targetHNSW, target, lastSeq)
3361+
nextSeq := s.replayTransitionDeltas(targetHNSW, target, vi, vfs, lastSeq)
33383362
if nextSeq == lastSeq {
33393363
break
33403364
}
@@ -3346,11 +3370,13 @@ func (s *Service) runStrategyTransition(target strategyMode) {
33463370
}
33473371

33483372
func (s *Service) applyTransitionSwapLocked(target strategyMode, targetHNSW *HNSWIndex, vi *VectorIndex, vfs *VectorFileStore) {
3373+
var hnswForPipeline *HNSWIndex
33493374
if target == strategyModeHNSW {
33503375
s.hnswMu.Lock()
33513376
old := s.hnswIndex
33523377
s.hnswIndex = targetHNSW
33533378
s.hnswMu.Unlock()
3379+
hnswForPipeline = targetHNSW
33543380
if old != nil && old != targetHNSW {
33553381
old.Clear()
33563382
}
@@ -3366,7 +3392,26 @@ func (s *Service) applyTransitionSwapLocked(target strategyMode, targetHNSW *HNS
33663392
if target == strategyModeBruteGPU {
33673393
_ = s.ensureGPUIndexSynced(vi, vfs)
33683394
}
3369-
p := s.buildPipelineForMode(target, vi, vfs)
3395+
3396+
var p *VectorSearchPipeline
3397+
switch target {
3398+
case strategyModeBruteGPU:
3399+
p = NewVectorSearchPipeline(NewGPUBruteForceCandidateGen(s.gpuEmbeddingIndex), &IdentityExactScorer{})
3400+
case strategyModeBruteCPU:
3401+
if vfs != nil {
3402+
p = NewVectorSearchPipeline(NewFileStoreBruteForceCandidateGen(vfs), NewCPUExactScorer(vfs))
3403+
} else {
3404+
p = NewVectorSearchPipeline(NewBruteForceCandidateGen(vi), NewCPUExactScorer(vi))
3405+
}
3406+
case strategyModeHNSW:
3407+
if hnswForPipeline != nil {
3408+
if vfs != nil {
3409+
p = NewVectorSearchPipeline(NewHNSWCandidateGen(hnswForPipeline), NewCPUExactScorer(vfs))
3410+
} else {
3411+
p = NewVectorSearchPipeline(NewHNSWCandidateGen(hnswForPipeline), NewCPUExactScorer(vi))
3412+
}
3413+
}
3414+
}
33703415
s.pipelineMu.Lock()
33713416
s.vectorPipeline = p
33723417
s.pipelineMu.Unlock()
@@ -3510,7 +3555,7 @@ func (s *Service) ensureGPUIndexSynced(vi *VectorIndex, vfs *VectorFileStore) er
35103555
return nil
35113556
}
35123557

3513-
func (s *Service) replayTransitionDeltas(targetHNSW *HNSWIndex, mode strategyMode, after uint64) uint64 {
3558+
func (s *Service) replayTransitionDeltas(targetHNSW *HNSWIndex, mode strategyMode, vi *VectorIndex, vfs *VectorFileStore, after uint64) uint64 {
35143559
s.strategyTransitionMu.Lock()
35153560
deltas := make([]strategyDeltaMutation, 0, len(s.strategyTransitionDeltas))
35163561
for _, d := range s.strategyTransitionDeltas {
@@ -3523,7 +3568,9 @@ func (s *Service) replayTransitionDeltas(targetHNSW *HNSWIndex, mode strategyMod
35233568
for _, d := range deltas {
35243569
if mode == strategyModeHNSW && targetHNSW != nil {
35253570
if d.add {
3526-
_ = targetHNSW.Update(d.id, d.vec)
3571+
if vec, ok := getTransitionDeltaVector(d.id, vi, vfs); ok {
3572+
_ = targetHNSW.Update(d.id, vec)
3573+
}
35273574
} else {
35283575
targetHNSW.Remove(d.id)
35293576
}
@@ -3533,7 +3580,9 @@ func (s *Service) replayTransitionDeltas(targetHNSW *HNSWIndex, mode strategyMod
35333580
s.mu.RUnlock()
35343581
if gi != nil {
35353582
if d.add {
3536-
_ = gi.Add(d.id, d.vec)
3583+
if vec, ok := getTransitionDeltaVector(d.id, vi, vfs); ok {
3584+
_ = gi.Add(d.id, vec)
3585+
}
35373586
} else {
35383587
_ = gi.Remove(d.id)
35393588
}
@@ -3544,6 +3593,20 @@ func (s *Service) replayTransitionDeltas(targetHNSW *HNSWIndex, mode strategyMod
35443593
return last
35453594
}
35463595

3596+
func getTransitionDeltaVector(id string, vi *VectorIndex, vfs *VectorFileStore) ([]float32, bool) {
3597+
if vfs != nil {
3598+
if vec, ok := vfs.GetVector(id); ok && len(vec) > 0 {
3599+
return vec, true
3600+
}
3601+
}
3602+
if vi != nil {
3603+
if vec, ok := vi.GetVector(id); ok && len(vec) > 0 {
3604+
return vec, true
3605+
}
3606+
}
3607+
return nil, false
3608+
}
3609+
35473610
func (s *Service) clearTransitionDeltaLogLocked(applied uint64) {
35483611
s.strategyTransitionMu.Lock()
35493612
if applied == 0 || len(s.strategyTransitionDeltas) == 0 {
@@ -3561,7 +3624,7 @@ func (s *Service) clearTransitionDeltaLogLocked(applied uint64) {
35613624
s.strategyTransitionMu.Unlock()
35623625
}
35633626

3564-
func (s *Service) appendStrategyDelta(id string, vec []float32, add bool) {
3627+
func (s *Service) appendStrategyDelta(id string, add bool) {
35653628
s.strategyTransitionMu.Lock()
35663629
defer s.strategyTransitionMu.Unlock()
35673630
if !s.strategyTransitionInProgress {
@@ -3573,10 +3636,6 @@ func (s *Service) appendStrategyDelta(id string, vec []float32, add bool) {
35733636
id: id,
35743637
add: add,
35753638
}
3576-
if add && len(vec) > 0 {
3577-
delta.vec = make([]float32, len(vec))
3578-
copy(delta.vec, vec)
3579-
}
35803639
s.strategyTransitionDeltas = append(s.strategyTransitionDeltas, delta)
35813640
}
35823641

0 commit comments

Comments
 (0)