Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions centroid_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//go:build vectors
// +build vectors

package bleve

import (
"encoding/json"
"fmt"
"os"
"testing"

"github.com/blevesearch/bleve/v2/analysis/lang/en"
"github.com/blevesearch/bleve/v2/mapping"
index "github.com/blevesearch/bleve_index_api"
)

func loadSiftData() ([]map[string]interface{}, error) {
fileContent, err := os.ReadFile("~/fts/data/datasets/vec-sift-bucket.json")
if err != nil {
return nil, err
}
var documents []map[string]interface{}
err = json.Unmarshal(fileContent, &documents)
if err != nil {
return nil, err
}
return documents, nil
}

func TestCentroidIndex(t *testing.T) {
_, _, err := readDatasetAndQueries(testInputCompressedFile)
if err != nil {
t.Fatal(err)
}
documents, err := loadSiftData()
if err != nil {
t.Fatal(err)
}
contentFieldMapping := NewTextFieldMapping()
contentFieldMapping.Analyzer = en.AnalyzerName

vecFieldMappingL2 := mapping.NewVectorFieldMapping()
vecFieldMappingL2.Dims = 128
vecFieldMappingL2.Similarity = index.EuclideanDistance

indexMappingL2Norm := NewIndexMapping()
indexMappingL2Norm.DefaultMapping.AddFieldMappingsAt("content", contentFieldMapping)
indexMappingL2Norm.DefaultMapping.AddFieldMappingsAt("vector", vecFieldMappingL2)

idx, err := newIndexUsing(t.TempDir(), indexMappingL2Norm, Config.DefaultIndexType, Config.DefaultKVStore, nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := idx.Close()
if err != nil {
t.Fatal(err)
}
}()

batch := idx.NewBatch()
for _, doc := range documents[:100000] {
docId := fmt.Sprintf("%s:%s", index.TrainDataPrefix, doc["id"])
err = batch.Index(docId, doc)
if err != nil {
t.Fatal(err)
}
}

err = idx.Train(batch)
if err != nil {
t.Fatal(err)
}
}
22 changes: 22 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,25 @@ require (
github.com/spf13/pflag v1.0.6 // indirect
golang.org/x/sys v0.29.0 // indirect
)

replace github.com/blevesearch/bleve/v2 => /Users/thejas.orkombu/fts/blevesearch/bleve

replace github.com/blevesearch/zapx/v11 => /Users/thejas.orkombu/fts/blevesearch/zapx11

replace github.com/blevesearch/zapx/v12 => /Users/thejas.orkombu/fts/blevesearch/zapx12

replace github.com/blevesearch/zapx/v13 => /Users/thejas.orkombu/fts/blevesearch/zapx13

replace github.com/blevesearch/zapx/v14 => /Users/thejas.orkombu/fts/blevesearch/zapx14

replace github.com/blevesearch/zapx/v15 => /Users/thejas.orkombu/fts/blevesearch/zapx15

replace github.com/blevesearch/zapx/v16 => /Users/thejas.orkombu/fts/blevesearch/zapx

replace github.com/blevesearch/scorch_segment_api/v2 => /Users/thejas.orkombu/fts/blevesearch/scorch_segment_api

replace github.com/blevesearch/go-faiss => /Users/thejas.orkombu/fts/blevesearch/go-faiss

replace github.com/blevesearch/bleve_index_api => /Users/thejas.orkombu/fts/blevesearch/bleve_index_api

replace github.com/blevesearch/sear => /Users/thejas.orkombu/fts/blevesearch/sear
11 changes: 11 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ func (b *Batch) Index(id string, data interface{}) error {
eventIndex.FireIndexEvent()
}
doc := document.NewDocument(id)
// fmt.Printf("data is before mapping %#v\n", data)
err := b.index.Mapping().MapDocument(doc, data)
if err != nil {
return err
}
// fmt.Printf("data is after mapping %#v\n", doc)
b.internal.Update(doc)

