Skip to content

Commit 91d21cf

Browse files
authored
Merge pull request #6384 from oasisprotocol/peternose/trivial/simplify-pruner
go/worker/compute/executor: Simplify prune handler
2 parents 0104878 + f8a98b0 commit 91d21cf

File tree

9 files changed

+49
-69
lines changed

9 files changed

+49
-69
lines changed

.changelog/6384.trivial.md

Whitespace-only changes.

go/worker/client/committee/node.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,6 @@ func (n *Node) Initialized() <-chan struct{} {
7979
return n.initCh
8080
}
8181

82-
// HandleNewBlockEarlyLocked is guarded by CrossNode.
83-
func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) {
84-
// Nothing to do here.
85-
}
86-
8782
// HandleNewBlockLocked is guarded by CrossNode.
8883
func (n *Node) HandleNewBlockLocked(*runtime.BlockInfo) {
8984
// Nothing to do here.

go/worker/common/committee/node.go

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ var (
128128
// NodeHooks defines a worker's duties at common events.
129129
// These are called from the runtime's common node's worker.
130130
type NodeHooks interface {
131-
// Guarded by CrossNode.
132-
HandleNewBlockEarlyLocked(*runtime.BlockInfo)
133131
// Guarded by CrossNode.
134132
HandleNewBlockLocked(*runtime.BlockInfo)
135133
// Guarded by CrossNode.
@@ -185,12 +183,11 @@ type Node struct {
185183

186184
// Mutable and shared between nodes' workers.
187185
// Guarded by .CrossNode.
188-
CrossNode sync.Mutex
189-
CurrentBlock *block.Block
190-
CurrentBlockHeight int64
191-
CurrentConsensusBlock *consensus.LightBlock
192-
CurrentDescriptor *registry.Runtime
193-
CurrentEpoch beacon.EpochTime
186+
CrossNode sync.Mutex
187+
CurrentBlock *block.Block
188+
CurrentBlockHeight int64
189+
CurrentDescriptor *registry.Runtime
190+
CurrentEpoch beacon.EpochTime
194191

195192
logger *logging.Logger
196193
}
@@ -413,6 +410,13 @@ func (n *Node) handleSuspendLocked(int64) {
413410
}
414411
}
415412

413+
func (n *Node) updateHostedRuntimeVersion() {
414+
n.CrossNode.Lock()
415+
defer n.CrossNode.Unlock()
416+
417+
n.updateHostedRuntimeVersionLocked()
418+
}
419+
416420
func (n *Node) updateHostedRuntimeVersionLocked() {
417421
if n.CurrentDescriptor == nil {
418422
return
@@ -465,8 +469,6 @@ func (n *Node) handleNewBlock(blk *block.Block, height int64) {
465469

466470
processedBlockCount.With(n.getMetricLabels()).Inc()
467471

468-
header := blk.Header
469-
470472
// The first received block will be treated an epoch transition (if valid).
471473
// This will refresh the committee on the first block,
472474
// instead of waiting for the next epoch transition to occur.
@@ -487,10 +489,9 @@ func (n *Node) handleNewBlock(blk *block.Block, height int64) {
487489
// Update the current block.
488490
n.CurrentBlock = blk
489491
n.CurrentBlockHeight = height
490-
n.CurrentConsensusBlock = consensusBlk
491492

492493
// Update active descriptor on epoch transitions.
493-
if firstBlockReceived || header.HeaderType == block.EpochTransition || header.HeaderType == block.Suspended {
494+
if firstBlockReceived || blk.Header.HeaderType == block.EpochTransition || blk.Header.HeaderType == block.Suspended {
494495
var rs *roothash.RuntimeState
495496
rs, err = n.Consensus.RootHash().GetRuntimeState(n.ctx, &roothash.RuntimeRequest{
496497
RuntimeID: n.Runtime.ID(),
@@ -524,19 +525,8 @@ func (n *Node) handleNewBlock(blk *block.Block, height int64) {
524525
n.KeyManagerClient.SetKeyManagerID(n.CurrentDescriptor.KeyManager)
525526
}
526527

527-
bi := &runtime.BlockInfo{
528-
RuntimeBlock: n.CurrentBlock,
529-
ConsensusBlock: n.CurrentConsensusBlock,
530-
Epoch: n.CurrentEpoch,
531-
ActiveDescriptor: n.CurrentDescriptor,
532-
}
533-
534-
for _, hooks := range n.hooks {
535-
hooks.HandleNewBlockEarlyLocked(bi)
536-
}
537-
538528
// Perform actions based on block type.
539-
switch header.HeaderType {
529+
switch blk.Header.HeaderType {
540530
case block.Normal:
541531
if firstBlockReceived {
542532
n.logger.Warn("forcing an epoch transition on first received block")
@@ -559,11 +549,18 @@ func (n *Node) handleNewBlock(blk *block.Block, height int64) {
559549
n.handleSuspendLocked(height)
560550
default:
561551
n.logger.Error("invalid block type",
562-
"block", bi.RuntimeBlock,
552+
"block", blk,
563553
)
564554
return
565555
}
566556

557+
bi := &runtime.BlockInfo{
558+
RuntimeBlock: n.CurrentBlock,
559+
ConsensusBlock: consensusBlk,
560+
Epoch: n.CurrentEpoch,
561+
ActiveDescriptor: n.CurrentDescriptor,
562+
}
563+
567564
n.TxPool.ProcessBlock(bi)
568565

569566
// Fetch incoming messages.
@@ -729,9 +726,7 @@ func (n *Node) worker() {
729726

730727
// Perform initial hosted runtime version update to ensure we have something even in cases where
731728
// initial block processing fails for any reason.
732-
n.CrossNode.Lock()
733-
n.updateHostedRuntimeVersionLocked()
734-
n.CrossNode.Unlock()
729+
n.updateHostedRuntimeVersion()
735730

736731
// Start the runtime.
737732
hrt := n.GetHostedRuntime()
@@ -779,11 +774,7 @@ func (n *Node) worker() {
779774
return
780775
}
781776

782-
func() {
783-
n.CrossNode.Lock()
784-
defer n.CrossNode.Unlock()
785-
n.updateHostedRuntimeVersionLocked()
786-
}()
777+
n.updateHostedRuntimeVersion()
787778
case compNotify.Removed != nil:
788779
// Received removal of a component.
789780
if err := n.RemoveHostedRuntimeComponent(*compNotify.Removed); err != nil {

go/worker/compute/executor/committee/hooks.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,19 @@
11
package committee
22

33
import (
4-
"github.com/oasisprotocol/oasis-core/go/common/crash"
54
runtime "github.com/oasisprotocol/oasis-core/go/runtime/api"
65
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
76
)
87

98
// Ensure Node implements NodeHooks.
109
var _ committee.NodeHooks = (*Node)(nil)
1110

12-
// HandleNewBlockEarlyLocked implements NodeHooks.
11+
// HandleNewBlockLocked implements NodeHooks.
1312
// Guarded by n.commonNode.CrossNode.
14-
func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) {
15-
crash.Here(crashPointRoothashReceiveAfter)
16-
13+
func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) {
1714
// Update our availability.
1815
n.nudgeAvailabilityLocked(false)
19-
}
2016

21-
// HandleNewBlockLocked implements NodeHooks.
22-
// Guarded by n.commonNode.CrossNode.
23-
func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) {
2417
// Drop blocks if the worker falls behind.
2518
select {
2619
case <-n.blockInfoCh:

go/worker/compute/executor/committee/init.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ const (
1111
crashPointBatchProposeBefore = "worker.executor.batch.propose.before"
1212
crashPointBatchProposeAfter = "worker.executor.batch.propose.after"
1313
crashPointDiscrepancyDetectedAfter = "worker.executor.batch.discrepancy_detected.after"
14-
crashPointRoothashReceiveAfter = "worker.executor.batch.roothash.receive.after"
1514
crashPointBatchPublishAfter = "worker.executor.batch.schedule.publish.after"
1615
)
1716

@@ -23,7 +22,6 @@ func init() {
2322
crashPointBatchProposeBefore,
2423
crashPointBatchProposeAfter,
2524
crashPointDiscrepancyDetectedAfter,
26-
crashPointRoothashReceiveAfter,
2725
crashPointBatchPublishAfter,
2826
)
2927
}

go/worker/compute/executor/committee/node.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,9 +1568,6 @@ func NewNode(
15681568
logger: logging.GetLogger("worker/executor/committee").With("runtime_id", commonNode.Runtime.ID()),
15691569
}
15701570

1571-
// Register prune handler.
1572-
commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{commonNode: commonNode})
1573-
15741571
// Register committee message handler.
15751572
commonNode.P2P.RegisterHandler(committeeTopic, &committeeMsgHandler{n})
15761573

go/worker/compute/executor/committee/prune.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,36 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/oasisprotocol/oasis-core/go/common"
8+
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
79
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
8-
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
910
)
1011

11-
// pruneHandler is a prune handler that prevents pruning of the last normal round.
12-
type pruneHandler struct {
13-
commonNode *committee.Node
12+
// PruneHandler is a prune handler that prevents pruning of the last normal round.
13+
type PruneHandler struct {
14+
runtimeID common.Namespace
15+
consensus consensus.Service
1416
}
1517

16-
func (p *pruneHandler) Prune(rounds []uint64) error {
17-
p.commonNode.CrossNode.Lock()
18-
height := p.commonNode.CurrentBlockHeight
19-
p.commonNode.CrossNode.Unlock()
18+
// NewPruneHandler creates a new prune handler.
19+
func NewPruneHandler(runtimeID common.Namespace, consensus consensus.Service) *PruneHandler {
20+
return &PruneHandler{
21+
runtimeID: runtimeID,
22+
consensus: consensus,
23+
}
24+
}
2025

26+
// Prune verifies that no rounds beyond the last normal round are pruned.
27+
func (p *PruneHandler) Prune(rounds []uint64) error {
2128
// Make sure we never prune past the last normal round, as some runtimes will do historic queries
2229
// for things that are not available in the last consensus state (e.g. delegation/undelegation
2330
// events that happened while the runtime was suspended or not producing blocks).
24-
state, err := p.commonNode.Consensus.RootHash().GetRuntimeState(context.Background(), &roothash.RuntimeRequest{
25-
RuntimeID: p.commonNode.Runtime.ID(),
26-
Height: height,
31+
state, err := p.consensus.RootHash().GetRuntimeState(context.TODO(), &roothash.RuntimeRequest{
32+
RuntimeID: p.runtimeID,
33+
Height: consensus.HeightLatest,
2734
})
2835
if err != nil {
29-
return fmt.Errorf("worker/executor: failed to fetch runtime state at %d: %w", height, err)
36+
return fmt.Errorf("worker/executor: failed to fetch runtime state: %w", err)
3037
}
3138

3239
for _, round := range rounds {

go/worker/compute/executor/worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
155155
return err
156156
}
157157

158+
// Register prune handler.
159+
pruner := committee.NewPruneHandler(commonNode.Runtime.ID(), commonNode.Consensus)
160+
commonNode.Runtime.History().Pruner().RegisterHandler(pruner)
161+
158162
commonNode.AddHooks(node)
159163
w.runtimes[id] = node
160164

go/worker/storage/committee/worker.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -330,11 +330,6 @@ func (w *Worker) GetLocalStorage() storageApi.LocalBackend {
330330

331331
// NodeHooks implementation.
332332

333-
// HandleNewBlockEarlyLocked is guarded by CrossNode.
334-
func (w *Worker) HandleNewBlockEarlyLocked(*runtime.BlockInfo) {
335-
// Nothing to do here.
336-
}
337-
338333
// HandleNewBlockLocked is guarded by CrossNode.
339334
func (w *Worker) HandleNewBlockLocked(bi *runtime.BlockInfo) {
340335
// Notify the state syncer that there is a new block.

0 commit comments

Comments
 (0)