Skip to content

Commit 4d8af1a

Browse files
committed
Changed badger cleaner to run on interval basis and run for all node types
1 parent 557e4cf commit 4d8af1a

File tree

6 files changed

+78
-94
lines changed

6 files changed

+78
-94
lines changed

cmd/consensus/main.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -685,31 +685,11 @@ func main() {
685685
return hot, nil
686686
}).
687687
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
688-
// initialize the entity database accessors
689-
cleaner := bstorage.NewCleaner(node.Logger, node.DB, node.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)
690-
691688
// initialize the pending blocks cache
692689
proposals := buffer.NewPendingBlocks()
693690

694691
logger := createLogger(node.Logger, node.RootChainID)
695-
complianceCore, err := compliance.NewCore(logger,
696-
node.Metrics.Engine,
697-
node.Metrics.Mempool,
698-
mainMetrics,
699-
node.Metrics.Compliance,
700-
node.Tracer,
701-
cleaner,
702-
node.Storage.Headers,
703-
node.Storage.Payloads,
704-
mutableState,
705-
proposals,
706-
syncCore,
707-
hotstuffModules.Validator,
708-
hot,
709-
hotstuffModules.VoteAggregator,
710-
hotstuffModules.TimeoutAggregator,
711-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
712-
)
692+
complianceCore, err := compliance.NewCore(logger, node.Metrics.Engine, node.Metrics.Mempool, mainMetrics, node.Metrics.Compliance, node.Tracer, node.Storage.Headers, node.Storage.Payloads, mutableState, proposals, syncCore, hotstuffModules.Validator, hot, hotstuffModules.VoteAggregator, hotstuffModules.TimeoutAggregator, modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold))
713693
if err != nil {
714694
return nil, fmt.Errorf("could not initialize compliance core: %w", err)
715695
}

cmd/scaffold.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,10 @@ func (fnb *FlowNodeBuilder) initDB() error {
873873
return nil
874874
})
875875

876+
fnb.Component("badger log cleaner", func(node *NodeConfig) (module.ReadyDoneAware, error) {
877+
return bstorage.NewCleaner(node.Logger, node.DB, node.Metrics.CleanCollector, flow.DefaultValueLogGCWaitDuration), nil
878+
})
879+
876880
return nil
877881
}
878882

