Skip to content

Commit 863ad9e

Browse files
Merge #4108
4108: [General] Badger log files cleaner r=durkmurder a=durkmurder ### Context Updated `Cleaner` to run periodically using `time.Ticker`. The cleaner now runs as a separate component in a dedicated goroutine, initialized at node startup for each node type. The default interval is 10 min (the same as before) plus up to 20% random jitter, so that nodes do not all schedule GC at the same time. Co-authored-by: Yurii Oleksyshyn <[email protected]>
2 parents 2549636 + 8f5f86b commit 863ad9e

File tree

9 files changed

+87
-118
lines changed

9 files changed

+87
-118
lines changed

cmd/consensus/main.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -685,20 +685,17 @@ func main() {
685685
return hot, nil
686686
}).
687687
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
688-
// initialize the entity database accessors
689-
cleaner := bstorage.NewCleaner(node.Logger, node.DB, node.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)
690-
691688
// initialize the pending blocks cache
692689
proposals := buffer.NewPendingBlocks()
693690

694691
logger := createLogger(node.Logger, node.RootChainID)
695-
complianceCore, err := compliance.NewCore(logger,
692+
complianceCore, err := compliance.NewCore(
693+
logger,
696694
node.Metrics.Engine,
697695
node.Metrics.Mempool,
698696
mainMetrics,
699697
node.Metrics.Compliance,
700698
node.Tracer,
701-
cleaner,
702699
node.Storage.Headers,
703700
node.Storage.Payloads,
704701
mutableState,

cmd/scaffold.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,10 @@ func (fnb *FlowNodeBuilder) initDB() error {
887887
return nil
888888
})
889889

890+
fnb.Component("badger log cleaner", func(node *NodeConfig) (module.ReadyDoneAware, error) {
891+
return bstorage.NewCleaner(node.Logger, node.DB, node.Metrics.CleanCollector, flow.DefaultValueLogGCWaitDuration), nil
892+
})
893+
890894
return nil
891895
}
892896

