Skip to content

Commit 73f46b1

Browse files
committed
error handling using slice
1 parent 5b98242 commit 73f46b1

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

index/scorch/merge.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,8 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
493493
newMergedSegmentIDs := make([]uint64, len(flushableObjs))
494494
numFlushes := len(flushableObjs)
495495
var numSegments, newMergedCount uint64
496-
errs := make([]error, numFlushes)
496+
var em sync.Mutex
497+
var errs []error
497498

498499
// deploy the workers to merge and flush the batches of segments parallely
499500
for i := 0; i < numFlushes; i++ {
@@ -509,31 +510,42 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
509510
newDocIDs, _, err :=
510511
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
511512
if err != nil {
512-
errs[id] = err
513+
em.Lock()
514+
errs = append(errs, err)
515+
em.Unlock()
513516
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
514517
return
515518
}
516519
newMergedSegmentIDs[id] = newSegmentID
517520
newDocIDsSet[id] = newDocIDs
518521
newMergedSegments[id], err = s.segPlugin.Open(path)
519522
if err != nil {
520-
errs[id] = err
523+
em.Lock()
524+
errs = append(errs, err)
525+
em.Unlock()
521526
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
522527
return
523528
}
524529
atomic.AddUint64(&newMergedCount, newMergedSegments[id].Count())
525530
atomic.AddUint64(&numSegments, uint64(len(segsBatch)))
526531
}(flushableObjs[i].segments, flushableObjs[i].drops, i)
527532
}
528-
529533
wg.Wait()
530534

531-
if errs[0] != nil {
535+
if errs != nil {
532536
// close the new merged segments
533537
_ = closeNewMergedSegments(newMergedSegments)
534-
535-
// tbd: need a better way to consolidate errors
536-
return nil, nil, errs[0]
538+
var errf error
539+
for _, err := range errs {
540+
if err == segment.ErrClosed {
541+
// the index snapshot was closed which will be handled gracefully
542+
// by retrying the whole merge+flush operation in a later iteration
543+
// so its safe to early exit the same error.
544+
return nil, nil, err
545+
}
546+
errf = fmt.Errorf("%w; %v", errf, err)
547+
}
548+
return nil, nil, errf
537549
}
538550

539551
atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)

0 commit comments

Comments
 (0)