b.lastDocSize = uint64(doc.Size() +
Expand Down Expand Up @@ -353,6 +355,11 @@ type IndexCopyable interface {
CopyTo(d index.Directory) error
}

type IndexFileCopyable interface {
UpdateFileInBolt(key []byte, value []byte) error
CopyFile(file string, d index.IndexDirectory) error
}

// FileSystemDirectory is the default implementation for the
// index.Directory interface.
type FileSystemDirectory string
Expand Down Expand Up @@ -396,3 +403,7 @@ type InsightsIndex interface {
// CentroidCardinalities returns the centroids (clusters) from IVF indexes ordered by data density.
CentroidCardinalities(field string, limit int, desceding bool) ([]index.CentroidCardinality, error)
}
type VectorIndex interface {
Index
Train(*Batch) error
}
11 changes: 6 additions & 5 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,9 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,

atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
prevBytesReadTotal := cumulateBytesRead(segmentsToMerge)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s)

newDocNums, _, err := s.segPlugin.MergeEx(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s, s.segmentConfig)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)

fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
Expand All @@ -391,7 +392,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
return fmt.Errorf("merging failed: %v", err)
}

seg, err = s.segPlugin.Open(path)
seg, err = s.segPlugin.OpenEx(path, s.segmentConfig)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
Expand Down Expand Up @@ -540,7 +541,7 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot,
// the newly merged segment is already flushed out to disk, just needs
// to be opened using mmap.
newDocIDs, _, err :=
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
s.segPlugin.MergeEx(segsBatch, dropsBatch, path, s.closeCh, s, s.segmentConfig)
if err != nil {
em.Lock()
errs = append(errs, err)
Expand All @@ -555,7 +556,7 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot,
s.markIneligibleForRemoval(filename)
newMergedSegmentIDs[id] = newSegmentID
newDocIDsSet[id] = newDocIDs
newMergedSegments[id], err = s.segPlugin.Open(path)
newMergedSegments[id], err = s.segPlugin.OpenEx(path, s.segmentConfig)
if err != nil {
em.Lock()
errs = append(errs, err)
Expand Down
32 changes: 30 additions & 2 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot, exclude map[uint
}
}()
for segmentID, path := range newSegmentPaths {
newSegments[segmentID], err = s.segPlugin.Open(path)
newSegments[segmentID], err = s.segPlugin.OpenEx(path, s.segmentConfig)
if err != nil {
return fmt.Errorf("error opening new segment at %s, %v", path, err)
}
Expand Down Expand Up @@ -853,6 +853,22 @@ func zapFileName(epoch uint64) string {
return fmt.Sprintf("%012x.zap", epoch)
}

func (s *Scorch) updateCentroidIndex(bucket *bolt.Bucket) error {
if bucket == nil {
return nil
}
fmt.Println("updateCentroidIndex bucket", bucket != nil)
segmentSnapshot, err := s.loadSegment(bucket)
if err != nil {
return err
}
s.rootLock.Lock()
defer s.rootLock.Unlock()
fmt.Println("updateCentroidIndex", segmentSnapshot.segment != nil)
s.centroidIndex = segmentSnapshot
return nil
}

// bolt snapshot code

func (s *Scorch) loadFromBolt() error {
Expand All @@ -873,6 +889,12 @@ func (s *Scorch) loadFromBolt() error {
s.AddEligibleForRemoval(snapshotEpoch)
continue
}
// fmt.Println("loadFromBolt key %s", k)
// if k[0] == util.BoltCentroidIndexKey[0] {
// fmt.Println("loadFromBolt centroid index key", string(k))

// continue
// }
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("snapshot key, but bucket missing %x, continuing", k)
Expand Down Expand Up @@ -904,6 +926,12 @@ func (s *Scorch) loadFromBolt() error {

foundRoot = true
}

centroidIndexBucket := snapshots.Bucket(util.BoltCentroidIndexKey)
err := s.updateCentroidIndex(centroidIndexBucket)
if err != nil {
return err
}
return nil
})
if err != nil {
Expand Down Expand Up @@ -1016,7 +1044,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
return nil, fmt.Errorf("segment path missing")
}
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
seg, err := s.segPlugin.Open(segmentPath)
seg, err := s.segPlugin.OpenEx(segmentPath, s.segmentConfig)
if err != nil {
return nil, fmt.Errorf("error opening bolt segment: %v", err)
}
Expand Down
Loading
Loading