Skip to content

Commit d13337b

Browse files
Merge pull request #6355 from oasisprotocol/martin/buxfix/runtime-pruning
go: Fix runtime pruning
2 parents 00801ce + ef82d0f commit d13337b

File tree

10 files changed

+151
-76
lines changed

10 files changed

+151
-76
lines changed

.changelog/6352.bugfix.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Fix runtime state pruning performance
2+
3+
Previously, pruning the runtime state with many local versions could
4+
significantly slow down state synchronization.
5+
6+
This has been fixed by moving state pruning out of the BadgerDB
7+
transaction responsible for pruning the runtime’s light history.

go/consensus/api/api.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -428,18 +428,15 @@ type StatePruner interface {
428428
RegisterHandler(handler StatePruneHandler)
429429
}
430430

431-
// StatePruneHandler is a handler that is called when heights are pruned
431+
// StatePruneHandler is a handler that is called before heights are pruned
432432
// from history.
433433
type StatePruneHandler interface {
434-
// Prune is called before the specified height is pruned.
435-
//
436-
// If an error is returned, pruning is aborted and the height is
437-
// not pruned from history.
434+
// CanPruneConsensus is called to check if the specified height can be pruned.
438435
//
439436
// Note that this can be called for the same height multiple
440437
// times (e.g., if one of the handlers fails but others succeed
441438
// and pruning is later retried).
442-
Prune(height int64) error
439+
CanPruneConsensus(height int64) error
443440
}
444441

445442
// EstimateGasRequest is a EstimateGas request.

go/consensus/cometbft/abci/prune.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -136,25 +136,20 @@ func (p *genericPruner) Prune(latestVersion uint64) error {
136136
)
137137