consensus/integration/nodes_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -427,9 +427,6 @@ func createNode(
427427
notifier.AddConsumer(counterConsumer)
428428
notifier.AddConsumer(logConsumer)
429429

430-
cleaner := &storagemock.Cleaner{}
431-
cleaner.On("RunGC")
432-
433430
require.Equal(t, participant.nodeInfo.NodeID, localID)
434431
privateKeys, err := participant.nodeInfo.PrivateKeys()
435432
require.NoError(t, err)
@@ -589,7 +586,6 @@ func createNode(
589586
metricsCollector,
590587
metricsCollector,
591588
tracer,
592-
cleaner,
593589
headersDB,
594590
payloadsDB,
595591
fullState,

engine/consensus/compliance/core.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type Core struct {
4343
hotstuffMetrics module.HotstuffMetrics
4444
complianceMetrics module.ComplianceMetrics
4545
tracer module.Tracer
46-
cleaner storage.Cleaner
4746
headers storage.Headers
4847
payloads storage.Payloads
4948
state protocol.ParticipantState
@@ -66,7 +65,6 @@ func NewCore(
6665
hotstuffMetrics module.HotstuffMetrics,
6766
complianceMetrics module.ComplianceMetrics,
6867
tracer module.Tracer,
69-
cleaner storage.Cleaner,
7068
headers storage.Headers,
7169
payloads storage.Payloads,
7270
state protocol.ParticipantState,
@@ -92,7 +90,6 @@ func NewCore(
9290
mempoolMetrics: mempool,
9391
hotstuffMetrics: hotstuffMetrics,
9492
complianceMetrics: complianceMetrics,
95-
cleaner: cleaner,
9693
headers: headers,
9794
payloads: payloads,
9895
state: state,
@@ -238,12 +235,6 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Bloc
238235
return fmt.Errorf("could not process block proposal: %w", err)
239236
}
240237

241-
// most of the heavy database checks are done at this point, so this is a
242-
// good moment to potentially kick-off a garbage collection of the DB
243-
// NOTE: this is only effectively run every 1000th calls, which corresponds
244-
// to every 1000th successfully processed block
245-
c.cleaner.RunGC()
246-
247238
return nil
248239
}
249240

engine/consensus/compliance/core_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ type CommonSuite struct {
6161
me *module.Local
6262
metrics *metrics.NoopCollector
6363
tracer realModule.Tracer
64-
cleaner *storage.Cleaner
6564
headers *storage.Headers
6665
payloads *storage.Payloads
6766
state *protocol.ParticipantState
@@ -111,10 +110,6 @@ func (cs *CommonSuite) SetupTest() {
111110
},
112111
)
113112

114-
// set up storage cleaner
115-
cs.cleaner = &storage.Cleaner{}
116-
cs.cleaner.On("RunGC").Return()
117-
118113
// set up header storage mock
119114
cs.headers = &storage.Headers{}
120115
cs.headers.On("Store", mock.Anything).Return(
@@ -257,7 +252,6 @@ func (cs *CommonSuite) SetupTest() {
257252
cs.metrics,
258253
cs.metrics,
259254
cs.tracer,
260-
cs.cleaner,
261255
cs.headers,
262256
cs.payloads,
263257
cs.state,

model/flow/constants.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@ const DefaultMaxCollectionTotalGas = 10_000_000 // 10M
4040
// DefaultMaxCollectionSize is the default maximum number of transactions allowed inside a collection.
4141
const DefaultMaxCollectionSize = 100
4242

43-
// DefaultValueLogGCFrequency is the default frequency in blocks that we call the
44-
// badger value log GC. Equivalent to 10 mins for a 1 second block time
45-
const DefaultValueLogGCFrequency = 10 * 60
43+
// DefaultValueLogGCWaitDuration is the default duration to wait between consecutive runs of the badger value log GC.
44+
const DefaultValueLogGCWaitDuration time.Duration = 10 * time.Minute
4645

4746
// DefaultRequiredApprovalsForSealConstruction is the default number of approvals required to construct a candidate seal
4847
// for subsequent inclusion in block.

storage/badger/cleaner.go

Lines changed: 79 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,78 +10,103 @@ import (
1010
"github.com/rs/zerolog"
1111

1212
"github.com/onflow/flow-go/module"
13+
"github.com/onflow/flow-go/module/component"
14+
"github.com/onflow/flow-go/module/irrecoverable"
1315
)
1416

17+
// Cleaner uses component.ComponentManager to implement module.Startable and module.ReadyDoneAware
18+
// to run an internal goroutine which runs badger value log garbage collection at a semi-regular interval.
19+
// The Cleaner exists for 2 reasons:
20+
// - Run GC frequently enough that each GC is relatively inexpensive
21+
// - Avoid GC being synchronized across all nodes. Since in the happy path, all nodes have very similar
22+
// database load patterns, without intervention they are likely to schedule GC at the same time, which
23+
// can cause temporary consensus halts.
1524
type Cleaner struct {
16-
log zerolog.Logger
17-
db *badger.DB
18-
metrics module.CleanerMetrics
19-
enabled bool
20-
ratio float64
21-
freq int
22-
calls int
25+
component.Component
26+
log zerolog.Logger
27+
db *badger.DB
28+
metrics module.CleanerMetrics
29+
ratio float64
30+
interval time.Duration
2331
}
2432

25-
// NewCleaner returns a cleaner that runs the badger value log garbage collection once every `frequency` calls
26-
// if a frequency of zero is passed in, we will not run the GC at all
27-
func NewCleaner(log zerolog.Logger, db *badger.DB, metrics module.CleanerMetrics, frequency int) *Cleaner {
33+
var _ component.Component = (*Cleaner)(nil)
34+
35+
// NewCleaner returns a cleaner that runs the badger value log garbage collection once every `interval` duration
36+
// if an interval of zero is passed in, we will not run the GC at all.
37+
func NewCleaner(log zerolog.Logger, db *badger.DB, metrics module.CleanerMetrics, interval time.Duration) *Cleaner {
2838
// NOTE: we run garbage collection frequently at points in our business
2939
// logic where we are likely to have a small breather in activity; it thus
3040
// makes sense to run garbage collection often, with a smaller ratio, rather
3141
// than running it rarely and having big rewrites at once
3242
c := &Cleaner{
33-
log: log.With().Str("component", "cleaner").Logger(),
34-
db: db,
35-
metrics: metrics,
36-
ratio: 0.2,
37-
freq: frequency,
38-
enabled: frequency > 0, // Disable if passed in 0 as frequency
43+
log: log.With().Str("component", "cleaner").Logger(),
44+
db: db,
45+
metrics: metrics,
46+
ratio: 0.2,
47+
interval: interval,
3948
}
40-
// we don't want the entire network to run GC at the same time, so
41-
// distribute evenly over time
42-
if c.enabled {
43-
c.calls = rand.Intn(c.freq)
49+
50+
// Disable if passed in 0 as interval
51+
if c.interval == 0 {
52+
c.Component = &module.NoopComponent{}
53+
return c
4454
}
55+
56+
c.Component = component.NewComponentManagerBuilder().
57+
AddWorker(c.gcWorkerRoutine).
58+
Build()
59+
4560
return c
4661
}
4762

48-
func (c *Cleaner) RunGC() {
49-
if !c.enabled {
63+
// gcWorkerRoutine runs badger GC periodically, waiting a jittered interval between runs.
64+
func (c *Cleaner) gcWorkerRoutine(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
65+
ready()
66+
ticker := time.NewTicker(c.nextWaitDuration())
67+
defer ticker.Stop()
68+
for {
69+
select {
70+
case <-ctx.Done():
71+
return
72+
case <-ticker.C:
73+
c.runGC()
74+
75+
// reset the ticker with a new interval and random jitter
76+
ticker.Reset(c.nextWaitDuration())
77+
}
78+
}
79+
}
80+
81+
// nextWaitDuration calculates next duration for Cleaner to wait before attempting to run GC.
82+
// We add 20% jitter into the interval, so that we don't risk nodes syncing their GC calls over time.
83+
// Therefore GC is run every X seconds, where X is uniformly sampled from [interval, interval*1.2]
84+
func (c *Cleaner) nextWaitDuration() time.Duration {
85+
return time.Duration(c.interval.Milliseconds() + rand.Int63n(c.interval.Milliseconds()/5))
86+
}
87+
88+
// runGC runs garbage collection for badger DB, handles sentinel errors and reports metrics.
89+
func (c *Cleaner) runGC() {
90+
started := time.Now()
91+
err := c.db.RunValueLogGC(c.ratio)
92+
if err == badger.ErrRejected {
93+
// NOTE: this happens when a GC call is already running
94+
c.log.Warn().Msg("garbage collection on value log already running")
5095
return
5196
}
52-
// only actually run approximately every frequency number of calls
53-
c.calls++
54-
if c.calls < c.freq {
97+
if err == badger.ErrNoRewrite {
98+
// NOTE: this happens when no files have any garbage to drop
99+
c.log.Debug().Msg("garbage collection on value log unnecessary")
100+
return
101+
}
102+
if err != nil {
103+
c.log.Error().Err(err).Msg("garbage collection on value log failed")
55104
return
56105
}
57106

58-
// we add 20% jitter into the interval, so that we don't risk nodes syncing
59-
// up on their GC calls over time
60-
c.calls = rand.Intn(c.freq / 5)
61-
62-
// run the garbage collection in own goroutine and handle sentinel errors
63-
go func() {
64-
started := time.Now()
65-
err := c.db.RunValueLogGC(c.ratio)
66-
if err == badger.ErrRejected {
67-
// NOTE: this happens when a GC call is already running
68-
c.log.Warn().Msg("garbage collection on value log already running")
69-
return
70-
}
71-
if err == badger.ErrNoRewrite {
72-
// NOTE: this happens when no files have any garbage to drop
73-
c.log.Debug().Msg("garbage collection on value log unnecessary")
74-
return
75-
}
76-
if err != nil {
77-
c.log.Error().Err(err).Msg("garbage collection on value log failed")
78-
return
79-
}
80-
81-
runtime := time.Since(started)
82-
c.log.Debug().
83-
Dur("gc_duration", runtime).
84-
Msg("garbage collection on value log executed")
85-
c.metrics.RanGC(runtime)
86-
}()
107+
runtime := time.Since(started)
108+
c.log.Debug().
109+
Dur("gc_duration", runtime).
110+
Msg("garbage collection on value log executed")
111+
c.metrics.RanGC(runtime)
87112
}

storage/cleaner.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

storage/mock/cleaner.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)