Skip to content

Commit 2f4e4d1

Browse files
committed
code comments
1 parent 6cfa39b commit 2f4e4d1

File tree

6 files changed

+19
-11
lines changed

6 files changed

+19
-11
lines changed

index/scorch/introducer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
429429
}
430430

431431
skipped := true
432+
// make the newly merged segments part of the newSnapshot being constructed
432433
for i, newMergedSegment := range nextMerge.new {
433434
// checking if this newly merged segment is worth keeping based on
434435
// obsoleted doc count since the merge intro started

index/scorch/merge.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,9 @@ type mergeTaskIntroStatus struct {
443443
skipped bool
444444
}
445445

446+
// this is important when it comes to introducing multiple merged segments in a
447+
// single introducer channel push. That way there is a check to ensure that the
448+
// file count doesn't explode during the index's lifetime.
446449
type mergedSegmentHistory struct {
447450
workerID uint64
448451
oldNewDocIDs []uint64
@@ -501,6 +504,9 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
501504
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
502505
filename := zapFileName(newSegmentID)
503506
path := s.path + string(os.PathSeparator) + filename
507+
508+
// the newly merged segment is already flushed out to disk, just needs
509+
// to be opened using mmap.
504510
newDocNums, _, err :=
505511
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
506512
if err != nil {
@@ -527,7 +533,7 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
527533
// close the new merged segments
528534
_ = closeNewMergedSegments(newMergedSegments)
529535

530-
// tbd: need a better way to handle error
536+
// tbd: need a better way to consolidate errors
531537
return nil, nil, errs[0]
532538
}
533539

index/scorch/persister.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,13 @@ type flushable struct {
369369
totDocs uint64
370370
}
371371

372-
var DefaultNumPersisterWorkers = 1
372+
// number of workers which, in parallel, perform an in-memory merge of the segments followed
373+
// by a flush operation.
374+
var DefaultNumPersisterWorkers = 4
373375

374376
// maximum size of data that a single worker is allowed to perform the in-memory
375377
// merge operation.
376-
var DefaultMaxSizeInMemoryMerge = 0
378+
var DefaultMaxSizeInMemoryMerge = 200 * 1024 * 1024
377379

378380
func legacyFlushBehaviour() bool {
379381
// DefaultMaxSizeInMemoryMerge = 0 is a special value to preserve the legacy
@@ -417,6 +419,8 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
417419

418420
flushSet = append(flushSet, val)
419421
} else {
422+
// constructs a flushSet where each flushable object contains a set of segments
423+
// to be merged and flushed out to disk.
420424
for i, snapshot := range snapshot.segment {
421425
if totSize >= DefaultMaxSizeInMemoryMerge {
422426
if len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
@@ -480,12 +484,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
480484
return false, nil
481485
}
482486

483-
// deploy the workers, have a wait group which waits for the flush set to complete
484-
// each worker
485-
// 1. merges the segments using mergeSegmentBases()
486-
// wait for group to finish
487-
//
488-
// construct equiv snapshot and do a persistSnapshotDirect()
487+
// drains out (after merging in memory) the segments in the flushSet in parallel
489488
newSnapshot, newSegmentIDs, err := s.mergeSegmentBasesParallel(snapshot, flushSet)
490489
if err != nil {
491490
return false, err
@@ -694,7 +693,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
694693
}
695694
filenames = append(filenames, filename)
696695
case segment.UnpersistedSegment:
697-
// need to persist this to disk
696+
// need to persist this to disk if it's not part of the exclude list (which
697+
// restricts which in-memory segments are persisted to disk)
698698
if _, ok := exclude[segmentSnapshot.id]; !ok {
699699
filename := zapFileName(segmentSnapshot.id)
700700
path := filepath.Join(path, filename)

index/scorch/scorch_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,6 @@ func TestIndexInsertThenDelete(t *testing.T) {
438438
t.Fatal(err)
439439
}
440440

441-
fmt.Println("start delete")
442441
err = idx.Delete("1")
443442
if err != nil {
444443
t.Errorf("Error deleting entry from index: %v", err)

index/scorch/snapshot_index_vr.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func (i *IndexSnapshotVectorReader) Next(preAlloced *index.VectorDoc) (
118118
nnum := next.Number()
119119
rv.ID = docNumberToBytes(rv.ID, nnum+globalOffset)
120120
rv.Score = float64(next.Score())
121+
121122
i.currID = rv.ID
122123
i.currPosting = next
123124

search/scorer/scorer_term.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (s *TermQueryScorer) SetQueryNorm(qnorm float64) {
119119

120120
// update the query weight
121121
s.queryWeight = s.queryBoost * s.idf * s.queryNorm
122+
122123
if s.options.Explain {
123124
childrenExplanations := make([]*search.Explanation, 3)
124125
childrenExplanations[0] = &search.Explanation{

0 commit comments

Comments
 (0)