138138
preserveFrom := latestVersion - p.keepN
139-
PruneLoop:
140139
for i := p.earliestVersion; i <= latestVersion; i++ {
141140
if i >= preserveFrom {
142141
p.earliestVersion = i
143142
break
144143
}
145144

146-
// Before pruning anything, run all prune handlers. If any of them
147-
// fails we abort the prune.
148-
for _, ph := range p.handlers {
149-
if err := ph.Prune(int64(i)); err != nil {
150-
p.logger.Debug("prune handler blocked pruning version",
151-
"err", err,
152-
"latest_version", latestVersion,
153-
"version", i,
154-
)
155-
p.earliestVersion = i
156-
break PruneLoop
157-
}
145+
if err := p.canPrune(int64(i)); err != nil {
146+
p.logger.Debug("prune handler blocked pruning version",
147+
"err", err,
148+
"latest_version", latestVersion,
149+
"version", i,
150+
)
151+
p.earliestVersion = i
152+
break
158153
}
159154

160155
p.logger.Debug("Prune: Delete",
@@ -195,6 +190,16 @@ PruneLoop:
195190
return nil
196191
}
197192

193+
// canPrune checks if all prune handlers allow pruning the given version.
194+
func (p *genericPruner) canPrune(v int64) error {
195+
for _, ph := range p.handlers {
196+
if err := ph.CanPruneConsensus(v); err != nil {
197+
return err
198+
}
199+
}
200+
return nil
201+
}
202+
198203
func (p *genericPruner) RegisterHandler(handler consensus.StatePruneHandler) {
199204
p.Lock()
200205
defer p.Unlock()

go/runtime/history/history.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -287,12 +287,13 @@ func (h *runtimeHistory) GetEarliestBlock(ctx context.Context) (*block.Block, er
287287
return annBlk.Block, nil
288288
}
289289

290-
func (h *runtimeHistory) Prune(height int64) error {
290+
// CanPruneConsenus returns no error when the specified consensus height has been already reindexed.
291+
//
292+
// Implements consensus.api.StatePruneHandler
293+
func (h *runtimeHistory) CanPruneConsensus(height int64) error {
291294
lastHeight, err := h.LastConsensusHeight()
292295
if err != nil {
293-
h.logger.Warn("failed to fetch last consensus height for tracked runtime",
294-
"err", err,
295-
)
296+
h.logger.Warn("failed to fetch last consensus height for tracked runtime", "err", err)
296297
// We can't be sure if it is ok to prune this version, so prevent pruning to be safe.
297298
return fmt.Errorf("failed to fetch last consensus height for tracked runtime: %w", err)
298299
}
@@ -334,14 +335,10 @@ func (h *runtimeHistory) pruneWorker() {
334335
return
335336
}
336337

337-
h.logger.Debug("pruning runtime history",
338-
"round", round.(uint64),
339-
)
338+
h.logger.Debug("pruning runtime history", "round", round.(uint64))
340339

341340
if err := h.pruner.Prune(round.(uint64)); err != nil {
342-
h.logger.Error("failed to prune",
343-
"err", err,
344-
)
341+
h.logger.Debug("failed to prune", "err", err)
345342
continue
346343
}
347344
case <-h.stopCh:

go/runtime/history/history_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ type testPruneHandler struct {
372372
batches []int
373373
}
374374

375-
func (h *testPruneHandler) Prune(rounds []uint64) error {
375+
func (h *testPruneHandler) CanPruneRuntime(rounds []uint64) error {
376376
// NOTE: Users must ensure that accessing prunedRounds is safe (e.g., that
377377
// no more pruning happens using this handler before prunedRounds is
378378
// accessed from a different goroutine).
@@ -486,7 +486,7 @@ func TestHistoryPrune(t *testing.T) {
486486

487487
type testPruneFailingHandler struct{}
488488

489-
func (h *testPruneFailingHandler) Prune([]uint64) error {
489+
func (h *testPruneFailingHandler) CanPruneRuntime([]uint64) error {
490490
return fmt.Errorf("thou shall not pass")
491491
}
492492

go/runtime/history/prune.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,15 @@ const (
2424
// PrunerFactory is the runtime history pruner factory interface.
2525
type PrunerFactory func(runtimeID common.Namespace, db *DB) (Pruner, error)
2626

27-
// PruneHandler is a handler that is called when rounds are pruned
27+
// PruneHandler is a handler that is called before rounds are pruned
2828
// from history.
2929
type PruneHandler interface {
30-
// Prune is called before the specified rounds are pruned.
31-
//
32-
// If an error is returned, pruning is aborted and the rounds are
33-
// not pruned from history.
30+
// CanPruneRuntime is called before to check if the specified round can be pruned.
3431
//
3532
// Note that this can be called for the same round multiple
3633
// times (e.g., if one of the handlers fails but others succeed
3734
// and pruning is later retried).
38-
Prune(rounds []uint64) error
35+
CanPruneRuntime(rounds []uint64) error
3936
}
4037

4138
// Pruner is the runtime history pruner interface.
@@ -131,32 +128,38 @@ func (p *keepLastPruner) Prune(latestRound uint64) error {
131128
pruned = append(pruned, round)
132129
}
133130

134-
// If there is nothing to prune, do not call any handlers.
135-
if len(pruned) == 0 {
131+
if len(pruned) == 0 { // nothing to prune
136132
return nil
137133
}
138134

139-
// Before pruning anything, run all prune handlers. If any of them
140-
// fails we abort the prune.
141-
p.mu.RLock()
142-
defer p.mu.RUnlock()
143-
144-
for _, ph := range p.handlers {
145-
if err := ph.Prune(pruned); err != nil {
146-
p.logger.Error("prune handler failed, aborting prune",
147-
"err", err,
148-
"round_count", len(pruned),
149-
"round_min", pruned[0],
150-
"round_max", pruned[len(pruned)-1],
151-
)
152-
return fmt.Errorf("runtime/history: prune handler failed: %w", err)
153-
}
135+
if err := p.canPrune(pruned); err != nil {
136+
p.logger.Debug("prune handler blocked pruning, aborting prune",
137+
"err", err,
138+
"round_count", len(pruned),
139+
"round_min", pruned[0],
140+
"round_max", pruned[len(pruned)-1], // safe due to length check above
141+
)
142+
return fmt.Errorf("prune handler blocked pruning: %w", err)
154143
}
155144

156145
return nil
157146
})
158147
}
159148

149+
// canPrune checks if all prune handlers allow pruning specified rounds.
150+
func (p *keepLastPruner) canPrune(rounds []uint64) error {
151+
p.mu.RLock()
152+
defer p.mu.RUnlock()
153+
154+
for _, ph := range p.handlers {
155+
if err := ph.CanPruneRuntime(rounds); err != nil {
156+
return err
157+
}
158+
}
159+
160+
return nil
161+
}
162+
160163
// PruneInterval implements Pruner.
161164
func (p *keepLastPruner) PruneInterval() time.Duration {
162165
return p.pruneInterval

go/worker/compute/executor/committee/prune.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ func NewPruneHandler(runtimeID common.Namespace, consensus consensus.Service) *P
2323
}
2424
}
2525

26-
// Prune verifies that no rounds beyond the last normal round are pruned.
27-
func (p *PruneHandler) Prune(rounds []uint64) error {
28-
// Make sure we never prune past the last normal round, as some runtimes will do historic queries
29-
// for things that are not available in the last consensus state (e.g. delegation/undelegation
30-
// events that happened while the runtime was suspended or not producing blocks).
26+
// CanPruneRuntime returns no error when pruning runtime rounds would not go past last normal round.
27+
//
28+
// This is important as some runtimes will do historic queries for things that are not available
29+
// in the last consensus state (e.g. delegation/undelegation events that happened while the runtime
30+
// was suspended or not producing blocks).
31+
//
32+
// Implements runtime.history.PruneHandler.
33+
func (p *PruneHandler) CanPruneRuntime(rounds []uint64) error {
3134
state, err := p.consensus.RootHash().GetRuntimeState(context.TODO(), &roothash.RuntimeRequest{
3235
RuntimeID: p.runtimeID,
3336
Height: consensus.HeightLatest,

go/worker/storage/committee/prune.go

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package committee
22

33
import (
4+
"context"
45
"fmt"
6+
"time"
57

68
"github.com/oasisprotocol/oasis-core/go/common/logging"
9+
"github.com/oasisprotocol/oasis-core/go/runtime/history"
710
mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
811
)
912

@@ -12,8 +15,10 @@ type pruneHandler struct {
1215
worker *Worker
1316
}
1417

15-
func (p *pruneHandler) Prune(rounds []uint64) error {
16-
// Make sure we never prune past what was synced.
18+
// CanPruneRuntime returns no error when pruning would not go past last synced round.
19+
//
20+
// Implements runtime.history.PruneHandler.
21+
func (p *pruneHandler) CanPruneRuntime(rounds []uint64) error {
1722
lastSycnedRound, _, _ := p.worker.GetLastSynced()
1823

1924
for _, round := range rounds {
@@ -24,23 +29,68 @@ func (p *pruneHandler) Prune(rounds []uint64) error {
2429
}
2530

2631
// Old suggestion: Make sure we don't prune rounds that need to be checkpointed but haven't been yet.
32+
}
2733

28-
p.logger.Debug("pruning storage for round", "round", round)
34+
return nil
35+
}
2936

30-
// Prune given block.
31-
err := p.worker.localStorage.NodeDB().Prune(round)
32-
switch err {
33-
case nil:
34-
case mkvsDB.ErrNotEarliest:
35-
p.logger.Debug("skipping non-earliest round",
36-
"round", round,
37-
)
38-
continue
39-
default:
40-
p.logger.Error("failed to prune block",
41-
"err", err,
42-
)
43-
return err
37+
// statePruner handles pruning of the runtime state.
38+
//
39+
// Everytime pruning is triggered, the pruner removes rounds that are older than the earliest
40+
// round in the runtime’s history.
41+
//
42+
// TODO: Pruning logic is not robust as developer changing the pruning of the runtime history
43+
// may also unexpectedly change the pruning behavior of the state db. This could be fixed
44+
// by making the storage committee worker responsible for both syncing and pruning of the
45+
// history and state DB. See https://github.com/oasisprotocol/oasis-core/issues/6400.
46+
type statePruner struct {
47+
state mkvsDB.NodeDB
48+
history history.History
49+
interval time.Duration
50+
logger *logging.Logger
51+
}
52+
53+
// newPruner creates new runtime state pruner.
54+
func newPruner(ndb mkvsDB.NodeDB, history history.History, interval time.Duration) *statePruner {
55+
return &statePruner{
56+
state: ndb,
57+
history: history,
58+
interval: max(interval, time.Second),
59+
logger: logging.GetLogger("/worker/storage/state-pruner").With("runtime_id", history.RuntimeID()),
60+
}
61+
}
62+
63+
// serve periodically triggers the pruning of the runtime state db.
64+
func (p *statePruner) serve(ctx context.Context) error {
65+
p.logger.Info("starting")
66+
defer p.logger.Info("stopped")
67+
68+
ticker := time.NewTicker(p.interval)
69+
defer ticker.Stop()
70+
71+
for {
72+
select {
73+
case <-ctx.Done():
74+
return ctx.Err()
75+
case <-ticker.C:
76+
}
77+
78+
if err := p.prune(ctx); err != nil {
79+
p.logger.Warn("failed to prune", "err", err)
80+
}
81+
}
82+
}
83+
84+
func (p *statePruner) prune(ctx context.Context) error {
85+
blk, err := p.history.GetEarliestBlock(ctx)
86+
if err != nil {
87+
return fmt.Errorf("failed to get earliest block from runtime history: %w", err)
88+
}
89+
90+
for round := p.state.GetEarliestVersion(); round < blk.Header.Round; round++ {
91+
p.logger.Debug("pruning", "round", round)
92+
if err := p.state.Prune(round); err != nil {
93+
return fmt.Errorf("failed to prune round %d: %w", round, err)
4494
}
4595
}
4696

go/worker/storage/committee/worker.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ type Worker struct {
166166
diffCh chan *fetchedDiff
167167
finalizeCh chan finalizeResult
168168

169+
pruneInterval time.Duration
170+
169171
initCh chan struct{}
170172
}
171173

@@ -177,6 +179,7 @@ func New(
177179
workerCommonCfg workerCommon.Config,
178180
localStorage storageApi.LocalBackend,
179181
checkpointSyncCfg *CheckpointSyncConfig,
182+
pruneInterval time.Duration,
180183
) (*Worker, error) {
181184
initMetrics()
182185

@@ -200,6 +203,8 @@ func New(
200203
diffCh: make(chan *fetchedDiff),
201204
finalizeCh: make(chan finalizeResult),
202205

206+
pruneInterval: pruneInterval,
207+
203208
initCh: make(chan struct{}),
204209
}
205210

@@ -1014,6 +1019,13 @@ func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo
10141019
close(w.initCh)
10151020
w.logger.Info("initialized")
10161021

1022+
statePruner := newPruner(w.localStorage.NodeDB(), w.commonNode.Runtime.History(), w.pruneInterval)
1023+
wg.Go(func() {
1024+
if err := statePruner.serve(ctx); err != nil && !errors.Is(err, context.Canceled) {
1025+
w.logger.Error("state pruner failed: %w", err)
1026+
}
1027+
})
1028+
10171029
// Notify the checkpointer of the genesis round so it can be checkpointed.
10181030
w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round)
10191031
w.checkpointer.Flush()

go/worker/storage/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
109109
Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled,
110110
ChunkFetcherCount: config.GlobalConfig.Storage.FetcherCount,
111111
},
112+
config.GlobalConfig.Runtime.Prune.Interval,
112113
)
113114
if err != nil {
114115
return err

0 commit comments

Comments
 (0)