Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changelog/6425.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
go/worker/common: Replace epoch transitions with committee transitions

The following metric has been removed:

- `oasis_worker_epoch_transition_count`

The following metric has been added:

- `oasis_worker_committee_transition_count`
20 changes: 10 additions & 10 deletions docs/oasis-node/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ oasis_worker_batch_runtime_processing_time | Summary | Time it takes for a batch
oasis_worker_batch_size | Summary | Number of transactions in a batch. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_client_lb_healthy_instance_count | Gauge | Number of healthy instances in the load balancer. | runtime | [runtime/host/loadbalance](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/host/loadbalance/metrics.go)
oasis_worker_client_lb_requests | Counter | Number of requests processed by the given load balancer instance. | runtime, lb_instance | [runtime/host/loadbalance](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/host/loadbalance/metrics.go)
oasis_worker_epoch_number | Gauge | Current epoch number as seen by the worker. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_epoch_transition_count | Counter | Number of epoch transitions. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_committee_transition_count | Counter | Number of committee transitions. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_epoch_number | Gauge | Current epoch number as seen by the worker. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_execution_discrepancy_detected_count | Counter | Number of detected execute discrepancies. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_executor_committee_p2p_peers | Gauge | Number of executor committee P2P peers. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_executor_is_backup_worker | Gauge | 1 if worker is currently an executor backup worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_executor_is_worker | Gauge | 1 if worker is currently an executor worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_executor_liveness_live_ratio | Gauge | Ratio between live and total rounds. Reports 1 if node is not in committee. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_executor_liveness_live_rounds | Gauge | Number of live rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_executor_liveness_total_rounds | Gauge | Number of total rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_failed_round_count | Counter | Number of failed roothash rounds. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_executor_committee_p2p_peers | Gauge | Number of executor committee P2P peers. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_executor_is_backup_worker | Gauge | 1 if worker is currently an executor backup worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_executor_is_worker | Gauge | 1 if worker is currently an executor worker, 0 otherwise. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_executor_liveness_live_ratio | Gauge | Ratio between live and total rounds. Reports 1 if node is not in committee. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_executor_liveness_live_rounds | Gauge | Number of live rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_executor_liveness_total_rounds | Gauge | Number of total rounds in last epoch. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_failed_round_count | Counter | Number of failed roothash rounds. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_keymanager_churp_committee_size | Gauge | Number of nodes in the committee | runtime, churp | [worker/keymanager](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/keymanager/metrics.go)
oasis_worker_keymanager_churp_confirmed_applications_total | Gauge | Number of confirmed applications | runtime, churp | [worker/keymanager](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/keymanager/metrics.go)
oasis_worker_keymanager_churp_enclave_rpc_failures_total | Counter | Number of failed enclave rpc calls. | runtime, churp, method | [worker/keymanager](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/keymanager/metrics.go)
Expand Down Expand Up @@ -142,7 +142,7 @@ oasis_worker_node_registration_eligible | Gauge | Is oasis node eligible for reg
oasis_worker_node_status_frozen | Gauge | Is oasis node frozen (binary). | | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
oasis_worker_node_status_runtime_faults | Gauge | Number of runtime faults. | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
oasis_worker_node_status_runtime_suspended | Gauge | Runtime node suspension status (binary). | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
oasis_worker_processed_block_count | Counter | Number of processed roothash blocks. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_processed_block_count | Counter | Number of processed roothash blocks. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/metrics.go)
oasis_worker_processed_event_count | Counter | Number of processed roothash events. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_storage_commit_latency | Summary | Latency of storage commit calls (state + outputs) (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_storage_full_round | Gauge | The last round that was fully synced and finalized. | runtime | [worker/storage/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/storage/committee/metrics.go)
Expand Down
9 changes: 8 additions & 1 deletion go/runtime/api/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ type BlockInfo struct {

// Epoch is the epoch the runtime block belongs to.
Epoch beacon.EpochTime
}

// DispatchInfo provides the context for checking, executing, or scheduling
// a batch of transactions.
type DispatchInfo struct {
// BlockInfo holds information about the latest runtime block.
BlockInfo *BlockInfo

// ActiveDescriptor is the runtime descriptor currently in use for dispatch.
ActiveDescriptor *registry.Runtime
}
83 changes: 50 additions & 33 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
runtime "github.com/oasisprotocol/oasis-core/go/runtime/api"
"github.com/oasisprotocol/oasis-core/go/runtime/history"
Expand Down Expand Up @@ -107,8 +106,11 @@ type TransactionPool interface {
// and only the following transactions will be returned.
GetSchedulingExtra(offset *hash.Hash, limit int) []*TxQueueMeta

// ProcessBlock updates the last known runtime block information.
ProcessBlock(bi *runtime.BlockInfo)
// RecheckTxs triggers a recheck of all transactions.
RecheckTxs()

// ProcessDispatchInfo updates the last known runtime dispatch information.
ProcessDispatchInfo(di *runtime.DispatchInfo)

// ProcessIncomingMessages loads transactions from incoming messages into the pool.
ProcessIncomingMessages(inMsgs []*message.IncomingMessage)
Expand Down Expand Up @@ -158,10 +160,10 @@ type txPool struct {
proposedTxsLock sync.Mutex
proposedTxs map[hash.Hash]*TxQueueMeta

blockInfoLock sync.Mutex
blockInfo *runtime.BlockInfo
lastBlockProcessed time.Time
lastRecheckRound uint64
dispatchInfoLock sync.Mutex
dispatchInfo *runtime.DispatchInfo
lastDispatchInfoProcessed time.Time
lastRecheckRound uint64

republishCh *channels.RingChannel
}
Expand Down Expand Up @@ -385,24 +387,39 @@ HASH_LOOP:
return txs, missingTxs
}

func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()
func (t *txPool) ProcessDispatchInfo(di *runtime.DispatchInfo) {
t.dispatchInfoLock.Lock()
defer t.dispatchInfoLock.Unlock()

if t.blockInfo == nil {
if t.dispatchInfo == nil {
close(t.initCh)
}

t.blockInfo = bi
t.lastBlockProcessed = time.Now()
t.dispatchInfo = di
t.lastDispatchInfoProcessed = time.Now()

roundDifference := di.BlockInfo.RuntimeBlock.Header.Round - t.lastRecheckRound
if roundDifference > t.cfg.RecheckInterval {
t.recheckTxsLocked()
}
}

// RecheckTxs implements TransactionPool.
//
// It takes dispatchInfoLock and delegates to recheckTxsLocked, which does
// nothing until the first dispatch info has been processed.
func (t *txPool) RecheckTxs() {
t.dispatchInfoLock.Lock()
defer t.dispatchInfoLock.Unlock()

t.recheckTxsLocked()
}

// Force transaction rechecks on epoch transitions and if needed.
isEpochTransition := bi.RuntimeBlock.Header.HeaderType == block.EpochTransition
roundDifference := bi.RuntimeBlock.Header.Round - t.lastRecheckRound
if isEpochTransition || roundDifference > t.cfg.RecheckInterval {
t.recheckTxCh.In() <- struct{}{}
t.lastRecheckRound = bi.RuntimeBlock.Header.Round
// recheckTxsLocked signals the recheck channel and records the round of the
// current dispatch info as the last recheck round.
//
// Both callers in this file invoke it while holding dispatchInfoLock. While
// initCh is still open (no dispatch info has been processed yet) the call is
// a no-op, because there is no dispatch info to read the round from.
func (t *txPool) recheckTxsLocked() {
	select {
	case <-t.initCh:
		// Initialized: signal the recheck first, then remember its round.
		t.recheckTxCh.In() <- struct{}{}
		t.lastRecheckRound = t.dispatchInfo.BlockInfo.RuntimeBlock.Header.Round
	default:
		// Not initialized yet; skip the recheck.
	}
}

func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) {
Expand All @@ -427,14 +444,14 @@ func (t *txPool) All() [][]byte {
return txs
}

func (t *txPool) getCurrentBlockInfo() (*runtime.BlockInfo, time.Time, error) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()
func (t *txPool) getCurrentDispatchInfo() (*runtime.DispatchInfo, time.Time, error) {
t.dispatchInfoLock.Lock()
defer t.dispatchInfoLock.Unlock()

if t.blockInfo == nil {
return nil, time.Time{}, fmt.Errorf("no current block available")
if t.dispatchInfo == nil {
return nil, time.Time{}, fmt.Errorf("no current dispatch info available")
}
return t.blockInfo, t.lastBlockProcessed, nil
return t.dispatchInfo, t.lastDispatchInfoProcessed, nil
}

// checkTxBatch requests the runtime to check the validity of a transaction batch.
Expand All @@ -445,22 +462,22 @@ func (t *txPool) checkTxBatch(ctx context.Context) error {
return fmt.Errorf("runtime is not available")
}

// Get the current block info.
bi, lastBlockProcessed, err := t.getCurrentBlockInfo()
// Get the current dispatch info.
di, lastDispatchInfoProcessed, err := t.getCurrentDispatchInfo()
if err != nil {
return fmt.Errorf("failed to get current block info: %w", err)
return fmt.Errorf("failed to get current dispatch info: %w", err)
}

// Ensure block round is synced to storage.
waitSyncCtx, cancelWaitSyncCtx := context.WithTimeout(ctx, checkTxWaitRoundSyncedTimeout)
defer cancelWaitSyncCtx()

t.logger.Debug("ensuring block round is synced", "round", bi.RuntimeBlock.Header.Round)
if _, err = t.history.WaitRoundSynced(waitSyncCtx, bi.RuntimeBlock.Header.Round); err != nil {
t.logger.Debug("ensuring block round is synced", "round", di.BlockInfo.RuntimeBlock.Header.Round)
if _, err = t.history.WaitRoundSynced(waitSyncCtx, di.BlockInfo.RuntimeBlock.Header.Round); err != nil {
// Block round isn't synced yet, so make sure the batch check is
// retried later to avoid aborting the runtime, as it is not its fault.
t.logger.Info("block round is not synced yet, retrying transaction batch check later",
"round", bi.RuntimeBlock.Header.Round,
"round", di.BlockInfo.RuntimeBlock.Header.Round,
"err", err,
)
t.checkTxCh.In() <- struct{}{}
Expand All @@ -482,7 +499,7 @@ func (t *txPool) checkTxBatch(ctx context.Context) error {
for _, pct := range batch {
rawTxBatch = append(rawTxBatch, pct.Raw())
}
return t.runtime.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch)
return t.runtime.CheckTx(checkCtx, di.BlockInfo.RuntimeBlock, di.BlockInfo.ConsensusBlock, di.BlockInfo.Epoch, di.ActiveDescriptor.Executor.MaxMessages, rawTxBatch)
}()
switch {
case err == nil:
Expand Down Expand Up @@ -625,7 +642,7 @@ func (t *txPool) checkTxBatch(ctx context.Context) error {
// Kick off publishing for any new txs after waiting for block publish delay based on when
// we processed the dispatch info that we just used to check the transaction batch.
go func() {
time.Sleep(time.Until(lastBlockProcessed.Add(newBlockPublishDelay)))
time.Sleep(time.Until(lastDispatchInfoProcessed.Add(newBlockPublishDelay)))
t.republishCh.In() <- struct{}{}
}()

Expand Down
25 changes: 13 additions & 12 deletions go/worker/client/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/logging"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
runtime "github.com/oasisprotocol/oasis-core/go/runtime/api"
"github.com/oasisprotocol/oasis-core/go/runtime/bundle/component"
Expand Down Expand Up @@ -79,13 +81,13 @@ func (n *Node) Initialized() <-chan struct{} {
return n.initCh
}

// HandleNewBlockLocked is guarded by CrossNode.
func (n *Node) HandleNewBlockLocked(*runtime.BlockInfo) {
// HandleNewDispatchInfo implements NodeHooks.
func (n *Node) HandleNewDispatchInfo(*runtime.DispatchInfo) {
// Nothing to do here.
}

// HandleRuntimeHostEventLocked is guarded by CrossNode.
func (n *Node) HandleRuntimeHostEventLocked(ev *host.Event) {
// HandleRuntimeHostEvent implements NodeHooks.
func (n *Node) HandleRuntimeHostEvent(ev *host.Event) {
if n.roleProvider == nil {
return
}
Expand Down Expand Up @@ -169,15 +171,14 @@ func (n *Node) CheckTx(ctx context.Context, tx []byte) (*protocol.CheckTxResult,
func (n *Node) Query(ctx context.Context, round uint64, method string, args []byte, comp *component.ID) ([]byte, error) {
hrt := n.commonNode.GetHostedRuntime()

// Fetch the active descriptor so we can get the current message limits.
n.commonNode.CrossNode.Lock()
dsc := n.commonNode.CurrentDescriptor
n.commonNode.CrossNode.Unlock()

if dsc == nil {
return nil, api.ErrNoHostedRuntime
rs, err := n.commonNode.Consensus.RootHash().GetRuntimeState(ctx, &roothash.RuntimeRequest{
RuntimeID: n.commonNode.Runtime.ID(),
Height: consensus.HeightLatest,
})
if err != nil {
return nil, fmt.Errorf("client: failed to get runtime state: %w", err)
}
maxMessages := dsc.Executor.MaxMessages
maxMessages := rs.Runtime.Executor.MaxMessages

annBlk, err := n.commonNode.Runtime.History().GetAnnotatedBlock(ctx, round)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/worker/client/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
// leave it like that before initialization starts, so the registration
// is blocked until the runtime finishes initializing.
// The availability of the role provider is changed in
// HandleRuntimeHostEventLocked in worker/client/committee/node.go.
// HandleRuntimeHostEvent in worker/client/committee/node.go.
default:
}

Expand Down
8 changes: 5 additions & 3 deletions go/worker/common/committee/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ func (g *Group) Suspend() {
g.committee = nil
}

// EpochTransition processes an epoch transition that just happened.
func (g *Group) EpochTransition(ctx context.Context, committee *scheduler.Committee) error {
// CommitteeTransition processes a committee transition that just happened.
func (g *Group) CommitteeTransition(ctx context.Context, committee *scheduler.Committee) error {
g.Lock()
defer g.Unlock()

g.logger.Info("committee transition")

// Invalidate current committee. In case we cannot process this transition,
// this should cause the node to transition into NotReady and stay there
// until the next committee transition.
Expand Down Expand Up @@ -203,7 +205,7 @@ func (g *Group) EpochTransition(ctx context.Context, committee *scheduler.Commit
nodes: g.nodes,
}

g.logger.Info("epoch transition complete",
g.logger.Info("committee transition complete",
"epoch", epochNumber,
"executor_roles", g.committee.Roles,
)
Expand Down
Loading
Loading