Skip to content

Commit 86e7c6c

Browse files
committed
making configs part of persister options
1 parent 2f4e4d1 commit 86e7c6c

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

index/scorch/persister.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,14 @@ type persisterOptions struct {
7979
// for the number of paused application threads. The default value would
8080
// be a very high number to always favour the merging of memory segments.
8181
MemoryPressurePauseThreshold uint64
82+
83+
// NumPersisterWorkers decides the number of parallel workers that will
84+
// perform the in-memory merge of segments followed by a flush operation.
85+
NumPersisterWorkers int
86+
87+
// MaxSizeInMemoryMerge is the maximum size of data that a single persister
88+
// worker is allowed to work on
89+
MaxSizeInMemoryMerge int
8290
}
8391

8492
type notificationChan chan struct{}
@@ -323,6 +331,8 @@ func (s *Scorch) parsePersisterOptions() (*persisterOptions, error) {
323331
PersisterNapTimeMSec: DefaultPersisterNapTimeMSec,
324332
PersisterNapUnderNumFiles: DefaultPersisterNapUnderNumFiles,
325333
MemoryPressurePauseThreshold: DefaultMemoryPressurePauseThreshold,
334+
NumPersisterWorkers: DefaultNumPersisterWorkers,
335+
MaxSizeInMemoryMerge: DefaultMaxSizeInMemoryMerge,
326336
}
327337
if v, ok := s.config["scorchPersisterOptions"]; ok {
328338
b, err := util.MarshalJSON(v)
@@ -344,7 +354,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot,
344354
// below the configured threshold, else the persister performs the
345355
// direct persistence of segments.
346356
if s.NumEventsBlocking() < po.MemoryPressurePauseThreshold {
347-
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
357+
persisted, err := s.persistSnapshotMaybeMerge(snapshot, po)
348358
if err != nil {
349359
return err
350360
}
@@ -369,23 +379,23 @@ type flushable struct {
369379
totDocs uint64
370380
}
371381

372-
// number workers which parallely perform an in-memory merge of the segments followed
373-
// by a flush operation.
382+
// number workers which parallely perform an in-memory merge of the segments
383+
// followed by a flush operation.
374384
var DefaultNumPersisterWorkers = 4
375385

376386
// maximum size of data that a single worker is allowed to perform the in-memory
377387
// merge operation.
378388
var DefaultMaxSizeInMemoryMerge = 200 * 1024 * 1024
379389

380-
func legacyFlushBehaviour() bool {
390+
func legacyFlushBehaviour(maxSizeInMemoryMerge, numPersisterWorkers int) bool {
381391
// DefaultMaxSizeInMemoryMerge = 0 is a special value to preserve the leagcy
382392
// one-shot in-memory merge + flush behaviour.
383-
return DefaultMaxSizeInMemoryMerge == 0 && DefaultNumPersisterWorkers == 1
393+
return maxSizeInMemoryMerge == 0 && numPersisterWorkers == 1
384394
}
385395

386396
// persistSnapshotMaybeMerge examines the snapshot and might merge and
387397
// persist the in-memory zap segments if there are enough of them
388-
func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
398+
func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot, po *persisterOptions) (
389399
bool, error) {
390400
// collect the in-memory zap segments (SegmentBase instances)
391401
var sbs []segment.Segment
@@ -399,7 +409,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
399409
var totDocs uint64
400410

401411
// legacy behaviour of merge + flush of all in-memory segments in one-shot
402-
if legacyFlushBehaviour() {
412+
if legacyFlushBehaviour(po.MaxSizeInMemoryMerge, po.NumPersisterWorkers) {
403413
val := &flushable{
404414
segments: make([]segment.Segment, 0),
405415
drops: make([]*roaring.Bitmap, 0),
@@ -422,7 +432,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
422432
// constructs a flushSet where each flushable object contains a set of segments
423433
// to be merged and flushed out to disk.
424434
for i, snapshot := range snapshot.segment {
425-
if totSize >= DefaultMaxSizeInMemoryMerge {
435+
if totSize >= po.MaxSizeInMemoryMerge {
426436
if len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
427437
numSegsToFlushOut += len(sbs)
428438
val := &flushable{
@@ -445,7 +455,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
445455
}
446456
}
447457

448-
if len(flushSet) >= DefaultNumPersisterWorkers {
458+
if len(flushSet) >= int(po.NumPersisterWorkers) {
449459
break
450460
}
451461

@@ -458,7 +468,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
458468
}
459469
}
460470
// if there were too few segments just merge them all as part of a single worker
461-
if len(flushSet) < DefaultNumPersisterWorkers {
471+
if len(flushSet) < po.NumPersisterWorkers {
462472
numSegsToFlushOut += len(sbs)
463473
val := &flushable{
464474
segments: make([]segment.Segment, len(sbs)),

0 commit comments

Comments
 (0)