2 changes: 2 additions & 0 deletions index/scorch/optimize.go
@@ -395,5 +395,7 @@ func (i *IndexSnapshot) unadornedTermFieldReader(
recycle: false,
// signal downstream that this is a special unadorned termFieldReader
unadorned: true,
// unadorned TFRs do not require bytes read tracking
updateBytesRead: false,
}
}
10 changes: 2 additions & 8 deletions index/scorch/optimize_knn.go
@@ -34,8 +34,6 @@ type OptimizeVR struct {
totalCost uint64
// maps field to vector readers
vrs map[string][]*IndexSnapshotVectorReader
// if at least one of the vector readers requires filtered kNN.
requiresFiltering bool
}

// This setting _MUST_ only be changed during init and not after.
@@ -85,8 +83,7 @@ func (o *OptimizeVR) Finish() error {
continue
}

vecIndex, err := segment.InterpretVectorIndex(field,
o.requiresFiltering, origSeg.deleted)
vecIndex, err := segment.InterpretVectorIndex(field, origSeg.deleted)
if err != nil {
errorsM.Lock()
errors = append(errors, err)
@@ -109,7 +106,7 @@ func (o *OptimizeVR) Finish() error {
// kNN search.
if vr.eligibleSelector != nil {
pl, err = vecIndex.SearchWithFilter(vr.vector, vr.k,
vr.eligibleSelector.SegmentEligibleDocs(index), vr.searchParams)
vr.eligibleSelector.SegmentEligibleDocuments(index), vr.searchParams)
} else {
pl, err = vecIndex.Search(vr.vector, vr.k, vr.searchParams)
}
@@ -163,9 +160,6 @@ func (s *IndexSnapshotVectorReader) VectorOptimize(ctx context.Context,
return octx, nil
}
o.ctx = ctx
if !o.requiresFiltering {
o.requiresFiltering = s.eligibleSelector != nil
}

if o.snapshot != s.snapshot {
o.invokeSearcherEndCallback()
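Reviewer note: the removed requiresFiltering field was OR-ed across every
vector reader that joined this OptimizeVR, so a single filtered reader pushed
all readers down the filtered path; after this diff each reader picks its path
per search call via its own eligibleSelector. A toy, runnable sketch of that
behavioral difference, using stand-in types rather than bleve's:

package main

import "fmt"

type reader struct{ hasFilter bool }

func main() {
	readers := []reader{{hasFilter: true}, {hasFilter: false}}

	// old behavior (sketch): one shared flag, OR-ed across all readers
	shared := false
	for _, r := range readers {
		shared = shared || r.hasFilter
	}
	for range readers {
		fmt.Println("old path filtered?", shared) // true, true
	}

	// new behavior (sketch): each reader decides independently per call
	for _, r := range readers {
		fmt.Println("new path filtered?", r.hasFilter) // true, false
	}
}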
2 changes: 2 additions & 0 deletions index/scorch/snapshot_index.go
@@ -700,6 +700,8 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
rv.incrementBytesRead(bytesRead - prevBytesReadItr)
}
}
// ONLY update the bytes read value beyond this point for this TFR if scoring is enabled
rv.updateBytesRead = rv.includeFreq || rv.includeNorm || rv.includeTermVectors
atomic.AddUint64(&is.parent.stats.TotTermSearchersStarted, uint64(1))
return rv, nil
}
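Reviewer note: the assignment above means bytes-read accounting during
iteration stays active only when the query requested scoring data
(frequencies, norms, or term vectors); a pure filter/conjunction TFR skips
the atomic adds entirely. A minimal, runnable sketch of the gating idea,
with toy types standing in for bleve's:

package main

import (
	"fmt"
	"sync/atomic"
)

// toy TFR: the flag is computed once at construction, mirroring the
// rv.updateBytesRead assignment in the hunk above
type tfr struct {
	updateBytesRead bool
	bytesRead       atomic.Uint64
}

func newTFR(includeFreq, includeNorm, includeTermVectors bool) *tfr {
	return &tfr{updateBytesRead: includeFreq || includeNorm || includeTermVectors}
}

func (t *tfr) next(chunkBytes uint64) {
	if t.updateBytesRead { // hot path skips the atomic add when scoring is off
		t.bytesRead.Add(chunkBytes)
	}
}

func main() {
	scoring := newTFR(true, false, false)
	filterOnly := newTFR(false, false, false)
	for i := 0; i < 3; i++ {
		scoring.next(128)
		filterOnly.next(128)
	}
	fmt.Println(scoring.bytesRead.Load(), filterOnly.bytesRead.Load()) // 384 0
}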
29 changes: 20 additions & 9 deletions index/scorch/snapshot_index_tfr.go
@@ -51,6 +51,10 @@ type IndexSnapshotTermFieldReader struct {
bytesRead uint64
ctx context.Context
unadorned bool
// flag to indicate whether to increment our bytesRead
// value after creation of the TFR while iterating our postings
// lists
updateBytesRead bool
}

func (i *IndexSnapshotTermFieldReader) incrementBytesRead(val uint64) {
@@ -83,10 +87,15 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
if rv == nil {
rv = &index.TermFieldDoc{}
}
var prevBytesRead uint64
// find the next hit
for i.segmentOffset < len(i.iterators) {
prevBytesRead := i.iterators[i.segmentOffset].BytesRead()
next, err := i.iterators[i.segmentOffset].Next()
// get our current postings iterator
curItr := i.iterators[i.segmentOffset]
if i.updateBytesRead {
prevBytesRead = curItr.BytesRead()
}
next, err := curItr.Next()
if err != nil {
return nil, err
}
@@ -99,13 +108,15 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in

i.currID = rv.ID
i.currPosting = next
// postings iterators maintain the bytesRead stat cumulatively.
// this is because a series of loadChunk calls may occur, and their
// contributions have to be added together before the bytesRead value
// is sent upstream at this point.
bytesRead := i.iterators[i.segmentOffset].BytesRead()
if bytesRead > prevBytesRead {
i.incrementBytesRead(bytesRead - prevBytesRead)
if i.updateBytesRead {
// postings iterators maintain the bytesRead stat cumulatively.
// this is because a series of loadChunk calls may occur, and their
// contributions have to be added together before the bytesRead value
// is sent upstream at this point.
bytesRead := curItr.BytesRead()
if bytesRead > prevBytesRead {
i.incrementBytesRead(bytesRead - prevBytesRead)
}
}
return rv, nil
}
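Reviewer note: BytesRead() on a postings iterator is a cumulative counter, so
Next() snapshots it before advancing and adds only the delta afterwards; the
hunk above additionally hoists prevBytesRead out of the loop and gates both
reads on updateBytesRead. A runnable toy showing why the delta, not the
running total, must be accumulated:

package main

import "fmt"

// toy cumulative counter standing in for a postings iterator's BytesRead()
type iter struct{ cum uint64 }

func (i *iter) BytesRead() uint64 { return i.cum }
func (i *iter) Next()             { i.cum += 100 } // pretend each Next loads a 100-byte chunk

func main() {
	it := &iter{}
	var tracked uint64
	for n := 0; n < 3; n++ {
		prev := it.BytesRead() // snapshot the cumulative value before advancing
		it.Next()
		if now := it.BytesRead(); now > prev {
			tracked += now - prev // add only the delta
		}
	}
	fmt.Println(tracked) // 300; summing the raw totals would give 600
}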
3 changes: 1 addition & 2 deletions index/scorch/snapshot_index_vr.go
@@ -183,8 +183,7 @@ func (i *IndexSnapshot) CentroidCardinalities(field string, limit int, descendin

for _, segment := range i.segment {
if sv, ok := segment.segment.(segment_api.VectorSegment); ok {
vecIndex, err := sv.InterpretVectorIndex(field,
false /* does not require filtering */, segment.deleted)
vecIndex, err := sv.InterpretVectorIndex(field, segment.deleted)
if err != nil {
return nil, fmt.Errorf("failed to interpret vector index for field %s in segment: %v", field, err)
}
82 changes: 75 additions & 7 deletions index/scorch/snapshot_vector_index.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"

"github.com/bits-and-blooms/bitset"
index "github.com/blevesearch/bleve_index_api"
segment_api "github.com/blevesearch/scorch_segment_api/v2"
)
@@ -48,14 +49,76 @@ func (is *IndexSnapshot) VectorReader(ctx context.Context, vector []float32,
// eligibleDocumentSelector identifies the documents that are eligible for
// the KNN search, as determined by a pre-filter query.
type eligibleDocumentSelector struct {
// segment ID -> segment local doc nums
eligibleDocNums map[int][]uint64
// segment ID -> segment local doc nums in a bitset
eligibleDocNums []*bitset.BitSet
is *IndexSnapshot
}

// SegmentEligibleDocs returns the list of eligible local doc numbers for the given segment.
func (eds *eligibleDocumentSelector) SegmentEligibleDocs(segmentID int) []uint64 {
return eds.eligibleDocNums[segmentID]
// eligibleDocumentList represents the list of eligible documents within a segment.
type eligibleDocumentList struct {
bs *bitset.BitSet
}

// Iterator returns an iterator for the eligible document IDs.
func (edl *eligibleDocumentList) Iterator() index.EligibleDocumentIterator {
if edl.bs == nil {
// no eligible documents
return emptyEligibleIterator
}
// return the iterator
return &eligibleDocumentIterator{
bs: edl.bs,
max: uint(edl.bs.Len()),
}
}

// Count returns the number of eligible document IDs.
func (edl *eligibleDocumentList) Count() int {
if edl.bs == nil {
return 0
}
return int(edl.bs.Count())
}

// emptyEligibleDocumentList is a reusable empty eligible document list.
var emptyEligibleDocumentList = &eligibleDocumentList{}

// eligibleDocumentIterator iterates over eligible document IDs within a segment.
type eligibleDocumentIterator struct {
bs *bitset.BitSet
current uint
max uint
}

// Next returns the next eligible document ID and whether it exists.
func (it *eligibleDocumentIterator) Next() (id uint64, ok bool) {
if it.bs == nil {
return 0, false // no eligible documents
}
next, found := it.bs.NextSet(it.current)
if next >= it.max || !found {
return 0, false
}
it.current = next + 1
return uint64(next), true
}

// emptyEligibleIterator is a reusable empty eligible document iterator.
var emptyEligibleIterator = &eligibleDocumentIterator{}

// SegmentEligibleDocuments returns an EligibleDocumentList for the specified segment ID.
func (eds *eligibleDocumentSelector) SegmentEligibleDocuments(segmentID int) index.EligibleDocumentList {
if eds.eligibleDocNums == nil || segmentID < 0 || segmentID >= len(eds.eligibleDocNums) {
return emptyEligibleDocumentList
}
bs := eds.eligibleDocNums[segmentID]
if bs == nil {
// no eligible documents for this segment
return emptyEligibleDocumentList
}
return &eligibleDocumentList{
bs: bs,
}
}

// AddEligibleDocumentMatch adds a document match to the list of eligible documents.
@@ -68,14 +131,19 @@ func (eds *eligibleDocumentSelector) AddEligibleDocumentMatch(id index.IndexInte
if err != nil {
return err
}
// allocate a bitset for this segment if needed
if eds.eligibleDocNums[segIdx] == nil {
// the size of the bitset is the full size of the segment (which is the max local doc num + 1)
eds.eligibleDocNums[segIdx] = bitset.New(uint(eds.is.segment[segIdx].FullSize()))
}
// Mark this segment-local doc number as eligible in the segment's bitset.
eds.eligibleDocNums[segIdx] = append(eds.eligibleDocNums[segIdx], docNum)
eds.eligibleDocNums[segIdx].Set(uint(docNum))
return nil
}

func (is *IndexSnapshot) NewEligibleDocumentSelector() index.EligibleDocumentSelector {
return &eligibleDocumentSelector{
eligibleDocNums: map[int][]uint64{},
eligibleDocNums: make([]*bitset.BitSet, len(is.segment)),
is: is,
}
}
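Reviewer note: switching the selector from a map of []uint64 slices to one
bitset per segment gives constant-time membership tests and a compact
representation for dense segments, at the cost of sizing each bitset to the
segment's full doc count up front. A runnable sketch of the
bits-and-blooms/bitset calls the new code relies on (the segment size and doc
numbers below are made up):

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bitset"
)

func main() {
	// one bitset per segment, sized to the segment's full doc count,
	// as in the lazy allocation inside AddEligibleDocumentMatch
	segSize := uint(10)
	bs := bitset.New(segSize)

	// mark segment-local doc numbers 2, 3, and 7 eligible
	for _, docNum := range []uint{2, 3, 7} {
		bs.Set(docNum)
	}
	fmt.Println("eligible count:", bs.Count()) // 3

	// NextSet-based iteration, the same pattern eligibleDocumentIterator uses
	for i, ok := bs.NextSet(0); ok; i, ok = bs.NextSet(i + 1) {
		fmt.Println("eligible doc:", i)
	}
}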
32 changes: 21 additions & 11 deletions index/scorch/unadorned.go
@@ -38,6 +38,7 @@ func init() {
type unadornedPostingsIteratorBitmap struct {
actual roaring.IntPeekable
actualBM *roaring.Bitmap
next UnadornedPosting // reused across Next() calls
}

func (i *unadornedPostingsIteratorBitmap) Next() (segment.Posting, error) {
@@ -53,7 +54,10 @@ func (i *unadornedPostingsIteratorBitmap) nextAtOrAfter(atOrAfter uint64) (segme
if !exists {
return nil, nil
}
return UnadornedPosting(docNum), nil
i.next = UnadornedPosting{} // clear the struct
rv := &i.next
rv.docNum = docNum
return rv, nil
}

func (i *unadornedPostingsIteratorBitmap) nextDocNumAtOrAfter(atOrAfter uint64) (uint64, bool) {
@@ -112,8 +116,9 @@ func newUnadornedPostingsIteratorFromBitmap(bm *roaring.Bitmap) segment.Postings
const docNum1HitFinished = math.MaxUint64

type unadornedPostingsIterator1Hit struct {
docNumOrig uint64 // original 1-hit docNum used to create this iterator
docNum uint64 // current docNum
docNumOrig uint64 // original 1-hit docNum used to create this iterator
docNum uint64 // current docNum
next UnadornedPosting // reused across Next() calls
}

func (i *unadornedPostingsIterator1Hit) Next() (segment.Posting, error) {
@@ -129,7 +134,10 @@ func (i *unadornedPostingsIterator1Hit) nextAtOrAfter(atOrAfter uint64) (segment
if !exists {
return nil, nil
}
return UnadornedPosting(docNum), nil
i.next = UnadornedPosting{} // clear the struct
rv := &i.next
rv.docNum = docNum
return rv, nil
}

func (i *unadornedPostingsIterator1Hit) nextDocNumAtOrAfter(atOrAfter uint64) (uint64, bool) {
@@ -176,24 +184,26 @@ type ResetablePostingsIterator interface {
ResetIterator()
}

type UnadornedPosting uint64
type UnadornedPosting struct {
docNum uint64
}

func (p UnadornedPosting) Number() uint64 {
return uint64(p)
func (p *UnadornedPosting) Number() uint64 {
return p.docNum
}

func (p UnadornedPosting) Frequency() uint64 {
func (p *UnadornedPosting) Frequency() uint64 {
return 0
}

func (p UnadornedPosting) Norm() float64 {
func (p *UnadornedPosting) Norm() float64 {
return 0
}

func (p UnadornedPosting) Locations() []segment.Location {
func (p *UnadornedPosting) Locations() []segment.Location {
return nil
}

func (p UnadornedPosting) Size() int {
func (p *UnadornedPosting) Size() int {
return reflectStaticSizeUnadornedPosting
}
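Reviewer note: UnadornedPosting changes from a uint64 value type to a struct,
and each iterator now returns a pointer into its reused next field, so Next()
no longer allocates a boxed posting per hit. The trade-off is aliasing: a
returned posting is only valid until the following Next() call. A runnable
toy of the reuse pattern and its caveat, using stand-in types:

package main

import "fmt"

type posting struct{ docNum uint64 }

// toy iterator reusing one posting across Next calls, like the next field
// added to both unadorned iterators above
type docIter struct {
	docs []uint64
	pos  int
	next posting // reused: avoids one heap allocation per Next
}

func (i *docIter) Next() *posting {
	if i.pos >= len(i.docs) {
		return nil
	}
	i.next = posting{} // clear, then refill the reused struct
	i.next.docNum = i.docs[i.pos]
	i.pos++
	return &i.next
}

func main() {
	it := &docIter{docs: []uint64{10, 20}}
	p1 := it.Next()
	fmt.Println(p1.docNum) // 10
	p2 := it.Next()
	// caveat: p1 and p2 alias the same struct, so p1 now reads 20
	fmt.Println(p1.docNum, p2.docNum) // 20 20
}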
12 changes: 6 additions & 6 deletions index_test.go
@@ -612,9 +612,9 @@ func TestBytesRead(t *testing.T) {
stats, _ := idx.StatsMap()["index"].(map[string]interface{})
prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)

expectedBytesRead := uint64(22049)
expectedBytesRead := uint64(21164)
if supportForVectorSearch {
expectedBytesRead = 22459
expectedBytesRead = 21574
}

if prevBytesRead != expectedBytesRead && res.Cost == prevBytesRead {
@@ -770,9 +770,9 @@ func TestBytesReadStored(t *testing.T) {
stats, _ := idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)

expectedBytesRead := uint64(11911)
expectedBytesRead := uint64(11025)
if supportForVectorSearch {
expectedBytesRead = 12321
expectedBytesRead = 11435
}

if bytesRead != expectedBytesRead && bytesRead == res.Cost {
Expand Down Expand Up @@ -847,9 +847,9 @@ func TestBytesReadStored(t *testing.T) {
stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)

expectedBytesRead = uint64(4097)
expectedBytesRead = uint64(3212)
if supportForVectorSearch {
expectedBytesRead = 4507
expectedBytesRead = 3622
}

if bytesRead != expectedBytesRead && bytesRead == res.Cost {