engine/consensus/compliance/core.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type Core struct {
4343
hotstuffMetrics module.HotstuffMetrics
4444
complianceMetrics module.ComplianceMetrics
4545
tracer module.Tracer
46-
cleaner storage.Cleaner
4746
headers storage.Headers
4847
payloads storage.Payloads
4948
state protocol.ParticipantState
@@ -66,7 +65,6 @@ func NewCore(
6665
hotstuffMetrics module.HotstuffMetrics,
6766
complianceMetrics module.ComplianceMetrics,
6867
tracer module.Tracer,
69-
cleaner storage.Cleaner,
7068
headers storage.Headers,
7169
payloads storage.Payloads,
7270
state protocol.ParticipantState,
@@ -92,7 +90,6 @@ func NewCore(
9290
mempoolMetrics: mempool,
9391
hotstuffMetrics: hotstuffMetrics,
9492
complianceMetrics: complianceMetrics,
95-
cleaner: cleaner,
9693
headers: headers,
9794
payloads: payloads,
9895
state: state,
@@ -238,12 +235,6 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Bloc
238235
return fmt.Errorf("could not process block proposal: %w", err)
239236
}
240237

241-
// most of the heavy database checks are done at this point, so this is a
242-
// good moment to potentially kick-off a garbage collection of the DB
243-
// NOTE: this is only effectively run every 1000th calls, which corresponds
244-
// to every 1000th successfully processed block
245-
c.cleaner.RunGC()
246-
247238
return nil
248239
}
249240

model/flow/constants.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@ const DefaultMaxCollectionTotalGas = 10_000_000 // 10M
4040
// DefaultMaxCollectionSize is the default maximum number of transactions allowed inside a collection.
4141
const DefaultMaxCollectionSize = 100
4242

43-
// DefaultValueLogGCFrequency is the default frequency in blocks that we call the
44-
// badger value log GC. Equivalent to 10 mins for a 1 second block time
45-
const DefaultValueLogGCFrequency = 10 * 60
43+
// DefaultValueLogGCWaitDuration is the default duration to wait between consecutive calls to the badger value log GC.
44+
const DefaultValueLogGCWaitDuration time.Duration = 10 * time.Minute
4645

4746
// DefaultRequiredApprovalsForSealConstruction is the default number of approvals required to construct a candidate seal
4847
// for subsequent inclusion in block.

storage/badger/cleaner.go

Lines changed: 71 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
package badger
44

55
import (
6+
"github.com/onflow/flow-go/module/component"
7+
"github.com/onflow/flow-go/module/irrecoverable"
68
"math/rand"
79
"time"
810

@@ -12,76 +14,91 @@ import (
1214
"github.com/onflow/flow-go/module"
1315
)
1416

17+
// Cleaner uses component.ComponentManager to implement module.Startable and module.ReadyDoneAware
18+
// to run an internal goroutine which runs badger value log garbage collection on a timely basis.
1519
type Cleaner struct {
16-
log zerolog.Logger
17-
db *badger.DB
18-
metrics module.CleanerMetrics
19-
enabled bool
20-
ratio float64
21-
freq int
22-
calls int
20+
*component.ComponentManager
21+
log zerolog.Logger
22+
db *badger.DB
23+
metrics module.CleanerMetrics
24+
ratio float64
25+
interval time.Duration
2326
}
2427

25-
// NewCleaner returns a cleaner that runs the badger value log garbage collection once every `frequency` calls
26-
// if a frequency of zero is passed in, we will not run the GC at all
27-
func NewCleaner(log zerolog.Logger, db *badger.DB, metrics module.CleanerMetrics, frequency int) *Cleaner {
28+
var _ component.Component = (*Cleaner)(nil)
29+
30+
// NewCleaner returns a cleaner that runs the badger value log garbage collection once every `interval` duration
31+
// if an interval of zero is passed in, we will not run the GC at all.
32+
func NewCleaner(log zerolog.Logger, db *badger.DB, metrics module.CleanerMetrics, interval time.Duration) *Cleaner {
2833
// NOTE: we run garbage collection frequently at points in our business
2934
// logic where we are likely to have a small breather in activity; it thus
3035
// makes sense to run garbage collection often, with a smaller ratio, rather
3136
// than running it rarely and having big rewrites at once
3237
c := &Cleaner{
33-
log: log.With().Str("component", "cleaner").Logger(),
34-
db: db,
35-
metrics: metrics,
36-
ratio: 0.2,
37-
freq: frequency,
38-
enabled: frequency > 0, // Disable if passed in 0 as frequency
38+
log: log.With().Str("component", "cleaner").Logger(),
39+
db: db,
40+
metrics: metrics,
41+
ratio: 0.2,
42+
interval: interval,
3943
}
40-
// we don't want the entire network to run GC at the same time, so
41-
// distribute evenly over time
42-
if c.enabled {
43-
c.calls = rand.Intn(c.freq)
44+
45+
cmBuilder := component.NewComponentManagerBuilder()
46+
47+
// Disable if passed in 0 as interval
48+
if c.interval > 0 {
49+
cmBuilder.AddWorker(c.gcWorkerRoutine)
4450
}
51+
52+
c.ComponentManager = cmBuilder.Build()
4553
return c
4654
}
4755

48-
func (c *Cleaner) RunGC() {
49-
if !c.enabled {
56+
// gcWorkerRoutine runs badger GC on timely basis.
57+
func (c *Cleaner) gcWorkerRoutine(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
58+
ready()
59+
ticker := time.NewTicker(c.nextWaitDuration())
60+
for {
61+
select {
62+
case <-ctx.Done():
63+
return
64+
case <-ticker.C:
65+
c.runGC()
66+
67+
// reset the ticker with a new interval and random jitter
68+
ticker.Reset(c.nextWaitDuration())
69+
}
70+
}
71+
}
72+
73+
// nextWaitDuration calculates next duration for Cleaner to wait before attempting to run GC.
74+
// We add 20% jitter into the interval, so that we don't risk nodes syncing
75+
// up on their GC calls over time.
76+
func (c *Cleaner) nextWaitDuration() time.Duration {
77+
return time.Duration(c.interval.Milliseconds() + rand.Int63n(c.interval.Milliseconds()/5))
78+
}
79+
80+
// runGC runs garbage collection for badger DB, handles sentinel errors and reports metrics.
81+
func (c *Cleaner) runGC() {
82+
started := time.Now()
83+
err := c.db.RunValueLogGC(c.ratio)
84+
if err == badger.ErrRejected {
85+
// NOTE: this happens when a GC call is already running
86+
c.log.Warn().Msg("garbage collection on value log already running")
5087
return
5188
}
52-
// only actually run approximately every frequency number of calls
53-
c.calls++
54-
if c.calls < c.freq {
89+
if err == badger.ErrNoRewrite {
90+
// NOTE: this happens when no files have any garbage to drop
91+
c.log.Debug().Msg("garbage collection on value log unnecessary")
92+
return
93+
}
94+
if err != nil {
95+
c.log.Error().Err(err).Msg("garbage collection on value log failed")
5596
return
5697
}
5798

58-
// we add 20% jitter into the interval, so that we don't risk nodes syncing
59-
// up on their GC calls over time
60-
c.calls = rand.Intn(c.freq / 5)
61-
62-
// run the garbage collection in own goroutine and handle sentinel errors
63-
go func() {
64-
started := time.Now()
65-
err := c.db.RunValueLogGC(c.ratio)
66-
if err == badger.ErrRejected {
67-
// NOTE: this happens when a GC call is already running
68-
c.log.Warn().Msg("garbage collection on value log already running")
69-
return
70-
}
71-
if err == badger.ErrNoRewrite {
72-
// NOTE: this happens when no files have any garbage to drop
73-
c.log.Debug().Msg("garbage collection on value log unnecessary")
74-
return
75-
}
76-
if err != nil {
77-
c.log.Error().Err(err).Msg("garbage collection on value log failed")
78-
return
79-
}
80-
81-
runtime := time.Since(started)
82-
c.log.Debug().
83-
Dur("gc_duration", runtime).
84-
Msg("garbage collection on value log executed")
85-
c.metrics.RanGC(runtime)
86-
}()
99+
runtime := time.Since(started)
100+
c.log.Debug().
101+
Dur("gc_duration", runtime).
102+
Msg("garbage collection on value log executed")
103+
c.metrics.RanGC(runtime)
87104
}

storage/cleaner.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)