Skip to content
2 changes: 1 addition & 1 deletion index/scorch/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (o *Builder) Close() error {
}

// fill the root bolt with this fake index snapshot
_, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil)
_, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil, nil)
if err != nil {
_ = tx.Rollback()
_ = rootBolt.Close()
Expand Down
99 changes: 56 additions & 43 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,33 +352,39 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
creator: "introduceMerge",
}

// iterate through current segments
newSegmentDeleted := roaring.NewBitmap()
var running, docsToPersistCount, memSegments, fileSegments uint64
var droppedSegmentFiles []string
newSegmentDeleted := make([]*roaring.Bitmap, len(nextMerge.new))
for i := range newSegmentDeleted {
// create a bitmaps to track the obsoletes per newly merged segments
newSegmentDeleted[i] = roaring.NewBitmap()
}

// iterate through current segments
for i := range root.segment {
segmentID := root.segment[i].id
if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
if segSnapAtMerge, ok := nextMerge.mergedSegHistory[segmentID]; ok {
// this segment is going away, see if anything else was deleted since we started the merge
if segSnapAtMerge != nil && root.segment[i].deleted != nil {
// assume all these deletes are new
deletedSince := root.segment[i].deleted
// if we already knew about some of them, remove
if segSnapAtMerge.deleted != nil {
deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
if segSnapAtMerge.oldSegment.deleted != nil {
deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.oldSegment.deleted)
}
deletedSinceItr := deletedSince.Iterator()
for deletedSinceItr.HasNext() {
oldDocNum := deletedSinceItr.Next()
newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
newSegmentDeleted.Add(uint32(newDocNum))
newDocNum := segSnapAtMerge.oldNewDocIDs[oldDocNum]
newSegmentDeleted[segSnapAtMerge.workerID].Add(uint32(newDocNum))
}
}

// clean up the old segment map to figure out the
// obsolete segments wrt root in meantime, whatever
// segments left behind in old map after processing
// the root segments would be the obsolete segment set
delete(nextMerge.old, segmentID)
delete(nextMerge.mergedSegHistory, segmentID)
} else if root.segment[i].LiveSize() > 0 {
// this segment is staying
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
Expand Down Expand Up @@ -410,52 +416,59 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// before the newMerge introduction, need to clean the newly
// merged segment wrt the current root segments, hence
// applying the obsolete segment contents to newly merged segment
for segID, ss := range nextMerge.old {
obsoleted := ss.DocNumbersLive()
for _, ss := range nextMerge.mergedSegHistory {
obsoleted := ss.oldSegment.DocNumbersLive()
if obsoleted != nil {
obsoletedIter := obsoleted.Iterator()
for obsoletedIter.HasNext() {
oldDocNum := obsoletedIter.Next()
newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
newSegmentDeleted.Add(uint32(newDocNum))
newDocNum := ss.oldNewDocIDs[oldDocNum]
newSegmentDeleted[ss.workerID].Add(uint32(newDocNum))
}
}
}
var skipped bool
// In case where all the docs in the newly merged segment getting
// deleted by the time we reach here, can skip the introduction.
if nextMerge.new != nil &&
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {

stats := newFieldStats()
if fsr, ok := nextMerge.new.(segment.FieldStatsReporter); ok {
fsr.UpdateFieldStats(stats)
}

// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted,
stats: stats,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
creator: "introduceMerge",
mmaped: nextMerge.mmaped,
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
skipped := true
// make the newly merged segments part of the newSnapshot being constructed
for i, newMergedSegment := range nextMerge.new {
// checking if this newly merged segment is worth keeping based on
// obsoleted doc count since the merge intro started
if newMergedSegment != nil &&
newMergedSegment.Count() > newSegmentDeleted[i].GetCardinality() {
stats := newFieldStats()
if fsr, ok := newMergedSegment.(segment.FieldStatsReporter); ok {
fsr.UpdateFieldStats(stats)
}

switch nextMerge.new.(type) {
case segment.PersistedSegment:
fileSegments++
default:
docsToPersistCount += nextMerge.new.Count() - newSegmentDeleted.GetCardinality()
memSegments++
// put the merged segment at the end of newSnapshot
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id[i],
segment: newMergedSegment, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted[i],
stats: stats,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
creator: "introduceMerge",
mmaped: nextMerge.mmaped,
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
running += newMergedSegment.Count()

switch newMergedSegment.(type) {
case segment.PersistedSegment:
fileSegments++
default:
docsToPersistCount += newMergedSegment.Count() - newSegmentDeleted[i].GetCardinality()
memSegments++
}
skipped = false
}
} else {
skipped = true
}

if skipped {
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsObsoleted, 1)
} else {
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, uint64(len(nextMerge.new)))
}

atomic.StoreUint64(&s.stats.TotItemsToPersist, docsToPersistCount)
Expand Down
Loading