diff --git a/.changelog/6425.internal.md b/.changelog/6425.internal.md
new file mode 100644
index 00000000000..a5b1adcfb4f
--- /dev/null
+++ b/.changelog/6425.internal.md
@@ -0,0 +1,9 @@
+go/worker/common: Replace epoch transitions with committee transitions
+
+The following metric has been removed:
+
+- `oasis_worker_epoch_transition_count`
+
+The following metric has been added:
+
+- `oasis_worker_committee_transition_count`
diff --git a/docs/oasis-node/metrics.md b/docs/oasis-node/metrics.md
index 0d3a893f7c2..4f4fc6f39a5 100644
--- a/docs/oasis-node/metrics.md
+++ b/docs/oasis-node/metrics.md
@@ -102,16 +102,16 @@ oasis_worker_batch_runtime_processing_time | Summary | Time it takes for a batch
 oasis_worker_batch_size | Summary | Number of transactions in a batch. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
 oasis_worker_client_lb_healthy_instance_count | Gauge | Number of healthy instances in the load balancer. | runtime | [runtime/host/loadbalance](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/host/loadbalance/metrics.go)
 oasis_worker_client_lb_requests | Counter | Number of requests processed by the given load balancer instance. | runtime, lb_instance | [runtime/host/loadbalance](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/host/loadbalance/metrics.go)
-oasis_worker_epoch_number | Gauge | Current epoch number as seen by the worker. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_epoch_transition_count | Counter | Number of epoch transitions. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
+oasis_worker_committee_transition_count | Counter | Number of committee transitions. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_epoch_number | Gauge | Current epoch number as seen by the worker. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
 oasis_worker_execution_discrepancy_detected_count | Counter | Number of detected execute discrepancies. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
-oasis_worker_executor_committee_p2p_peers | Gauge | Number of executor committee P2P peers. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_executor_is_backup_worker | Gauge | 1 if worker is currently an executor backup worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_executor_is_worker | Gauge | 1 if worker is currently an executor worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_executor_liveness_live_ratio | Gauge | Ratio between live and total rounds. Reports 1 if node is not in committee. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_executor_liveness_live_rounds | Gauge | Number of live rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_executor_liveness_total_rounds | Gauge | Number of total rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
-oasis_worker_failed_round_count | Counter | Number of failed roothash rounds. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
+oasis_worker_executor_committee_p2p_peers | Gauge | Number of executor committee P2P peers. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_executor_is_backup_worker | Gauge | 1 if worker is currently an executor backup worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_executor_is_worker | Gauge | 1 if worker is currently an executor worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_executor_liveness_live_ratio | Gauge | Ratio between live and total rounds. Reports 1 if node is not in committee. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_executor_liveness_live_rounds | Gauge | Number of live rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_executor_liveness_total_rounds | Gauge | Number of total rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
+oasis_worker_failed_round_count | Counter | Number of failed roothash rounds. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
 oasis_worker_keymanager_churp_committee_size | Gauge | Number of nodes in the committee | runtime, churp | [worker/keymanager](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/keymanager/metrics.go)
 oasis_worker_keymanager_churp_confirmed_applications_total | Gauge | Number of confirmed applications | runtime, churp | [worker/keymanager](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/keymanager/metrics.go)
 oasis_worker_keymanager_churp_enclave_rpc_failures_total | Counter | Number of failed enclave rpc calls. | runtime, churp, method | [worker/keymanager](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/keymanager/metrics.go)
@@ -142,7 +142,7 @@ oasis_worker_node_registration_eligible | Gauge | Is oasis node eligible for reg
 oasis_worker_node_status_frozen | Gauge | Is oasis node frozen (binary). | | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
 oasis_worker_node_status_runtime_faults | Gauge | Number of runtime faults.
| runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go) oasis_worker_node_status_runtime_suspended | Gauge | Runtime node suspension status (binary). | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go) -oasis_worker_processed_block_count | Counter | Number of processed roothash blocks. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go) +oasis_worker_processed_block_count | Counter | Number of processed roothash blocks. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go) oasis_worker_processed_event_count | Counter | Number of processed roothash events. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go) oasis_worker_storage_commit_latency | Summary | Latency of storage commit calls (state + outputs) (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go) oasis_worker_storage_full_round | Gauge | The last round that was fully synced and finalized. | runtime | [worker/storage/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/storage/committee/metrics.go) diff --git a/go/runtime/api/info.go b/go/runtime/api/info.go index 8c06857d48b..9bf2834f63c 100644 --- a/go/runtime/api/info.go +++ b/go/runtime/api/info.go @@ -21,7 +21,14 @@ type BlockInfo struct { // Epoch is the epoch the runtime block belongs to. Epoch beacon.EpochTime +} + +// DispatchInfo provides the context for checking, executing, or scheduling +// a batch of transactions. +type DispatchInfo struct { + // BlockInfo holds information about the latest runtime block. + BlockInfo *BlockInfo - // ActiveDescriptor is the runtime descriptor active for the runtime block. + // ActiveDescriptor is the runtime descriptor currently in use for dispatch. ActiveDescriptor *registry.Runtime } diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index f65e5aeb456..fab54384dd9 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -14,7 +14,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-core/go/common/pubsub" - "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/roothash/api/message" runtime "github.com/oasisprotocol/oasis-core/go/runtime/api" "github.com/oasisprotocol/oasis-core/go/runtime/history" @@ -107,8 +106,11 @@ type TransactionPool interface { // and only the following transactions will be returned. GetSchedulingExtra(offset *hash.Hash, limit int) []*TxQueueMeta - // ProcessBlock updates the last known runtime block information. - ProcessBlock(bi *runtime.BlockInfo) + // RecheckTxs triggers a recheck of all transactions. + RecheckTxs() + + // ProcessDispatchInfo updates the last known runtime dispatch information. + ProcessDispatchInfo(di *runtime.DispatchInfo) // ProcessIncomingMessages loads transactions from incoming messages into the pool. 
ProcessIncomingMessages(inMsgs []*message.IncomingMessage) @@ -158,10 +160,10 @@ type txPool struct { proposedTxsLock sync.Mutex proposedTxs map[hash.Hash]*TxQueueMeta - blockInfoLock sync.Mutex - blockInfo *runtime.BlockInfo - lastBlockProcessed time.Time - lastRecheckRound uint64 + dispatchInfoLock sync.Mutex + dispatchInfo *runtime.DispatchInfo + lastDispatchInfoProcessed time.Time + lastRecheckRound uint64 republishCh *channels.RingChannel } @@ -385,24 +387,39 @@ HASH_LOOP: return txs, missingTxs } -func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) { - t.blockInfoLock.Lock() - defer t.blockInfoLock.Unlock() +func (t *txPool) ProcessDispatchInfo(di *runtime.DispatchInfo) { + t.dispatchInfoLock.Lock() + defer t.dispatchInfoLock.Unlock() - if t.blockInfo == nil { + if t.dispatchInfo == nil { close(t.initCh) } - t.blockInfo = bi - t.lastBlockProcessed = time.Now() + t.dispatchInfo = di + t.lastDispatchInfoProcessed = time.Now() + + roundDifference := di.BlockInfo.RuntimeBlock.Header.Round - t.lastRecheckRound + if roundDifference > t.cfg.RecheckInterval { + t.recheckTxsLocked() + } +} + +func (t *txPool) RecheckTxs() { + t.dispatchInfoLock.Lock() + defer t.dispatchInfoLock.Unlock() + + t.recheckTxsLocked() +} - // Force transaction rechecks on epoch transitions and if needed. - isEpochTransition := bi.RuntimeBlock.Header.HeaderType == block.EpochTransition - roundDifference := bi.RuntimeBlock.Header.Round - t.lastRecheckRound - if isEpochTransition || roundDifference > t.cfg.RecheckInterval { - t.recheckTxCh.In() <- struct{}{} - t.lastRecheckRound = bi.RuntimeBlock.Header.Round +func (t *txPool) recheckTxsLocked() { + select { + case <-t.initCh: + default: + return } + + t.recheckTxCh.In() <- struct{}{} + t.lastRecheckRound = t.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round } func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) { @@ -427,14 +444,14 @@ func (t *txPool) All() [][]byte { return txs } -func (t *txPool) getCurrentBlockInfo() (*runtime.BlockInfo, time.Time, error) { - t.blockInfoLock.Lock() - defer t.blockInfoLock.Unlock() +func (t *txPool) getCurrentDispatchInfo() (*runtime.DispatchInfo, time.Time, error) { + t.dispatchInfoLock.Lock() + defer t.dispatchInfoLock.Unlock() - if t.blockInfo == nil { - return nil, time.Time{}, fmt.Errorf("no current block available") + if t.dispatchInfo == nil { + return nil, time.Time{}, fmt.Errorf("no current dispatch info available") } - return t.blockInfo, t.lastBlockProcessed, nil + return t.dispatchInfo, t.lastDispatchInfoProcessed, nil } // checkTxBatch requests the runtime to check the validity of a transaction batch. @@ -445,22 +462,22 @@ func (t *txPool) checkTxBatch(ctx context.Context) error { return fmt.Errorf("runtime is not available") } - // Get the current block info. - bi, lastBlockProcessed, err := t.getCurrentBlockInfo() + // Get the current dispatch info. + di, lastDispatchInfoProcessed, err := t.getCurrentDispatchInfo() if err != nil { - return fmt.Errorf("failed to get current block info: %w", err) + return fmt.Errorf("failed to get current dispatch info: %w", err) } // Ensure block round is synced to storage. 
waitSyncCtx, cancelWaitSyncCtx := context.WithTimeout(ctx, checkTxWaitRoundSyncedTimeout) defer cancelWaitSyncCtx() - t.logger.Debug("ensuring block round is synced", "round", bi.RuntimeBlock.Header.Round) - if _, err = t.history.WaitRoundSynced(waitSyncCtx, bi.RuntimeBlock.Header.Round); err != nil { + t.logger.Debug("ensuring block round is synced", "round", di.BlockInfo.RuntimeBlock.Header.Round) + if _, err = t.history.WaitRoundSynced(waitSyncCtx, di.BlockInfo.RuntimeBlock.Header.Round); err != nil { // Block round isn't synced yet, so make sure the batch check is // retried later to avoid aborting the runtime, as it is not its fault. t.logger.Info("block round is not synced yet, retrying transaction batch check later", - "round", bi.RuntimeBlock.Header.Round, + "round", di.BlockInfo.RuntimeBlock.Header.Round, "err", err, ) t.checkTxCh.In() <- struct{}{} @@ -482,7 +499,7 @@ func (t *txPool) checkTxBatch(ctx context.Context) error { for _, pct := range batch { rawTxBatch = append(rawTxBatch, pct.Raw()) } - return t.runtime.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch) + return t.runtime.CheckTx(checkCtx, di.BlockInfo.RuntimeBlock, di.BlockInfo.ConsensusBlock, di.BlockInfo.Epoch, di.ActiveDescriptor.Executor.MaxMessages, rawTxBatch) }() switch { case err == nil: @@ -625,7 +642,7 @@ func (t *txPool) checkTxBatch(ctx context.Context) error { // Kick off publishing for any new txs after waiting for block publish delay based on when // we received the block that we just used to check the transaction batch. go func() { - time.Sleep(time.Until(lastBlockProcessed.Add(newBlockPublishDelay))) + time.Sleep(time.Until(lastDispatchInfoProcessed.Add(newBlockPublishDelay))) t.republishCh.In() <- struct{}{} }() diff --git a/go/worker/client/committee/node.go b/go/worker/client/committee/node.go index ecd8a7bf8a6..5b819aaacbb 100644 --- a/go/worker/client/committee/node.go +++ b/go/worker/client/committee/node.go @@ -12,6 +12,8 @@ import ( cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/logging" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" runtime "github.com/oasisprotocol/oasis-core/go/runtime/api" "github.com/oasisprotocol/oasis-core/go/runtime/bundle/component" @@ -79,13 +81,13 @@ func (n *Node) Initialized() <-chan struct{} { return n.initCh } -// HandleNewBlockLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockLocked(*runtime.BlockInfo) { +// HandleNewDispatchInfo implements NodeHooks. +func (n *Node) HandleNewDispatchInfo(*runtime.DispatchInfo) { // Nothing to do here. } -// HandleRuntimeHostEventLocked is guarded by CrossNode. -func (n *Node) HandleRuntimeHostEventLocked(ev *host.Event) { +// HandleRuntimeHostEvent implements NodeHooks. +func (n *Node) HandleRuntimeHostEvent(ev *host.Event) { if n.roleProvider == nil { return } @@ -169,15 +171,14 @@ func (n *Node) CheckTx(ctx context.Context, tx []byte) (*protocol.CheckTxResult, func (n *Node) Query(ctx context.Context, round uint64, method string, args []byte, comp *component.ID) ([]byte, error) { hrt := n.commonNode.GetHostedRuntime() - // Fetch the active descriptor so we can get the current message limits. 
- n.commonNode.CrossNode.Lock() - dsc := n.commonNode.CurrentDescriptor - n.commonNode.CrossNode.Unlock() - - if dsc == nil { - return nil, api.ErrNoHostedRuntime + rs, err := n.commonNode.Consensus.RootHash().GetRuntimeState(ctx, &roothash.RuntimeRequest{ + RuntimeID: n.commonNode.Runtime.ID(), + Height: consensus.HeightLatest, + }) + if err != nil { + return nil, fmt.Errorf("client: failed to get runtime state: %w", err) } - maxMessages := dsc.Executor.MaxMessages + maxMessages := rs.Runtime.Executor.MaxMessages annBlk, err := n.commonNode.Runtime.History().GetAnnotatedBlock(ctx, round) if err != nil { diff --git a/go/worker/client/worker.go b/go/worker/client/worker.go index 4aaae919090..85ce209d6d4 100644 --- a/go/worker/client/worker.go +++ b/go/worker/client/worker.go @@ -148,7 +148,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { // leave it like that before initialization starts, so the registration // is blocked until the runtime finishes initializing. // The availability of the role provider is changed in - // HandleRuntimeHostEventLocked in worker/client/committee/node.go. + // HandleRuntimeHostEvent in worker/client/committee/node.go. default: } diff --git a/go/worker/common/committee/group.go b/go/worker/common/committee/group.go index 890e47c78bd..9cf2d284241 100644 --- a/go/worker/common/committee/group.go +++ b/go/worker/common/committee/group.go @@ -140,11 +140,13 @@ func (g *Group) Suspend() { g.committee = nil } -// EpochTransition processes an epoch transition that just happened. -func (g *Group) EpochTransition(ctx context.Context, committee *scheduler.Committee) error { +// CommitteeTransition processes a committee transition that just happened. +func (g *Group) CommitteeTransition(ctx context.Context, committee *scheduler.Committee) error { g.Lock() defer g.Unlock() + g.logger.Info("committee transition") + // Invalidate current committee. In case we cannot process this transition, // this should cause the node to transition into NotReady and stay there // until the next epoch transition. 
@@ -203,7 +205,7 @@ func (g *Group) EpochTransition(ctx context.Context, committee *scheduler.Commit nodes: g.nodes, } - g.logger.Info("epoch transition complete", + g.logger.Info("committee transition complete", "epoch", epochNumber, "executor_roles", g.committee.Roles, ) diff --git a/go/worker/common/committee/metrics.go b/go/worker/common/committee/metrics.go new file mode 100644 index 00000000000..8c1a3fc5066 --- /dev/null +++ b/go/worker/common/committee/metrics.go @@ -0,0 +1,177 @@ +package committee + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" +) + +const periodicMetricsInterval = time.Minute + +var ( + processedBlockCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "oasis_worker_processed_block_count", + Help: "Number of processed roothash blocks.", + }, + []string{"runtime"}, + ) + failedRoundCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "oasis_worker_failed_round_count", + Help: "Number of failed roothash rounds.", + }, + []string{"runtime"}, + ) + committeeTransitionCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "oasis_worker_committee_transition_count", + Help: "Number of committee transitions.", + }, + []string{"runtime"}, + ) + epochNumber = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_epoch_number", + Help: "Current epoch number as seen by the worker.", + }, + []string{"runtime"}, + ) + workerIsExecutorWorker = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_executor_is_worker", + Help: "1 if worker is currently an executor worker, 0 otherwise.", + }, + []string{"runtime"}, + ) + workerIsExecutorBackup = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_executor_is_backup_worker", + Help: "1 if worker is currently an executor backup worker, 0 otherwise.", + }, + []string{"runtime"}, + ) + executorCommitteeP2PPeers = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_executor_committee_p2p_peers", + Help: "Number of executor committee P2P peers.", + }, + []string{"runtime"}, + ) + livenessTotalRounds = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_executor_liveness_total_rounds", + Help: "Number of total rounds in last epoch.", + }, + []string{"runtime"}, + ) + livenessLiveRounds = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_executor_liveness_live_rounds", + Help: "Number of live rounds in last epoch.", + }, + []string{"runtime"}, + ) + livenessRatio = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_worker_executor_liveness_live_ratio", + Help: "Ratio between live and total rounds. Reports 1 if node is not in committee.", + }, + []string{"runtime"}, + ) + + nodeCollectors = []prometheus.Collector{ + processedBlockCount, + failedRoundCount, + committeeTransitionCount, + epochNumber, + // Periodically collected metrics. 
+ workerIsExecutorWorker, + workerIsExecutorBackup, + executorCommitteeP2PPeers, + livenessTotalRounds, + livenessLiveRounds, + livenessRatio, + } + + metricsOnce sync.Once +) + +func (n *Node) metricsWorker() { + n.logger.Info("delaying metrics worker start until worker is initialized") + select { + case <-n.stopCh: + return + case <-n.initCh: + } + + n.logger.Debug("starting metrics worker") + + t := time.NewTicker(periodicMetricsInterval) + defer t.Stop() + + for { + select { + case <-n.stopCh: + return + case <-t.C: + } + + n.updatePeriodicMetrics() + } +} + +func (n *Node) updatePeriodicMetrics() { + boolToMetricVal := func(b bool) float64 { + if b { + return 1.0 + } + return 0.0 + } + + labels := n.getMetricLabels() + + n.logger.Debug("updating periodic worker node metrics") + + committeeInfo, ok := n.Group.CommitteeInfo() + if !ok { + return + } + + executorCommitteeP2PPeers.With(labels).Set(float64(len(n.P2P.Peers(n.Runtime.ID())))) + workerIsExecutorWorker.With(labels).Set(boolToMetricVal(committeeInfo.IsWorker())) + workerIsExecutorBackup.With(labels).Set(boolToMetricVal(committeeInfo.IsBackupWorker())) + + if !committeeInfo.IsMember() { + // Default to 1 if node is not in committee. + livenessRatio.With(labels).Set(1.0) + return + } + + rs, err := n.Consensus.RootHash().GetRuntimeState(n.ctx, &roothash.RuntimeRequest{ + RuntimeID: n.Runtime.ID(), + Height: consensus.HeightLatest, + }) + if err != nil || rs.LivenessStatistics == nil { + return + } + + totalRounds := rs.LivenessStatistics.TotalRounds + var liveRounds uint64 + for _, index := range committeeInfo.Indices { + liveRounds += rs.LivenessStatistics.LiveRounds[index] + } + livenessTotalRounds.With(labels).Set(float64(totalRounds)) + livenessLiveRounds.With(labels).Set(float64(liveRounds)) + livenessRatio.With(labels).Set(float64(liveRounds) / float64(totalRounds)) +} + +func (n *Node) getMetricLabels() prometheus.Labels { + return prometheus.Labels{ + "runtime": n.Runtime.ID().String(), + } +} diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index b946490d98b..f165b9334a6 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -5,7 +5,6 @@ import ( "math" "sync" "sync/atomic" - "time" "github.com/prometheus/client_golang/prometheus" @@ -34,104 +33,14 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" ) -const periodicMetricsInterval = 60 * time.Second - -var ( - processedBlockCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "oasis_worker_processed_block_count", - Help: "Number of processed roothash blocks.", - }, - []string{"runtime"}, - ) - failedRoundCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "oasis_worker_failed_round_count", - Help: "Number of failed roothash rounds.", - }, - []string{"runtime"}, - ) - epochTransitionCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "oasis_worker_epoch_transition_count", - Help: "Number of epoch transitions.", - }, - []string{"runtime"}, - ) - epochNumber = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "oasis_worker_epoch_number", - Help: "Current epoch number as seen by the worker.", - }, - []string{"runtime"}, - ) - workerIsExecutorWorker = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "oasis_worker_executor_is_worker", - Help: "1 if worker is currently an executor worker, 0 otherwise.", - }, - []string{"runtime"}, - ) - workerIsExecutorBackup = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: 
"oasis_worker_executor_is_backup_worker", - Help: "1 if worker is currently an executor backup worker, 0 otherwise.", - }, - []string{"runtime"}, - ) - executorCommitteeP2PPeers = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "oasis_worker_executor_committee_p2p_peers", - Help: "Number of executor committee P2P peers.", - }, - []string{"runtime"}, - ) - livenessTotalRounds = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "oasis_worker_executor_liveness_total_rounds", - Help: "Number of total rounds in last epoch.", - }, - []string{"runtime"}, - ) - livenessLiveRounds = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "oasis_worker_executor_liveness_live_rounds", - Help: "Number of live rounds in last epoch.", - }, - []string{"runtime"}, - ) - livenessRatio = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "oasis_worker_executor_liveness_live_ratio", - Help: "Ratio between live and total rounds. Reports 1 if node is not in committee.", - }, - []string{"runtime"}, - ) - - nodeCollectors = []prometheus.Collector{ - processedBlockCount, - failedRoundCount, - epochTransitionCount, - epochNumber, - // Periodically collected metrics. - workerIsExecutorWorker, - workerIsExecutorBackup, - executorCommitteeP2PPeers, - livenessTotalRounds, - livenessLiveRounds, - livenessRatio, - } - - metricsOnce sync.Once -) - // NodeHooks defines a worker's duties at common events. // These are called from the runtime's common node's worker. type NodeHooks interface { - // Guarded by CrossNode. - HandleNewBlockLocked(*runtime.BlockInfo) - // Guarded by CrossNode. - HandleRuntimeHostEventLocked(*host.Event) + // HandleNewDispatchInfo handles the latest block information and the + // active runtime descriptor for transaction dispatch. + HandleNewDispatchInfo(*runtime.DispatchInfo) + // HandleRuntimeHostEvent handles new runtime host event. + HandleRuntimeHostEvent(*host.Event) // Initialized returns a channel that will be closed when the worker is initialized and ready // to service requests. @@ -181,13 +90,14 @@ type Node struct { workersInitialized uint32 runtimeSuspended uint32 - // Mutable and shared between nodes' workers. - // Guarded by .CrossNode. - CrossNode sync.Mutex - CurrentBlockRound uint64 - CurrentBlockHeight int64 - CurrentDescriptor *registry.Runtime - CurrentEpoch beacon.EpochTime + mu sync.Mutex + latestRound uint64 + latestHeight int64 + + committeeRound uint64 + lastBlockInfo *runtime.BlockInfo + dispatchInfoCh chan struct{} + activeDescriptor *registry.Runtime logger *logging.Logger } @@ -265,12 +175,14 @@ func (n *Node) AddHooks(hooks NodeHooks) { // GetStatus returns the common committee node status. 
func (n *Node) GetStatus() (*api.Status, error) { + n.mu.Lock() status := api.Status{ Status: n.getStatusState(), - LatestRound: n.CurrentBlockRound, - LatestHeight: n.CurrentBlockHeight, + LatestRound: n.latestRound, + LatestHeight: n.latestHeight, SchedulerRank: math.MaxUint64, } + n.mu.Unlock() switch activeVersion, err := n.GetHostedRuntime().GetActiveVersion(); err { case nil: @@ -313,22 +225,14 @@ func (n *Node) GetStatus() (*api.Status, error) { return &status, nil } -func (n *Node) getMetricLabels() prometheus.Labels { - return prometheus.Labels{ - "runtime": n.Runtime.ID().String(), - } -} - -func (n *Node) handleEpochTransition(committee *scheduler.Committee) { - n.logger.Info("epoch transition has occurred") - - if err := n.Group.EpochTransition(n.ctx, committee); err != nil { - n.logger.Error("unable to handle epoch transition", +func (n *Node) handleCommitteeTransition(committee *scheduler.Committee) { + if err := n.Group.CommitteeTransition(n.ctx, committee); err != nil { + n.logger.Error("unable to handle committee transition", "err", err, ) } - epochTransitionCount.With(n.getMetricLabels()).Inc() + committeeTransitionCount.With(n.getMetricLabels()).Inc() epochNumber.With(n.getMetricLabels()).Set(float64(committee.ValidFor)) } @@ -378,110 +282,7 @@ func (n *Node) updateHostedRuntimeVersion(rt *registry.Runtime) { } } -func (n *Node) handleNewBlock(blk *block.Block, height int64) { - n.CrossNode.Lock() - defer n.CrossNode.Unlock() - - processedBlockCount.With(n.getMetricLabels()).Inc() - - // The first received block will be treated an epoch transition (if valid). - // This will refresh the committee on the first block, - // instead of waiting for the next epoch transition to occur. - // Helps in cases where node is restarted mid epoch. - firstBlockReceived := n.CurrentBlockHeight == 0 - - // Update the current block. - n.CurrentBlockRound = blk.Header.Round - n.CurrentBlockHeight = height - - // Update active descriptor on epoch transitions. - if firstBlockReceived || blk.Header.HeaderType == block.EpochTransition || blk.Header.HeaderType == block.Suspended { - rs, err := n.Consensus.RootHash().GetRuntimeState(n.ctx, &roothash.RuntimeRequest{ - RuntimeID: n.Runtime.ID(), - Height: height, - }) - if err != nil { - n.logger.Error("failed to query runtime state", - "err", err, - ) - return - } - n.CurrentDescriptor = rs.Runtime - - n.CurrentEpoch, err = n.Consensus.Beacon().GetEpoch(n.ctx, height) - if err != nil { - n.logger.Error("failed to fetch current epoch", - "err", err, - ) - return - } - - n.updateHostedRuntimeVersion(rs.Runtime) - - // Make sure to update the key manager if needed. - n.KeyManagerClient.SetKeyManagerID(n.CurrentDescriptor.KeyManager) - - switch rs.Suspended { - case true: - n.handleSuspend() - atomic.StoreUint32(&n.runtimeSuspended, 1) - case false: - n.handleEpochTransition(rs.Committee) - atomic.StoreUint32(&n.runtimeSuspended, 0) - } - } - - // Count failed rounds. - if blk.Header.HeaderType == block.RoundFailed && !firstBlockReceived { - n.logger.Warn("round has failed") - failedRoundCount.With(n.getMetricLabels()).Inc() - } - - // Fetch light consensus block. - consensusBlk, err := n.Consensus.Core().GetLightBlock(n.ctx, height) - if err != nil { - n.logger.Error("failed to query light block", - "err", err, - "height", height, - "round", blk.Header.Round, - ) - return - } - - // Fetch incoming messages. 
- inMsgs, err := n.Consensus.RootHash().GetIncomingMessageQueue(n.ctx, &roothash.InMessageQueueRequest{ - RuntimeID: n.Runtime.ID(), - Height: height, - }) - if err != nil { - n.logger.Error("failed to query incoming messages", - "err", err, - "height", height, - "round", blk.Header.Round, - ) - return - } - - bi := &runtime.BlockInfo{ - RuntimeBlock: blk, - ConsensusBlock: consensusBlk, - IncomingMessages: inMsgs, - Epoch: n.CurrentEpoch, - ActiveDescriptor: n.CurrentDescriptor, - } - - n.TxPool.ProcessBlock(bi) - n.TxPool.ProcessIncomingMessages(inMsgs) - - for _, hooks := range n.hooks { - hooks.HandleNewBlockLocked(bi) - } -} - func (n *Node) handleRuntimeHostEvent(ev *host.Event) { - n.CrossNode.Lock() - defer n.CrossNode.Unlock() - n.logger.Debug("got runtime event", "ev", ev) switch { @@ -492,15 +293,18 @@ func (n *Node) handleRuntimeHostEvent(ev *host.Event) { } for _, hooks := range n.hooks { - hooks.HandleRuntimeHostEventLocked(ev) + hooks.HandleRuntimeHostEvent(ev) } } -func (n *Node) worker() { +func (n *Node) worker() { //nolint: gocyclo n.logger.Info("starting committee node") + var wg sync.WaitGroup + defer wg.Wait() + defer close(n.quitCh) - defer (n.cancelCtx)() + defer n.cancelCtx() // Wait for consensus sync. n.logger.Info("delaying worker start until after initial synchronization") @@ -521,7 +325,7 @@ func (n *Node) worker() { } // Wait for the runtime. - rt, err := n.Runtime.ActiveDescriptor(n.ctx) + rt, err := n.Runtime.RegistryDescriptor(n.ctx) if err != nil { n.logger.Error("failed to wait for registry descriptor", "err", err, @@ -532,10 +336,6 @@ func (n *Node) worker() { n.logger.Info("runtime is registered with the registry") - // Initialize the CurrentDescriptor to make sure there is one even if the runtime gets - // suspended. - n.CurrentDescriptor = rt - // If the runtime requires a key manager, wait for the key manager to actually become available // before processing any requests. if rt.KeyManager != nil { @@ -583,16 +383,6 @@ func (n *Node) worker() { n.logger.Debug("all child workers are initialized") atomic.StoreUint32(&n.workersInitialized, 1) - // Subscribe to runtime blocks. - blkCh, blkSub, err := n.Runtime.History().WatchCommittedBlocks() - if err != nil { - n.logger.Error("failed to subscribe to runtime blocks", - "err", err, - ) - return - } - defer blkSub.Close() - // Start watching runtime components so that we can provision new versions // once they are discovered. bundleRegistry := n.RuntimeRegistry.GetBundleRegistry() @@ -610,6 +400,27 @@ func (n *Node) worker() { return } } + // Start watching runtime committees so we know when the runtime committee + // changes and can update our worker role accordingly. + cmCh, cmSub, err := n.Consensus.Scheduler().WatchCommittees(n.ctx) + if err != nil { + n.logger.Error("failed to watch committees", + "err", err, + ) + return + } + defer cmSub.Close() + + // Start watching runtime blocks so we can schedule new transactions and + // check existing ones based on the latest block and active runtime descriptor. + blkCh, blkSub, err := n.Consensus.RootHash().WatchBlocks(n.ctx, n.Runtime.ID()) + if err != nil { + n.logger.Error("failed to watch runtime blocks", + "err", err, + ) + return + } + defer blkSub.Close() // Start watching runtime descriptors so we know when to update the hosted // runtime version, ensuring we never miss any deployment updates, even if @@ -636,14 +447,8 @@ func (n *Node) worker() { defer hrt.Stop() // Start the runtime host notifier and other services. 
- var wg sync.WaitGroup - defer wg.Wait() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - wg.Go(func() { - if err := n.services.Serve(ctx); err != nil { + if err := n.services.Serve(n.ctx); err != nil { n.logger.Error("service group stopped", "err", err) } }) @@ -654,9 +459,12 @@ func (n *Node) worker() { case <-n.stopCh: n.logger.Info("termination requested") return + case cm := <-cmCh: + n.handleCommittee(n.ctx, cm) case blk := <-blkCh: - // Received a block (annotated). - n.handleNewBlock(blk.Block, blk.Height) + n.handleRuntimeBlock(n.ctx, blk) + case <-n.dispatchInfoCh: + n.handleDispatchInfo() case ev := <-hrtEventCh: // Received a hosted runtime event. n.handleRuntimeHostEvent(ev) @@ -690,75 +498,135 @@ func (n *Node) worker() { } } -func (n *Node) updatePeriodicMetrics() { - boolToMetricVal := func(b bool) float64 { - if b { - return 1.0 - } - return 0.0 +func (n *Node) handleCommittee(ctx context.Context, committee *scheduler.Committee) { + if committee.Kind != scheduler.KindComputeExecutor { + return + } + if committee.RuntimeID != n.Runtime.ID() { + return + } + + rs, err := n.Consensus.RootHash().GetRuntimeState(ctx, &roothash.RuntimeRequest{ + RuntimeID: n.Runtime.ID(), + Height: consensus.HeightLatest, + }) + if err != nil { + n.logger.Error("failed to get runtime state", + "err", err, + ) + return } - labels := n.getMetricLabels() + n.KeyManagerClient.SetKeyManagerID(rs.Runtime.KeyManager) + + n.updateHostedRuntimeVersion(rs.Runtime) - n.CrossNode.Lock() - defer n.CrossNode.Unlock() + switch rs.Suspended { + case true: + n.handleSuspend() + atomic.StoreUint32(&n.runtimeSuspended, 1) + case false: + n.handleCommitteeTransition(rs.Committee) + atomic.StoreUint32(&n.runtimeSuspended, 0) + } - n.logger.Debug("updating periodic worker node metrics") + n.committeeRound = rs.LastBlock.Header.Round + n.activeDescriptor = rs.Runtime - committeeInfo, ok := n.Group.CommitteeInfo() - if !ok { - return + select { + case n.dispatchInfoCh <- struct{}{}: + default: } +} + +func (n *Node) handleRuntimeBlock(ctx context.Context, blk *roothash.AnnotatedBlock) { + processedBlockCount.With(n.getMetricLabels()).Inc() - executorCommitteeP2PPeers.With(labels).Set(float64(len(n.P2P.Peers(n.Runtime.ID())))) - workerIsExecutorWorker.With(labels).Set(boolToMetricVal(committeeInfo.IsWorker())) - workerIsExecutorBackup.With(labels).Set(boolToMetricVal(committeeInfo.IsBackupWorker())) + // Update status of the current block. + n.mu.Lock() + n.latestRound = blk.Block.Header.Round + n.latestHeight = blk.Height + n.mu.Unlock() + + // Track how many rounds have failed. + if blk.Block.Header.HeaderType == block.RoundFailed { + n.logger.Warn("round has failed") + failedRoundCount.With(n.getMetricLabels()).Inc() + } - if !committeeInfo.IsMember() { - // Default to 1 if node is not in committee. - livenessRatio.With(labels).Set(1.0) + // Fetch light consensus block. + lb, err := n.Consensus.Core().GetLightBlock(ctx, blk.Height) + if err != nil { + n.logger.Error("failed to get light block", + "err", err, + "height", blk.Height, + "round", blk.Block.Header.Round, + ) return } - rs, err := n.Consensus.RootHash().GetRuntimeState(n.ctx, &roothash.RuntimeRequest{ + // Fetch incoming messages. 
+ inMsgs, err := n.Consensus.RootHash().GetIncomingMessageQueue(ctx, &roothash.InMessageQueueRequest{ RuntimeID: n.Runtime.ID(), - Height: consensus.HeightLatest, + Height: blk.Height, }) - if err != nil || rs.LivenessStatistics == nil { + if err != nil { + n.logger.Error("failed to get incoming messages", + "err", err, + "height", blk.Height, + "round", blk.Block.Header.Round, + ) return } - totalRounds := rs.LivenessStatistics.TotalRounds - var liveRounds uint64 - for _, index := range committeeInfo.Indices { - liveRounds += rs.LivenessStatistics.LiveRounds[index] + // Fetch epoch of the latest block. + epoch, err := n.Consensus.Beacon().GetEpoch(ctx, blk.Height) + if err != nil { + n.logger.Error("failed to get epoch", + "err", err, + "height", blk.Height, + "round", blk.Block.Header.Round, + ) + return + } + + n.TxPool.ProcessIncomingMessages(inMsgs) + + n.lastBlockInfo = &runtime.BlockInfo{ + RuntimeBlock: blk.Block, + ConsensusBlock: lb, + IncomingMessages: inMsgs, + Epoch: epoch, } - livenessTotalRounds.With(labels).Set(float64(totalRounds)) - livenessLiveRounds.With(labels).Set(float64(liveRounds)) - livenessRatio.With(labels).Set(float64(liveRounds) / float64(totalRounds)) -} -func (n *Node) metricsWorker() { - n.logger.Info("delaying metrics worker start until worker is initialized") select { - case <-n.stopCh: + case n.dispatchInfoCh <- struct{}{}: + default: + } +} + +func (n *Node) handleDispatchInfo() { + if n.lastBlockInfo == nil || n.activeDescriptor == nil { return - case <-n.initCh: } - n.logger.Debug("starting metrics worker") + if n.lastBlockInfo.RuntimeBlock.Header.Round < n.committeeRound { + return + } - t := time.NewTicker(periodicMetricsInterval) - defer t.Stop() + di := &runtime.DispatchInfo{ + BlockInfo: n.lastBlockInfo, + ActiveDescriptor: n.activeDescriptor, + } - for { - select { - case <-n.stopCh: - return - case <-t.C: - } + n.TxPool.ProcessDispatchInfo(di) + + if n.lastBlockInfo.RuntimeBlock.Header.Round == n.committeeRound { + n.TxPool.RecheckTxs() + } - n.updatePeriodicMetrics() + for _, hooks := range n.hooks { + hooks.HandleNewDispatchInfo(di) } } @@ -807,6 +675,7 @@ func NewNode( stopCh: make(chan struct{}), quitCh: make(chan struct{}), initCh: make(chan struct{}), + dispatchInfoCh: make(chan struct{}, 1), logger: logging.GetLogger("worker/common/committee").With("runtime_id", runtime.ID()), } diff --git a/go/worker/compute/executor/committee/discrepancy.go b/go/worker/compute/executor/committee/discrepancy.go index faea6833c89..2416499876b 100644 --- a/go/worker/compute/executor/committee/discrepancy.go +++ b/go/worker/compute/executor/committee/discrepancy.go @@ -16,7 +16,7 @@ type discrepancyEvent struct { } func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) { - if ev.round != n.blockInfo.RuntimeBlock.Header.Round+1 { + if ev.round != n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round+1 { n.logger.Debug("ignoring bad discrepancy event", "height", ev.height, "round", ev.round, @@ -71,7 +71,7 @@ func (n *Node) predictDiscrepancy(ctx context.Context, ec *commitment.ExecutorCo } // Verify and add the commitment. 
- if err := commitment.VerifyExecutorCommitment(ctx, n.blockInfo.RuntimeBlock, n.blockInfo.ActiveDescriptor, n.committeeInfo.Committee.ValidFor, ec, nil, n.committeeInfo); err != nil { + if err := commitment.VerifyExecutorCommitment(ctx, n.dispatchInfo.BlockInfo.RuntimeBlock, n.dispatchInfo.ActiveDescriptor, n.committeeInfo.Committee.ValidFor, ec, nil, n.committeeInfo); err != nil { n.logger.Debug("ignoring bad observed executor commitment, verification failed", "err", err, "node_id", ec.NodeID, @@ -88,15 +88,15 @@ func (n *Node) predictDiscrepancy(ctx context.Context, ec *commitment.ExecutorCo } // In case observed commits indicate a discrepancy, preempt consensus and immediately handle. - if _, err := n.commitPool.ProcessCommitments(n.committeeInfo.Committee, n.blockInfo.ActiveDescriptor.Executor.AllowedStragglers, false); err != commitment.ErrDiscrepancyDetected { + if _, err := n.commitPool.ProcessCommitments(n.committeeInfo.Committee, n.dispatchInfo.ActiveDescriptor.Executor.AllowedStragglers, false); err != commitment.ErrDiscrepancyDetected { return } n.logger.Warn("observed commitments indicate discrepancy") n.handleDiscrepancy(ctx, &discrepancyEvent{ - height: uint64(n.blockInfo.ConsensusBlock.Height), - round: n.blockInfo.RuntimeBlock.Header.Round + 1, + height: uint64(n.dispatchInfo.BlockInfo.ConsensusBlock.Height), + round: n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round + 1, rank: n.commitPool.HighestRank, timeout: false, authoritative: false, diff --git a/go/worker/compute/executor/committee/hooks.go b/go/worker/compute/executor/committee/hooks.go index 969e14a7f8e..f921acb9d08 100644 --- a/go/worker/compute/executor/committee/hooks.go +++ b/go/worker/compute/executor/committee/hooks.go @@ -8,18 +8,19 @@ import ( // Ensure Node implements NodeHooks. var _ committee.NodeHooks = (*Node)(nil) -// HandleNewBlockLocked implements NodeHooks. -// Guarded by n.commonNode.CrossNode. -func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) { +// HandleNewDispatchInfo implements NodeHooks. +func (n *Node) HandleNewDispatchInfo(di *runtime.DispatchInfo) { // Update our availability. + n.mu.Lock() n.nudgeAvailabilityLocked(false) + n.mu.Unlock() - // Drop blocks if the worker falls behind. + // Drop if the worker falls behind. select { - case <-n.blockInfoCh: + case <-n.dispatchInfoCh: default: } // Non-blocking send. - n.blockInfoCh <- bi + n.dispatchInfoCh <- di } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 0565fc500d9..6fd2ac7cb89 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -50,9 +50,10 @@ const executeBatchTimeoutFactor = 3 // Node is a committee node. 
type Node struct { - runtimeReady bool - runtimeTrustSynced bool - runtimeTrustSyncCncl context.CancelFunc + mu sync.Mutex + runtimeReady bool + runtimeTrustSynced bool + runtimeTrustSyncCancel context.CancelFunc commonNode *committee.Node commonCfg commonWorker.Config @@ -77,7 +78,7 @@ type Node struct { proposals *proposalQueue commitPool *commitment.Pool - blockInfoCh chan *runtime.BlockInfo + dispatchInfoCh chan *runtime.DispatchInfo processedBatchCh chan *processedBatch reselectCh chan struct{} missingTxCh chan [][]byte @@ -90,7 +91,7 @@ type Node struct { rt host.RichRuntime committeeInfo *committee.CommitteeInfo - blockInfo *runtime.BlockInfo + dispatchInfo *runtime.DispatchInfo rtState *roothash.RuntimeState roundResults *roothash.RoundResults discrepancy *discrepancyEvent @@ -343,33 +344,24 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { // If the next block will be an epoch transition block, do not propose anything as it will be // reverted anyway (since the committee will change). - epochState, err := n.commonNode.Consensus.Beacon().GetFutureEpoch(ctx, n.blockInfo.ConsensusBlock.Height) + height, err := n.commonNode.Consensus.Core().GetLatestHeight(ctx) if err != nil { - n.logger.Error("failed to fetch future epoch state", + n.logger.Error("failed to fetch latest height", "err", err, ) return } - if epochState != nil && epochState.Height == n.blockInfo.ConsensusBlock.Height+1 { - n.logger.Debug("not scheduling, next consensus block is an epoch transition") - return - } - - // Fetch incoming message queue metadata to see if there's any queued messages. - inMsgMeta, err := n.commonNode.Consensus.RootHash().GetIncomingMessageQueueMeta(ctx, &roothash.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), - // We make the check at the latest height even though we will later only look at the last - // height. This will make sure that any messages eventually get processed even if there are - // no other runtime transactions being sent. In the worst case this will result in an empty - // block being generated. - Height: consensus.HeightLatest, - }) + epochState, err := n.commonNode.Consensus.Beacon().GetFutureEpoch(ctx, height) if err != nil { - n.logger.Error("failed to fetch incoming runtime message queue metadata", + n.logger.Error("failed to fetch future epoch state", "err", err, ) return } + if epochState != nil && epochState.Height == height+1 { + n.logger.Debug("not scheduling, next consensus block is an epoch transition") + return + } // Check what the runtime supports. rtInfo, err := n.rt.GetInfo(ctx) @@ -394,7 +386,7 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { // We have some transactions, schedule batch. case len(n.roundResults.Messages) > 0: // We have runtime message results (and batch timeout expired), schedule batch. - case inMsgMeta.Size > 0: + case len(n.dispatchInfo.BlockInfo.IncomingMessages) > 0: // We have queued incoming runtime messages (and batch timeout expired), schedule batch. case n.rtState.LastNormalRound == n.rtState.GenesisBlock.Header.Round: // This is the runtime genesis, schedule batch. 
@@ -460,10 +452,10 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue ctx, n.rt, protocol.ExecutionModeSchedule, - n.blockInfo.Epoch, - n.blockInfo.ConsensusBlock, - n.blockInfo.RuntimeBlock, - n.blockInfo.IncomingMessages, + n.dispatchInfo.BlockInfo.Epoch, + n.dispatchInfo.BlockInfo.ConsensusBlock, + n.dispatchInfo.BlockInfo.RuntimeBlock, + n.dispatchInfo.BlockInfo.IncomingMessages, n.rtState, n.roundResults, hash.Hash{}, // IORoot is ignored as it is yet to be determined. @@ -487,8 +479,8 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue proposal := commitment.Proposal{ NodeID: n.commonNode.Identity.NodeSigner.Public(), Header: commitment.ProposalHeader{ - Round: n.blockInfo.RuntimeBlock.Header.Round + 1, - PreviousHash: n.blockInfo.RuntimeBlock.Header.EncodedHash(), + Round: n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round + 1, + PreviousHash: n.dispatchInfo.BlockInfo.RuntimeBlock.Header.EncodedHash(), BatchHash: rsp.TxInputRoot, }, Batch: rsp.TxHashes, @@ -607,10 +599,10 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr ctx, n.rt, protocol.ExecutionModeExecute, - n.blockInfo.Epoch, - n.blockInfo.ConsensusBlock, - n.blockInfo.RuntimeBlock, - n.blockInfo.IncomingMessages, + n.dispatchInfo.BlockInfo.Epoch, + n.dispatchInfo.BlockInfo.ConsensusBlock, + n.dispatchInfo.BlockInfo.RuntimeBlock, + n.dispatchInfo.BlockInfo.IncomingMessages, n.rtState, n.roundResults, proposal.Header.BatchHash, @@ -739,10 +731,10 @@ func (n *Node) proposeBatch( // Submit commitment. // Make sure we are still in the right state/round. state, ok := n.state.(StateProcessingBatch) - if !ok || lastHeader.Round != n.blockInfo.RuntimeBlock.Header.Round { + if !ok || lastHeader.Round != n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round { n.logger.Error("new state or round since started proposing batch", "state", state, - "round", n.blockInfo.RuntimeBlock.Header.Round, + "round", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round, "expected_round", lastHeader.Round, ) return @@ -861,8 +853,8 @@ func (n *Node) processProposal(ctx context.Context, proposal *commitment.Proposa // Submit failure if the batch is invalid. // The scheduler is violating the protocol and should be punished. - maxBatchSize := n.blockInfo.ActiveDescriptor.TxnScheduler.MaxBatchSize - maxBytes := n.blockInfo.ActiveDescriptor.TxnScheduler.MaxBatchSizeBytes + maxBatchSize := n.dispatchInfo.ActiveDescriptor.TxnScheduler.MaxBatchSize + maxBytes := n.dispatchInfo.ActiveDescriptor.TxnScheduler.MaxBatchSizeBytes if batchSize > maxBatchSize || maxBytes > 0 && bytes > maxBytes { n.transitionStateToProcessingFailure(proposal, rank, bytes, maxBytes, batchSize, maxBatchSize) return @@ -912,7 +904,7 @@ func (n *Node) processProposal(ctx context.Context, proposal *commitment.Proposa // TODO: Handle proposal equivocation. // Maybe process if we have the correct block. - currentHash := n.blockInfo.RuntimeBlock.Header.EncodedHash() + currentHash := n.dispatchInfo.BlockInfo.RuntimeBlock.Header.EncodedHash() if !currentHash.Equal(&proposal.Header.PreviousHash) { return } @@ -962,7 +954,11 @@ func (n *Node) nudgeAvailabilityLocked(force bool) { } } -func (n *Node) HandleRuntimeHostEventLocked(ev *host.Event) { +// HandleRuntimeHostEvent implements NodeHooks. +func (n *Node) HandleRuntimeHostEvent(ev *host.Event) { + n.mu.Lock() + defer n.mu.Unlock() + switch { case ev.Started != nil: // Make sure the runtime supports all the required features. 
@@ -1023,7 +1019,7 @@ func (n *Node) handleProcessedBatch(ctx context.Context, batch *processedBatch) ) return } - lastHeader := n.blockInfo.RuntimeBlock.Header + lastHeader := n.dispatchInfo.BlockInfo.RuntimeBlock.Header // A nil batch indicates that scheduling or processing has failed. // Return to the initial state and retry. @@ -1131,7 +1127,7 @@ func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitm func (n *Node) estimatePoolRank(ctx context.Context, ec *commitment.ExecutorCommitment, observed bool) { // Filter for this round only. - round := n.blockInfo.RuntimeBlock.Header.Round + 1 + round := n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round + 1 if ec.Header.Header.Round != round { n.logger.Debug("ignoring bad executor commitment, not for this round", "round", round, @@ -1153,7 +1149,7 @@ func (n *Node) estimatePoolRank(ctx context.Context, ec *commitment.ExecutorComm if observed { // Verify the commitment. - if err := commitment.VerifyExecutorCommitment(ctx, n.blockInfo.RuntimeBlock, n.blockInfo.ActiveDescriptor, n.committeeInfo.Committee.ValidFor, ec, nil, n.committeeInfo); err != nil { + if err := commitment.VerifyExecutorCommitment(ctx, n.dispatchInfo.BlockInfo.RuntimeBlock, n.dispatchInfo.ActiveDescriptor, n.committeeInfo.Committee.ValidFor, ec, nil, n.committeeInfo); err != nil { n.logger.Debug("ignoring bad executor commitment, verification failed", "err", err, "node_id", ec.NodeID, @@ -1185,18 +1181,18 @@ func (n *Node) estimatePoolRank(ctx context.Context, ec *commitment.ExecutorComm func (n *Node) finalizePreviousRound() { n.logger.Info("considering the round finalized", - "round", n.blockInfo.RuntimeBlock.Header.Round, - "header_hash", n.blockInfo.RuntimeBlock.Header.EncodedHash(), - "header_type", n.blockInfo.RuntimeBlock.Header.HeaderType, + "round", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round, + "header_hash", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.EncodedHash(), + "header_type", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.HeaderType, ) - if n.proposedBatch != nil && n.blockInfo.RuntimeBlock.Header.HeaderType == block.Normal { - switch n.blockInfo.RuntimeBlock.Header.IORoot.Equal(&n.proposedBatch.proposedIORoot) { + if n.proposedBatch != nil && n.dispatchInfo.BlockInfo.RuntimeBlock.Header.HeaderType == block.Normal { + switch n.dispatchInfo.BlockInfo.RuntimeBlock.Header.IORoot.Equal(&n.proposedBatch.proposedIORoot) { case false: n.logger.Error("proposed batch was not finalized", - "header_io_root", n.blockInfo.RuntimeBlock.Header.IORoot, + "header_io_root", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.IORoot, "proposed_io_root", n.proposedBatch.proposedIORoot, - "header_type", n.blockInfo.RuntimeBlock.Header.HeaderType, + "header_type", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.HeaderType, "batch_size", len(n.proposedBatch.txHashes), ) case true: @@ -1205,7 +1201,7 @@ func (n *Node) finalizePreviousRound() { n.logger.Debug("removing processed batch from queue", "batch_size", len(n.proposedBatch.txHashes), - "io_root", n.blockInfo.RuntimeBlock.Header.IORoot, + "io_root", n.dispatchInfo.BlockInfo.RuntimeBlock.Header.IORoot, ) // Remove processed transactions from queue. @@ -1313,15 +1309,15 @@ func (n *Node) worker() { n.logger.Info("termination requested") return case <-n.commonNode.KeyManagerClient.Initialized(): - n.commonNode.CrossNode.Lock() + n.mu.Lock() n.nudgeAvailabilityLocked(false) - n.commonNode.CrossNode.Unlock() + n.mu.Unlock() } }() // Restart the round worker every time a runtime block is finalized. 
for { - var bi *runtime.BlockInfo + var di *runtime.DispatchInfo func() { var wg sync.WaitGroup @@ -1337,12 +1333,12 @@ func (n *Node) worker() { select { case <-n.stopCh: - case bi = <-n.blockInfoCh: + case di = <-n.dispatchInfoCh: } }() - // Round worker stopped, so it is safe to update the last block info. - n.blockInfo = bi + // Round worker stopped, so it is safe to update the last dispatch info. + n.dispatchInfo = di select { case <-n.stopCh: @@ -1354,10 +1350,10 @@ func (n *Node) worker() { } func (n *Node) roundWorker(ctx context.Context) { - if n.blockInfo == nil { + if n.dispatchInfo == nil { return } - round := n.blockInfo.RuntimeBlock.Header.Round + 1 + round := n.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round + 1 n.logger.Debug("round worker started", "round", round, @@ -1399,7 +1395,7 @@ func (n *Node) roundWorker(ctx context.Context) { // Fetch state and round results upfront. var err error - n.rtState, n.roundResults, err = n.getRtStateAndRoundResults(ctx, n.blockInfo.ConsensusBlock.Height) + n.rtState, n.roundResults, err = n.getRtStateAndRoundResults(ctx, n.dispatchInfo.BlockInfo.ConsensusBlock.Height) if err != nil { n.logger.Debug("skipping round, failed to fetch state and round results", "err", err, @@ -1409,7 +1405,7 @@ func (n *Node) roundWorker(ctx context.Context) { // Prepare flush timer for the primary transaction scheduler. flush := false - flushTimer := time.NewTimer(n.blockInfo.ActiveDescriptor.TxnScheduler.BatchFlushTimeout) + flushTimer := time.NewTimer(n.dispatchInfo.ActiveDescriptor.TxnScheduler.BatchFlushTimeout) defer flushTimer.Stop() // Compute node's rank when scheduling transactions. @@ -1538,7 +1534,7 @@ func NewNode( state: StateWaitingForBatch{}, txSync: txsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()), stateTransitions: pubsub.NewBroker(false), - blockInfoCh: make(chan *runtime.BlockInfo, 1), + dispatchInfoCh: make(chan *runtime.DispatchInfo, 1), processedBatchCh: make(chan *processedBatch, 1), reselectCh: make(chan struct{}, 1), missingTxCh: make(chan [][]byte, 1), diff --git a/go/worker/compute/executor/committee/status.go b/go/worker/compute/executor/committee/status.go index 09ef0eabb3c..a89d6c0cf33 100644 --- a/go/worker/compute/executor/committee/status.go +++ b/go/worker/compute/executor/committee/status.go @@ -6,8 +6,8 @@ import ( // GetStatus returns the executor committee node status. func (n *Node) GetStatus() (*api.Status, error) { - n.commonNode.CrossNode.Lock() - defer n.commonNode.CrossNode.Unlock() + n.mu.Lock() + defer n.mu.Unlock() var status api.Status switch { diff --git a/go/worker/compute/executor/committee/trust.go b/go/worker/compute/executor/committee/trust.go index 0ddd983996c..d446e881cd0 100644 --- a/go/worker/compute/executor/committee/trust.go +++ b/go/worker/compute/executor/committee/trust.go @@ -9,7 +9,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host" ) -// startRuntimeTrustSyncLocked asks the runtime to start syncing its light client up to the current +// startRuntimeTrustSync asks the runtime to start syncing its light client up to the current // latest height. If the runtime does not actually use a trust root, this will be a no-op. // // When syncing is completed, the runtimeTrustSycned flag will be set. @@ -17,7 +17,7 @@ func (n *Node) startRuntimeTrustSyncLocked(rt host.RichRuntime) { n.cancelRuntimeTrustSyncLocked() // Cancel any outstanding sync. 
var ctx context.Context - ctx, n.runtimeTrustSyncCncl = context.WithCancel(n.ctx) + ctx, n.runtimeTrustSyncCancel = context.WithCancel(n.ctx) syncOp := func() error { height, err := n.commonNode.Consensus.Core().GetLatestHeight(ctx) @@ -53,8 +53,8 @@ func (n *Node) startRuntimeTrustSyncLocked(rt host.RichRuntime) { n.logger.Info("runtime light client sync succeeded") // Runtime has successfully synced its light client. - n.commonNode.CrossNode.Lock() - defer n.commonNode.CrossNode.Unlock() + n.mu.Lock() + defer n.mu.Unlock() n.runtimeTrustSynced = true n.nudgeAvailabilityLocked(true) @@ -62,9 +62,9 @@ func (n *Node) startRuntimeTrustSyncLocked(rt host.RichRuntime) { } func (n *Node) cancelRuntimeTrustSyncLocked() { - if n.runtimeTrustSyncCncl == nil { + if n.runtimeTrustSyncCancel == nil { return } - n.runtimeTrustSyncCncl() - n.runtimeTrustSyncCncl = nil + n.runtimeTrustSyncCancel() + n.runtimeTrustSyncCancel = nil } diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index 062b37eaa06..ab3d2ffa80b 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -329,15 +329,13 @@ func (w *Worker) GetLocalStorage() storageApi.LocalBackend { return w.localStorage } -// NodeHooks implementation. - -// HandleNewBlockLocked is guarded by CrossNode. -func (w *Worker) HandleNewBlockLocked(*runtime.BlockInfo) { +// HandleNewDispatchInfo implements NodeHooks. +func (w *Worker) HandleNewDispatchInfo(*runtime.DispatchInfo) { // Nothing to do here. } -// HandleRuntimeHostEventLocked is guarded by CrossNode. -func (w *Worker) HandleRuntimeHostEventLocked(*host.Event) { +// HandleRuntimeHostEvent implements NodeHooks. +func (w *Worker) HandleRuntimeHostEvent(*host.Event) { // Nothing to do here. }
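Note (not part of the patch): the NodeHooks interface now receives dispatch information instead of block information and is no longer guarded by the CrossNode lock. The sketch below illustrates what a minimal hook implementation could look like under the new interface; the no-op bodies mirror the client and storage workers above, the type names and import paths are taken from this diff, and the exampleHooks/initCh names are purely illustrative.

package example

import (
	runtime "github.com/oasisprotocol/oasis-core/go/runtime/api"
	"github.com/oasisprotocol/oasis-core/go/runtime/host"
)

// exampleHooks is a minimal, hypothetical NodeHooks implementation under the
// new interface shown in go/worker/common/committee/node.go.
type exampleHooks struct {
	initCh chan struct{}
}

// HandleNewDispatchInfo receives the latest block information together with
// the active runtime descriptor; no CrossNode lock is held anymore.
func (h *exampleHooks) HandleNewDispatchInfo(di *runtime.DispatchInfo) {
	_ = di.BlockInfo.RuntimeBlock.Header.Round // e.g. track the latest round
	_ = di.ActiveDescriptor                    // descriptor in use for dispatch
}

// HandleRuntimeHostEvent receives runtime host events (started, updated, ...).
func (h *exampleHooks) HandleRuntimeHostEvent(ev *host.Event) {
	// Nothing to do here.
	_ = ev
}

// Initialized is closed once the hook is ready to service requests.
func (h *exampleHooks) Initialized() <-chan struct{} {
	return h.initCh
}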