Skip to content

Commit a4b2982

Browse files
committed
keeping legacy behaviour as an option + error handling
1 parent 525b363 commit a4b2982

File tree

3 files changed

+106
-63
lines changed

3 files changed

+106
-63
lines changed

index/scorch/introducer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,15 +352,15 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
352352
creator: "introduceMerge",
353353
}
354354

355-
// iterate through current segments
356-
// newSegmentDeleted := roaring.NewBitmap()
355+
var running, docsToPersistCount, memSegments, fileSegments uint64
356+
var droppedSegmentFiles []string
357357
newSegmentDeleted := make([]*roaring.Bitmap, len(nextMerge.new))
358358
for i := range newSegmentDeleted {
359+
// create a bitmap to track the obsoletes per newly merged segment
359360
newSegmentDeleted[i] = roaring.NewBitmap()
360361
}
361362

362-
var running, docsToPersistCount, memSegments, fileSegments uint64
363-
var droppedSegmentFiles []string
363+
// iterate through current segments
364364
for i := range root.segment {
365365
segmentID := root.segment[i].id
366366
if segSnapAtMerge, ok := nextMerge.mergedSegHistory[segmentID]; ok {
@@ -439,7 +439,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
439439
fsr.UpdateFieldStats(stats)
440440
}
441441

442-
// put new segment at end
442+
// put the merged segment at the end of newSnapshot
443443
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
444444
id: nextMerge.id[i],
445445
segment: newMergedSegment, // take ownership for nextMerge.new's ref-count
@@ -467,6 +467,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
467467
if skipped {
468468
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsObsoleted, 1)
469469
} else {
470+
// tbd: should this stat correspond to total number of merged segments introduced?
471+
// or is it like number of merge introductions done
470472
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
471473
}
472474

