Skip to content

Commit 5a32c15

Browse files
committed
code cleanup and refactor
1 parent f7eb349 commit 5a32c15

File tree

4 files changed

+27
-46
lines changed

4 files changed

+27
-46
lines changed

index/scorch/introducer.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,9 +468,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
468468
if skipped {
469469
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsObsoleted, 1)
470470
} else {
471-
// tbd: should this stat correspond to total number of merged segments introduced?
472-
// or is it like number of merge introductions done
473-
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
471+
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, uint64(len(nextMerge.new)))
474472
}
475473

476474
atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)

index/scorch/merge.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
334334
}
335335
}
336336

337-
var oldNewDocNums map[uint64][]uint64
338337
var seg segment.Segment
339338
var filename string
340339
if len(segmentsToMerge) > 0 {
@@ -375,10 +374,10 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
375374
totalBytesRead := seg.BytesRead() + prevBytesReadTotal
376375
seg.ResetBytesRead(totalBytesRead)
377376

378-
oldNewDocNums = make(map[uint64][]uint64, len(newDocNums))
379377
for i, segNewDocNums := range newDocNums {
380-
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
381-
mergedSegHistory[task.Segments[i].Id()].oldNewDocIDs = segNewDocNums
378+
if mergedSegHistory[task.Segments[i].Id()] != nil {
379+
mergedSegHistory[task.Segments[i].Id()].oldNewDocIDs = segNewDocNums
380+
}
382381
}
383382

384383
atomic.AddUint64(&s.stats.TotFileMergeSegments, uint64(len(segmentsToMerge)))
@@ -489,7 +488,7 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
489488
// we're tracking the merged segments and their doc number per worker
490489
// to be able to introduce them all at once, so the first dimension of the
491490
// slices here correspond to workerID
492-
newDocNumsSet := make([][][]uint64, len(flushableObjs))
491+
newDocIDsSet := make([][][]uint64, len(flushableObjs))
493492
newMergedSegments := make([]segment.Segment, len(flushableObjs))
494493
newMergedSegmentIDs := make([]uint64, len(flushableObjs))
495494
numFlushes := len(flushableObjs)
@@ -507,15 +506,15 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
507506

508507
// the newly merged segment is already flushed out to disk, just needs
509508
// to be opened using mmap.
510-
newDocNums, _, err :=
509+
newDocIDs, _, err :=
511510
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
512511
if err != nil {
513512
errs[id] = err
514513
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
515514
return
516515
}
517516
newMergedSegmentIDs[id] = newSegmentID
518-
newDocNumsSet[id] = newDocNums
517+
newDocIDsSet[id] = newDocIDs
519518
newMergedSegments[id], err = s.segPlugin.Open(path)
520519
if err != nil {
521520
errs[id] = err
@@ -559,7 +558,7 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
559558
// oldSegmentSnapshot.id -> {workerID, oldSegmentSnapshot, docIDs}
560559
sm.mergedSegHistory[ss.id] = &mergedSegmentHistory{
561560
workerID: uint64(i),
562-
oldNewDocIDs: newDocNumsSet[i][j],
561+
oldNewDocIDs: newDocIDsSet[i][j],
563562
oldSegment: ss,
564563
}
565564
}

index/scorch/persister.go

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math"
2525
"os"
2626
"path/filepath"
27+
"slices"
2728
"sort"
2829
"strconv"
2930
"strings"
@@ -381,11 +382,11 @@ type flushable struct {
381382

382383
// number of workers which perform, in parallel, an in-memory merge of the segments
383384
// followed by a flush operation.
384-
var DefaultNumPersisterWorkers = 8
385+
var DefaultNumPersisterWorkers = 1
385386

386387
// maximum size of data that a single worker is allowed to perform the in-memory
387388
// merge operation.
388-
var DefaultMaxSizeInMemoryMerge = 200 * 1024 * 1024
389+
var DefaultMaxSizeInMemoryMerge = 0
389390

390391
func legacyFlushBehaviour(maxSizeInMemoryMerge, numPersisterWorkers int) bool {
391392
// DefaultMaxSizeInMemoryMerge = 0 is a special value to preserve the legacy
@@ -432,27 +433,20 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot, po *persiste
432433
// constructs a flushSet where each flushable object contains a set of segments
433434
// to be merged and flushed out to disk.
434435
for i, snapshot := range snapshot.segment {
435-
if totSize >= po.MaxSizeInMemoryMerge {
436-
if len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
437-
numSegsToFlushOut += len(sbs)
438-
val := &flushable{
439-
segments: make([]segment.Segment, len(sbs)),
440-
drops: make([]*roaring.Bitmap, len(sbsDrops)),
441-
sbIdxs: make([]int, len(sbsIndexes)),
442-
totDocs: totDocs,
443-
}
444-
copy(val.segments, sbs)
445-
copy(val.drops, sbsDrops)
446-
copy(val.sbIdxs, sbsIndexes)
447-
flushSet = append(flushSet, val)
448-
449-
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
450-
sbs = sbs[:0]
451-
sbsDrops = sbsDrops[:0]
452-
sbsIndexes = sbsIndexes[:0]
453-
totSize = 0
454-
totDocs = 0
436+
if totSize >= po.MaxSizeInMemoryMerge &&
437+
len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
438+
numSegsToFlushOut += len(sbs)
439+
val := &flushable{
440+
segments: slices.Clone(sbs),
441+
drops: slices.Clone(sbsDrops),
442+
sbIdxs: slices.Clone(sbsIndexes),
443+
totDocs: totDocs,
455444
}
445+
flushSet = append(flushSet, val)
446+
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
447+
448+
sbs, sbsDrops, sbsIndexes = sbs[:0], sbsDrops[:0], sbsIndexes[:0]
449+
totSize, totDocs = 0, 0
456450
}
457451

458452
if len(flushSet) >= int(po.NumPersisterWorkers) {
@@ -471,22 +465,13 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot, po *persiste
471465
if len(flushSet) < po.NumPersisterWorkers {
472466
numSegsToFlushOut += len(sbs)
473467
val := &flushable{
474-
segments: make([]segment.Segment, len(sbs)),
475-
drops: make([]*roaring.Bitmap, len(sbsDrops)),
476-
sbIdxs: make([]int, len(sbsIndexes)),
468+
segments: slices.Clone(sbs),
469+
drops: slices.Clone(sbsDrops),
470+
sbIdxs: slices.Clone(sbsIndexes),
477471
totDocs: totDocs,
478472
}
479-
copy(val.segments, sbs)
480-
copy(val.drops, sbsDrops)
481-
copy(val.sbIdxs, sbsIndexes)
482473
flushSet = append(flushSet, val)
483-
484474
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
485-
sbs = sbs[:0]
486-
sbsDrops = sbsDrops[:0]
487-
sbsIndexes = sbsIndexes[:0]
488-
totSize = 0
489-
totDocs = 0
490475
}
491476
}
492477

search/scorer/scorer_term.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,5 @@ func (s *TermQueryScorer) Score(ctx *search.SearchContext, termMatch *index.Term
272272
})
273273
}
274274
}
275-
276275
return rv
277276
}

0 commit comments

Comments
 (0)