index/scorch/merge.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,9 @@ func cumulateBytesRead(sbs []segment.Segment) uint64 {
468468

469469
func closeNewMergedSegments(segs []segment.Segment) error {
470470
for _, seg := range segs {
471-
_ = seg.DecRef()
471+
if seg != nil {
472+
_ = seg.DecRef()
473+
}
472474
}
473475
return nil
474476
}
@@ -482,13 +484,16 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
482484

483485
var wg sync.WaitGroup
484486
// we're tracking the merged segments and their doc number per worker
485-
// to be able to introduce them all at once
487+
// to be able to introduce them all at once, so the first dimension of the
488+
// slices here correspond to workerID
486489
newDocNumsSet := make([][][]uint64, len(flushableObjs))
487490
newMergedSegments := make([]segment.Segment, len(flushableObjs))
488491
newMergedSegmentIDs := make([]uint64, len(flushableObjs))
489492
numFlushes := len(flushableObjs)
490493
var numSegments, newMergedCount uint64
494+
errs := make([]error, numFlushes)
491495

496+
// deploy the workers to merge and flush the batches of segments in parallel
492497
for i := 0; i < numFlushes; i++ {
493498
wg.Add(1)
494499
go func(segsBatch []segment.Segment, dropsBatch []*roaring.Bitmap, id int) {
@@ -499,15 +504,15 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
499504
newDocNums, _, err :=
500505
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
501506
if err != nil {
502-
// handle error
507+
errs[id] = err
503508
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
504509
return
505510
}
506511
newMergedSegmentIDs[id] = newSegmentID
507512
newDocNumsSet[id] = newDocNums
508513
newMergedSegments[id], err = s.segPlugin.Open(path)
509514
if err != nil {
510-
// handle error
515+
errs[id] = err
511516
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
512517
return
513518
}
@@ -517,6 +522,15 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
517522
}
518523

519524
wg.Wait()
525+
526+
if errs[0] != nil {
527+
// close the new merged segments
528+
_ = closeNewMergedSegments(newMergedSegments)
529+
530+
// tbd: need a better way to handle error
531+
return nil, nil, errs[0]
532+
}
533+
520534
atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)
521535

522536
memMergeZapTime := uint64(time.Since(memMergeZapStartTime))
@@ -536,7 +550,7 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
536550
for i, flushable := range flushableObjs {
537551
for j, idx := range flushable.sbIdxs {
538552
ss := snapshot.segment[idx]
539-
// oldSegmentSnapshot.id -> {threadIdx, oldSegmentSnapshot, docIDs}
553+
// oldSegmentSnapshot.id -> {workerID, oldSegmentSnapshot, docIDs}
540554
sm.mergedSegHistory[ss.id] = &mergedSegmentHistory{
541555
workerID: uint64(i),
542556
oldNewDocIDs: newDocNumsSet[i][j],

index/scorch/persister.go

Lines changed: 80 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,17 @@ type flushable struct {
369369
totDocs uint64
370370
}
371371

372-
var DefaultNumPersisterWorkers = 4
372+
var DefaultNumPersisterWorkers = 1
373373

374374
// maximum size of data that a single worker is allowed to perform the in-memory
375375
// merge operation.
376-
var DefaultMaxSizeInMemoryMerge = 200 * 1024 * 1024
376+
var DefaultMaxSizeInMemoryMerge = 0
377+
378+
func legacyFlushBehaviour() bool {
379+
// DefaultMaxSizeInMemoryMerge = 0 is a special value to preserve the legacy
380+
// one-shot in-memory merge + flush behaviour.
381+
return DefaultMaxSizeInMemoryMerge == 0 && DefaultNumPersisterWorkers == 1
382+
}
377383

378384
// persistSnapshotMaybeMerge examines the snapshot and might merge and
379385
// persist the in-memory zap segments if there are enough of them
@@ -390,63 +396,84 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
390396
var numSegsToFlushOut int
391397
var totDocs uint64
392398

393-
for i, snapshot := range snapshot.segment {
394-
if totSize >= DefaultMaxSizeInMemoryMerge {
395-
if len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
396-
numSegsToFlushOut += len(sbs)
397-
val := &flushable{
398-
segments: make([]segment.Segment, len(sbs)),
399-
drops: make([]*roaring.Bitmap, len(sbsDrops)),
400-
sbIdxs: make([]int, len(sbsIndexes)),
401-
totDocs: totDocs,
402-
}
403-
copy(val.segments, sbs)
404-
copy(val.drops, sbsDrops)
405-
copy(val.sbIdxs, sbsIndexes)
406-
flushSet = append(flushSet, val)
407-
408-
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
409-
sbs = sbs[:0]
410-
sbsDrops = sbsDrops[:0]
411-
sbsIndexes = sbsIndexes[:0]
412-
totSize = 0
413-
totDocs = 0
399+
// legacy behaviour of merge + flush of all in-memory segments in one-shot
400+
if legacyFlushBehaviour() {
401+
val := &flushable{
402+
segments: make([]segment.Segment, 0),
403+
drops: make([]*roaring.Bitmap, 0),
404+
sbIdxs: make([]int, 0),
405+
totDocs: totDocs,
406+
}
407+
for i, snapshot := range snapshot.segment {
408+
if _, ok := snapshot.segment.(segment.PersistedSegment); !ok {
409+
val.segments = append(val.segments, snapshot.segment)
410+
val.drops = append(val.drops, snapshot.deleted)
411+
val.sbIdxs = append(val.sbIdxs, i)
412+
oldSegIdxs = append(oldSegIdxs, i)
413+
val.totDocs += snapshot.segment.Count()
414+
numSegsToFlushOut++
414415
}
415416
}
416417

417-
if len(flushSet) >= DefaultNumPersisterWorkers {
418-
break
419-
}
418+
flushSet = append(flushSet, val)
419+
} else {
420+
for i, snapshot := range snapshot.segment {
421+
if totSize >= DefaultMaxSizeInMemoryMerge {
422+
if len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
423+
numSegsToFlushOut += len(sbs)
424+
val := &flushable{
425+
segments: make([]segment.Segment, len(sbs)),
426+
drops: make([]*roaring.Bitmap, len(sbsDrops)),
427+
sbIdxs: make([]int, len(sbsIndexes)),
428+
totDocs: totDocs,
429+
}
430+
copy(val.segments, sbs)
431+
copy(val.drops, sbsDrops)
432+
copy(val.sbIdxs, sbsIndexes)
433+
flushSet = append(flushSet, val)
434+
435+
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
436+
sbs = sbs[:0]
437+
sbsDrops = sbsDrops[:0]
438+
sbsIndexes = sbsIndexes[:0]
439+
totSize = 0
440+
totDocs = 0
441+
}
442+
}
420443

421-
if _, ok := snapshot.segment.(segment.PersistedSegment); !ok {
422-
sbs = append(sbs, snapshot.segment)
423-
sbsDrops = append(sbsDrops, snapshot.deleted)
424-
sbsIndexes = append(sbsIndexes, i)
425-
totDocs += snapshot.segment.Count()
426-
totSize += snapshot.segment.Size()
427-
}
428-
}
444+
if len(flushSet) >= DefaultNumPersisterWorkers {
445+
break
446+
}
429447

430-
// if there were too few segments just merge them all as part of a single worker
431-
if len(flushSet) < DefaultNumPersisterWorkers {
432-
numSegsToFlushOut += len(sbs)
433-
val := &flushable{
434-
segments: make([]segment.Segment, len(sbs)),
435-
drops: make([]*roaring.Bitmap, len(sbsDrops)),
436-
sbIdxs: make([]int, len(sbsIndexes)),
437-
totDocs: totDocs,
448+
if _, ok := snapshot.segment.(segment.PersistedSegment); !ok {
449+
sbs = append(sbs, snapshot.segment)
450+
sbsDrops = append(sbsDrops, snapshot.deleted)
451+
sbsIndexes = append(sbsIndexes, i)
452+
totDocs += snapshot.segment.Count()
453+
totSize += snapshot.segment.Size()
454+
}
455+
}
456+
// if there were too few segments just merge them all as part of a single worker
457+
if len(flushSet) < DefaultNumPersisterWorkers {
458+
numSegsToFlushOut += len(sbs)
459+
val := &flushable{
460+
segments: make([]segment.Segment, len(sbs)),
461+
drops: make([]*roaring.Bitmap, len(sbsDrops)),
462+
sbIdxs: make([]int, len(sbsIndexes)),
463+
totDocs: totDocs,
464+
}
465+
copy(val.segments, sbs)
466+
copy(val.drops, sbsDrops)
467+
copy(val.sbIdxs, sbsIndexes)
468+
flushSet = append(flushSet, val)
469+
470+
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
471+
sbs = sbs[:0]
472+
sbsDrops = sbsDrops[:0]
473+
sbsIndexes = sbsIndexes[:0]
474+
totSize = 0
475+
totDocs = 0
438476
}
439-
copy(val.segments, sbs)
440-
copy(val.drops, sbsDrops)
441-
copy(val.sbIdxs, sbsIndexes)
442-
flushSet = append(flushSet, val)
443-
444-
oldSegIdxs = append(oldSegIdxs, sbsIndexes...)
445-
sbs = sbs[:0]
446-
sbsDrops = sbsDrops[:0]
447-
sbsIndexes = sbsIndexes[:0]
448-
totSize = 0
449-
totDocs = 0
450477
}
451478

452479
if numSegsToFlushOut < DefaultMinSegmentsForInMemoryMerge {

0 commit comments

Comments
 (0)