diff --git a/arbnode/mel/runner/mel.go b/arbnode/mel/runner/mel.go index d1d3a2dc21..474fb0b77c 100644 --- a/arbnode/mel/runner/mel.go +++ b/arbnode/mel/runner/mel.go @@ -240,6 +240,10 @@ func (m *MessageExtractor) GetHeadState(ctx context.Context) (*mel.State, error) return m.melDB.GetHeadMelState(ctx) } +func (m *MessageExtractor) GetState(ctx context.Context, parentchainBlocknumber uint64) (*mel.State, error) { + return m.melDB.State(ctx, parentchainBlocknumber) +} + func (m *MessageExtractor) GetMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { headState, err := m.melDB.GetHeadMelState(ctx) if err != nil { diff --git a/changelog/ganeshvanahalli-nit-4142.md b/changelog/ganeshvanahalli-nit-4142.md new file mode 100644 index 0000000000..b95a880f8e --- /dev/null +++ b/changelog/ganeshvanahalli-nit-4142.md @@ -0,0 +1,2 @@ +### Added + - Introduces MEL validator \ No newline at end of file diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index d5ba568e1b..f5f30eedea 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -47,7 +47,7 @@ import ( "github.com/offchainlabs/nitro/cmd/staterecovery" "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/solgen/go/rollupgen" - "github.com/offchainlabs/nitro/staker/bold" + "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/statetransfer" "github.com/offchainlabs/nitro/util" "github.com/offchainlabs/nitro/util/arbmath" @@ -957,7 +957,7 @@ func validateGenesisAssertion(ctx context.Context, rollupAddress common.Address, if err != nil { return err } - genesisAssertionCreationInfo, err := bold.ReadBoldAssertionCreationInfo(ctx, userLogic, l1Client, rollupAddress, genesisAssertionHash) + genesisAssertionCreationInfo, err := staker.ReadBoldAssertionCreationInfo(ctx, userLogic, l1Client, rollupAddress, genesisAssertionHash) if err != nil { // If we can't find the empty genesis assertion, try to compute the assertion for non-empty genesis genesisGlobalState := protocol.GoGlobalState{ @@ -978,7 +978,7 @@ func validateGenesisAssertion(ctx context.Context, rollupAddress common.Address, if err != nil { return err } - genesisAssertionCreationInfo, err = bold.ReadBoldAssertionCreationInfo(ctx, userLogic, l1Client, rollupAddress, genesisAssertionHash) + genesisAssertionCreationInfo, err = staker.ReadBoldAssertionCreationInfo(ctx, userLogic, l1Client, rollupAddress, genesisAssertionHash) if err != nil { return err } diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index 6835058ca1..9017ec61f9 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -34,7 +34,6 @@ import ( "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/rollupgen" "github.com/offchainlabs/nitro/staker" - "github.com/offchainlabs/nitro/staker/bold" legacystaker "github.com/offchainlabs/nitro/staker/legacy" multiprotocolstaker "github.com/offchainlabs/nitro/staker/multi_protocol" ) @@ -260,7 +259,7 @@ func getLatestConfirmedHash(ctx context.Context, rollupAddrs chaininfo.RollupAdd if err != nil { return common.Hash{}, err } - assertion, err := bold.ReadBoldAssertionCreationInfo( + assertion, err := staker.ReadBoldAssertionCreationInfo( ctx, rollupUserLogic, l1Client, diff --git a/staker/block_validator.go b/staker/block_validator.go index 1cc8e7eb44..5d6eaf96da 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -88,6 +88,7 @@ func NewThrottledValidationSpawner(spawner validator.ValidationSpawner) *Throttl type BlockValidator struct { stopwaiter.StopWaiter *StatelessBlockValidator + melValidator MELValidatorInterface reorgMutex sync.RWMutex @@ -636,7 +637,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e log.Trace("create validation entry: nothing to do", "pos", pos, "validated", v.validated()) return false, nil } - streamerMsgCount, err := v.streamer.GetProcessedMessageCount() + streamerMsgCount, err := v.streamer.GetProcessedMessageCount() // Ask MEL validator LatestValidatedMELState().MsgCount if err != nil { return false, err } @@ -644,6 +645,16 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e log.Trace("create validation entry: nothing to do", "pos", pos, "streamerMsgCount", streamerMsgCount) return false, nil } + if v.melValidator != nil { + latestValidatedState, err := v.melValidator.LatestValidatedMELState(ctx) + if err != nil { + return false, err + } + if pos >= arbutil.MessageIndex(latestValidatedState.MsgCount) { + log.Trace("create validation entry: nothing to do", "pos", pos, "latestMELValidatedMsgCount", latestValidatedState.MsgCount) + return false, nil + } + } msg, err := v.streamer.GetMessage(pos) if err != nil { return false, err diff --git a/staker/bold/bold_staker.go b/staker/bold/bold_staker.go index 54a0c10ce3..e75b0252d0 100644 --- a/staker/bold/bold_staker.go +++ b/staker/bold/bold_staker.go @@ -6,13 +6,11 @@ import ( "context" "errors" "fmt" - "math/big" "strings" "time" "github.com/spf13/pflag" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -45,20 +43,6 @@ var ( boldStakerAmountStakedGauge = metrics.GetOrRegisterGauge("arb/staker/amount_staked", nil) ) -var assertionCreatedId common.Hash - -func init() { - rollupAbi, err := rollupgen.RollupCoreMetaData.GetAbi() - if err != nil { - panic(err) - } - assertionCreatedEvent, ok := rollupAbi.Events["AssertionCreated"] - if !ok { - panic("RollupCore ABI missing AssertionCreated event") - } - assertionCreatedId = assertionCreatedEvent.ID -} - type BoldConfig struct { // How often to post assertions onchain. AssertionPostingInterval time.Duration `koanf:"assertion-posting-interval"` @@ -290,7 +274,7 @@ func (b *BOLDStaker) Initialize(ctx context.Context) error { } latestStaked = latestConfirmed } - assertion, err := ReadBoldAssertionCreationInfo( + assertion, err := staker.ReadBoldAssertionCreationInfo( ctx, rollupUserLogic, b.client, @@ -629,79 +613,3 @@ func newBOLDChallengeManager( } return manager, nil } - -// Read the creation info for an assertion by looking up its creation -// event from the rollup contracts. -func ReadBoldAssertionCreationInfo( - ctx context.Context, - rollup *rollupgen.RollupUserLogic, - client bind.ContractBackend, - rollupAddress common.Address, - assertionHash common.Hash, -) (*protocol.AssertionCreatedInfo, error) { - var creationBlock uint64 - var topics [][]common.Hash - if assertionHash == (common.Hash{}) { - rollupDeploymentBlock, err := rollup.RollupDeploymentBlock(&bind.CallOpts{Context: ctx}) - if err != nil { - return nil, err - } - if !rollupDeploymentBlock.IsUint64() { - return nil, errors.New("rollup deployment block was not a uint64") - } - creationBlock = rollupDeploymentBlock.Uint64() - } else { - var b [32]byte - copy(b[:], assertionHash[:]) - assertionCreationBlock, err := rollup.GetAssertionCreationBlockForLogLookup(&bind.CallOpts{Context: ctx}, b) - if err != nil { - return nil, err - } - if !assertionCreationBlock.IsUint64() { - return nil, errors.New("assertion creation block was not a uint64") - } - creationBlock = assertionCreationBlock.Uint64() - } - topics = [][]common.Hash{{assertionCreatedId}, {assertionHash}} - var query = ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(creationBlock), - ToBlock: new(big.Int).SetUint64(creationBlock), - Addresses: []common.Address{rollupAddress}, - Topics: topics, - } - logs, err := client.FilterLogs(ctx, query) - if err != nil { - return nil, err - } - if len(logs) == 0 { - return nil, errors.New("no assertion creation logs found") - } - if len(logs) > 1 { - return nil, errors.New("found multiple instances of requested node") - } - ethLog := logs[0] - parsedLog, err := rollup.ParseAssertionCreated(ethLog) - if err != nil { - return nil, err - } - afterState := parsedLog.Assertion.AfterState - creationL1Block, err := arbutil.CorrespondingL1BlockNumber(ctx, client, ethLog.BlockNumber) - if err != nil { - return nil, err - } - return &protocol.AssertionCreatedInfo{ - ConfirmPeriodBlocks: parsedLog.ConfirmPeriodBlocks, - RequiredStake: parsedLog.RequiredStake, - ParentAssertionHash: protocol.AssertionHash{Hash: parsedLog.ParentAssertionHash}, - BeforeState: parsedLog.Assertion.BeforeState, - AfterState: afterState, - InboxMaxCount: parsedLog.InboxMaxCount, - AfterInboxBatchAcc: parsedLog.AfterInboxBatchAcc, - AssertionHash: protocol.AssertionHash{Hash: parsedLog.AssertionHash}, - WasmModuleRoot: parsedLog.WasmModuleRoot, - ChallengeManager: parsedLog.ChallengeManager, - TransactionHash: ethLog.TxHash, - CreationParentBlock: ethLog.BlockNumber, - CreationL1Block: creationL1Block, - }, nil -} diff --git a/staker/bold_assertioncreation.go b/staker/bold_assertioncreation.go new file mode 100644 index 0000000000..a4739daaa5 --- /dev/null +++ b/staker/bold_assertioncreation.go @@ -0,0 +1,105 @@ +package staker + +import ( + "context" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi/bind/v2" + "github.com/ethereum/go-ethereum/common" + + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/bold/protocol" + "github.com/offchainlabs/nitro/solgen/go/rollupgen" +) + +var assertionCreatedId common.Hash + +func init() { + rollupAbi, err := rollupgen.RollupCoreMetaData.GetAbi() + if err != nil { + panic(err) + } + assertionCreatedEvent, ok := rollupAbi.Events["AssertionCreated"] + if !ok { + panic("RollupCore ABI missing AssertionCreated event") + } + assertionCreatedId = assertionCreatedEvent.ID +} + +// Read the creation info for an assertion by looking up its creation +// event from the rollup contracts. +func ReadBoldAssertionCreationInfo( + ctx context.Context, + rollup *rollupgen.RollupUserLogic, + client bind.ContractBackend, + rollupAddress common.Address, + assertionHash common.Hash, +) (*protocol.AssertionCreatedInfo, error) { + var creationBlock uint64 + var topics [][]common.Hash + if assertionHash == (common.Hash{}) { + rollupDeploymentBlock, err := rollup.RollupDeploymentBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, err + } + if !rollupDeploymentBlock.IsUint64() { + return nil, errors.New("rollup deployment block was not a uint64") + } + creationBlock = rollupDeploymentBlock.Uint64() + } else { + var b [32]byte + copy(b[:], assertionHash[:]) + assertionCreationBlock, err := rollup.GetAssertionCreationBlockForLogLookup(&bind.CallOpts{Context: ctx}, b) + if err != nil { + return nil, err + } + if !assertionCreationBlock.IsUint64() { + return nil, errors.New("assertion creation block was not a uint64") + } + creationBlock = assertionCreationBlock.Uint64() + } + topics = [][]common.Hash{{assertionCreatedId}, {assertionHash}} + var query = ethereum.FilterQuery{ + FromBlock: new(big.Int).SetUint64(creationBlock), + ToBlock: new(big.Int).SetUint64(creationBlock), + Addresses: []common.Address{rollupAddress}, + Topics: topics, + } + logs, err := client.FilterLogs(ctx, query) + if err != nil { + return nil, err + } + if len(logs) == 0 { + return nil, errors.New("no assertion creation logs found") + } + if len(logs) > 1 { + return nil, errors.New("found multiple instances of requested node") + } + ethLog := logs[0] + parsedLog, err := rollup.ParseAssertionCreated(ethLog) + if err != nil { + return nil, err + } + afterState := parsedLog.Assertion.AfterState + creationL1Block, err := arbutil.CorrespondingL1BlockNumber(ctx, client, ethLog.BlockNumber) + if err != nil { + return nil, err + } + return &protocol.AssertionCreatedInfo{ + ConfirmPeriodBlocks: parsedLog.ConfirmPeriodBlocks, + RequiredStake: parsedLog.RequiredStake, + ParentAssertionHash: protocol.AssertionHash{Hash: parsedLog.ParentAssertionHash}, + BeforeState: parsedLog.Assertion.BeforeState, + AfterState: afterState, + InboxMaxCount: parsedLog.InboxMaxCount, + AfterInboxBatchAcc: parsedLog.AfterInboxBatchAcc, + AssertionHash: protocol.AssertionHash{Hash: parsedLog.AssertionHash}, + WasmModuleRoot: parsedLog.WasmModuleRoot, + ChallengeManager: parsedLog.ChallengeManager, + TransactionHash: ethLog.TxHash, + CreationParentBlock: ethLog.BlockNumber, + CreationL1Block: creationL1Block, + }, nil +} diff --git a/staker/mel_validator.go b/staker/mel_validator.go new file mode 100644 index 0000000000..5e92b75d87 --- /dev/null +++ b/staker/mel_validator.go @@ -0,0 +1,449 @@ +package staker + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "net/url" + "regexp" + "sync" + "sync/atomic" + "time" + + "github.com/spf13/pflag" + + "github.com/ethereum/go-ethereum/accounts/abi/bind/v2" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rlp" + + "github.com/offchainlabs/nitro/arbnode/mel" + "github.com/offchainlabs/nitro/arbnode/mel/extraction" + "github.com/offchainlabs/nitro/arbnode/mel/recording" + "github.com/offchainlabs/nitro/arbnode/mel/runner" + "github.com/offchainlabs/nitro/arbstate" + "github.com/offchainlabs/nitro/daprovider" + "github.com/offchainlabs/nitro/solgen/go/rollupgen" + "github.com/offchainlabs/nitro/util/rpcclient" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator" + "github.com/offchainlabs/nitro/validator/client" + "github.com/offchainlabs/nitro/validator/client/redis" + "github.com/offchainlabs/nitro/validator/retry_wrapper" +) + +type MELValidator struct { + stopwaiter.StopWaiter + + config MELValidatorConfigFetcher + arbDb ethdb.KeyValueStore + l1Client *ethclient.Client + + boldStakerAddr common.Address + rollupAddr common.Address + rollup *rollupgen.RollupUserLogic + + messageExtractor *melrunner.MessageExtractor + dapReaders arbstate.DapReaderSource + + latestValidatedGS validator.GoGlobalState + latestValidatedParentChainBlock atomic.Uint64 + + latestWasmModuleRoot common.Hash + redisValidator *redis.ValidationClient + executionSpawners []validator.ExecutionSpawner + chosenValidator map[common.Hash]validator.ValidationSpawner + + // wasmModuleRoot + moduleMutex sync.Mutex + currentWasmModuleRoot common.Hash + pendingWasmModuleRoot common.Hash +} + +type MELValidatorConfig struct { + Enable bool `koanf:"enable"` + RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` + ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` + ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` + ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` + CurrentModuleRoot string `koanf:"current-module-root"` + PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` + ValidationServerConfigsList string `koanf:"validation-server-configs-list"` + ValidationSpawningAllowedAttempts uint64 `koanf:"validation-spawning-allowed-attempts" reload:"hot"` +} + +func (c *MELValidatorConfig) Validate() error { + if err := c.RedisValidationClientConfig.Validate(); err != nil { + return fmt.Errorf("failed to validate redis validation client config: %w", err) + } + streamsEnabled := c.RedisValidationClientConfig.Enabled() + if len(c.ValidationServerConfigs) == 0 { + c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer} + if c.ValidationServerConfigsList != "default" { + var executionServersConfigs []rpcclient.ClientConfig + if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &executionServersConfigs); err != nil && !streamsEnabled { + return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err) + } + c.ValidationServerConfigs = executionServersConfigs + } + } + for i := range c.ValidationServerConfigs { + if err := c.ValidationServerConfigs[i].Validate(); err != nil { + return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", c.ValidationServerConfigs[i].URL, err) + } + serverUrl := c.ValidationServerConfigs[i].URL + if len(serverUrl) > 0 && serverUrl != "self" && serverUrl != "self-auth" { + u, err := url.Parse(serverUrl) + if err != nil { + return fmt.Errorf("failed parsing validation server's url:%s err: %w", serverUrl, err) + } + if u.Scheme != "ws" && u.Scheme != "wss" { + return fmt.Errorf("validation server's url scheme is unsupported, it should either be ws or wss, url:%s", serverUrl) + } + } + } + return nil +} + +type MELValidatorConfigFetcher func() *MELValidatorConfig + +func MELValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) { + f.Bool(prefix+".enable", DefaultMELValidatorConfig.Enable, "enable MEL state validation") + rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultMELValidatorConfig.ValidationServer) + redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f) + f.String(prefix+".validation-server-configs-list", DefaultMELValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds") + f.Duration(prefix+".validation-poll", DefaultMELValidatorConfig.ValidationPoll, "poll time to check validations") + f.String(prefix+".current-module-root", DefaultMELValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)") + f.String(prefix+".pending-upgrade-module-root", DefaultMELValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)") + BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f) + f.Uint64(prefix+".validation-spawning-allowed-attempts", DefaultMELValidatorConfig.ValidationSpawningAllowedAttempts, "number of attempts allowed when trying to spawn a validation before erroring out") +} + +var DefaultMELValidatorConfig = MELValidatorConfig{ + Enable: false, + ValidationServerConfigsList: "default", + ValidationServer: rpcclient.DefaultClientConfig, + RedisValidationClientConfig: redis.DefaultValidationClientConfig, + ValidationPoll: time.Second, + CurrentModuleRoot: "current", + PendingUpgradeModuleRoot: "latest", + ValidationSpawningAllowedAttempts: 1, +} + +func NewMELValidator( + config MELValidatorConfigFetcher, + arbDb ethdb.KeyValueStore, + l1Client *ethclient.Client, + stack *node.Node, + messageExtractor *melrunner.MessageExtractor, + dapReaders arbstate.DapReaderSource, + latestWasmModuleRoot common.Hash, +) (*MELValidator, error) { + var executionSpawners []validator.ExecutionSpawner + configs := config().ValidationServerConfigs + for i := range configs { + confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] } + executionSpawner := client.NewExecutionClient(confFetcher, stack) + executionSpawners = append(executionSpawners, executionSpawner) + } + if len(executionSpawners) == 0 { + return nil, errors.New("no enabled execution servers") + } + var redisValClient *redis.ValidationClient + if config().RedisValidationClientConfig.Enabled() { + var err error + redisValClient, err = redis.NewValidationClient(&config().RedisValidationClientConfig) + if err != nil { + return nil, fmt.Errorf("creating new redis validation client: %w", err) + } + } + if latestWasmModuleRoot == (common.Hash{}) { + return nil, errors.New("latestWasmModuleRoot not set") + } + return &MELValidator{ + config: config, + arbDb: arbDb, + l1Client: l1Client, + messageExtractor: messageExtractor, + dapReaders: dapReaders, + latestWasmModuleRoot: latestWasmModuleRoot, + redisValidator: redisValClient, + executionSpawners: executionSpawners, + }, nil +} + +func (mv *MELValidator) Initialize(ctx context.Context) error { + config := mv.config() + currentModuleRoot := config.CurrentModuleRoot + switch currentModuleRoot { + case "latest": + mv.currentWasmModuleRoot = mv.latestWasmModuleRoot + case "current": + if (mv.currentWasmModuleRoot == common.Hash{}) { + return errors.New("wasmModuleRoot set to 'current' - but info not set from chain") + } + default: + mv.currentWasmModuleRoot = common.HexToHash(currentModuleRoot) + if (mv.currentWasmModuleRoot == common.Hash{}) { + return errors.New("current-module-root config value illegal") + } + } + pendingModuleRoot := config.PendingUpgradeModuleRoot + if pendingModuleRoot != "" { + if pendingModuleRoot == "latest" { + mv.pendingWasmModuleRoot = mv.latestWasmModuleRoot + } else { + valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", pendingModuleRoot) + mv.pendingWasmModuleRoot = common.HexToHash(pendingModuleRoot) + if (!valid || mv.pendingWasmModuleRoot == common.Hash{}) { + return errors.New("pending-upgrade-module-root config value illegal") + } + } + } + log.Info("MELValidator initialized", "current", mv.currentWasmModuleRoot, "pending", mv.pendingWasmModuleRoot) + moduleRoots := mv.GetModuleRootsToValidate() + // First spawner is always RedisValidationClient if RedisStreams are enabled. + if mv.redisValidator != nil { + err := mv.redisValidator.Initialize(ctx, moduleRoots) + if err != nil { + return err + } + } + mv.chosenValidator = make(map[common.Hash]validator.ValidationSpawner) + for _, root := range moduleRoots { + if mv.redisValidator != nil && validator.SpawnerSupportsModule(mv.redisValidator, root) { + mv.chosenValidator[root] = mv.redisValidator + log.Info("validator chosen", "WasmModuleRoot", root, "chosen", "redis", "maxWorkers", mv.redisValidator.Capacity()) + } else { + for _, spawner := range mv.executionSpawners { + if validator.SpawnerSupportsModule(spawner, root) { + mv.chosenValidator[root] = spawner + log.Info("validator chosen", "WasmModuleRoot", root, "chosen", spawner.Name(), "maxWorkers", spawner.Capacity()) + break + } + } + if mv.chosenValidator[root] == nil { + return fmt.Errorf("cannot validate WasmModuleRoot %v", root) + } + } + } + return nil +} + +func (mv *MELValidator) Start(ctx context.Context) { + mv.CallIteratively(func(ctx context.Context) time.Duration { + latestStaked, err := mv.rollup.LatestStakedAssertion(&bind.CallOpts{}, mv.boldStakerAddr) + if err != nil { + log.Error("MEL validator: Error fetching latest staked assertion hash", "err", err) + return 0 + } + latestStakedAssertion, err := ReadBoldAssertionCreationInfo(ctx, mv.rollup, mv.l1Client, mv.rollupAddr, latestStaked) + if err != nil { + log.Error("MEL validator: Error fetching latest staked assertion creation info", "err", err) + return 0 + } + if latestStakedAssertion.InboxMaxCount == nil || !latestStakedAssertion.InboxMaxCount.IsUint64() { + log.Error("MEL validator: latestStakedAssertion.InboxMaxCount is not uint64") + return 0 + } + + // Create validation entry + entry, endGSParentChainBlockNumber, err := mv.CreateNextValidationEntry(ctx, mv.latestValidatedParentChainBlock.Load(), latestStakedAssertion.InboxMaxCount.Uint64()) + if err != nil { + log.Error("MEL validator: Error creating validation entry", "latestValidatedParentChainBlock", mv.latestValidatedParentChainBlock.Load(), "inboxMaxCount", latestStakedAssertion.InboxMaxCount.Uint64(), "err", err) + return 0 + } + if entry == nil { // nothing to create, so lets wait for latestStakedAssertion to progress through blockValidator + return time.Minute + } + + // Send validation entry to validation nodes + doneEntry, err := mv.SendValidationEntry(ctx, entry) + if err != nil { + log.Error("MEL validator: Error sending validation entry", "err", err) + return 0 + } + + // Advance validations + if err := mv.AdvanceValidations(ctx, doneEntry); err != nil { + log.Error("MEL validator: Error advancing validation status", "err", err) + } + mv.latestValidatedParentChainBlock.Store(endGSParentChainBlockNumber) + mv.latestValidatedGS = doneEntry.End + return 0 + }) +} + +func (mv *MELValidator) LatestValidatedMELState(ctx context.Context) (*mel.State, error) { + return mv.messageExtractor.GetState(ctx, mv.latestValidatedParentChainBlock.Load()) +} + +func (mv *MELValidator) SetCurrentWasmModuleRoot(hash common.Hash) error { + mv.moduleMutex.Lock() + defer mv.moduleMutex.Unlock() + + if (hash == common.Hash{}) { + return errors.New("trying to set zero as wasmModuleRoot") + } + if hash == mv.currentWasmModuleRoot { + return nil + } + if (mv.currentWasmModuleRoot == common.Hash{}) { + mv.currentWasmModuleRoot = hash + return nil + } + if mv.pendingWasmModuleRoot == hash { + log.Info("Block validator: detected progressing to pending machine", "hash", hash) + mv.currentWasmModuleRoot = hash + return nil + } + if mv.config().CurrentModuleRoot != "current" { + return nil + } + return fmt.Errorf( + "unexpected wasmModuleRoot! cannot validate! found %v, current %v, pending %v", + hash, mv.currentWasmModuleRoot, mv.pendingWasmModuleRoot, + ) +} + +func (mv *MELValidator) GetModuleRootsToValidate() []common.Hash { + mv.moduleMutex.Lock() + defer mv.moduleMutex.Unlock() + + validatingModuleRoots := []common.Hash{mv.currentWasmModuleRoot} + if mv.currentWasmModuleRoot != mv.pendingWasmModuleRoot && mv.pendingWasmModuleRoot != (common.Hash{}) { + validatingModuleRoots = append(validatingModuleRoots, mv.pendingWasmModuleRoot) + } + return validatingModuleRoots +} + +func (mv *MELValidator) CreateNextValidationEntry(ctx context.Context, latestValidatedParentChainBlock, toValidateMsgExtractionCount uint64) (*validationEntry, uint64, error) { + if latestValidatedParentChainBlock == 0 { // TODO: last validated. + // ending position- bold staker latest posted assertion on chain that it agrees with (l1blockhash)- + return nil, 0, errors.New("trying to create validation entry for zero block number") + } + preState, err := mv.messageExtractor.GetState(ctx, latestValidatedParentChainBlock) + if err != nil { + return nil, 0, err + } + // We have already validated message extraction of messages till count toValidateMsgExtractionCount, so can return early + // and wait for block validator to progress the toValidateMsgExtractionCount + if preState.MsgCount >= toValidateMsgExtractionCount { + return nil, 0, nil + } + preimages := make(daprovider.PreimagesMap) + delayedMsgRecordingDB, err := melrecording.NewDelayedMsgDatabase(mv.arbDb, preimages) + if err != nil { + return nil, 0, err + } + recordingDAPReaders, err := melrecording.NewDAPReaderSource(ctx, mv.dapReaders, preimages) + if err != nil { + return nil, 0, err + } + var state *mel.State // to be used in endGS + for i := latestValidatedParentChainBlock + 1; ; i++ { + header, err := mv.l1Client.HeaderByNumber(ctx, new(big.Int).SetUint64(i)) + if err != nil { + return nil, 0, err + } + txsRecorder, err := melrecording.NewTransactionRecorder(mv.l1Client, header.Hash(), preimages) + if err != nil { + return nil, 0, err + } + if err := txsRecorder.Initialize(ctx); err != nil { + return nil, 0, err + } + receiptsRecorder, err := melrecording.NewReceiptRecorder(mv.l1Client, header.Hash(), preimages) + if err != nil { + return nil, 0, err + } + if err := receiptsRecorder.Initialize(ctx); err != nil { + return nil, 0, err + } + state, _, _, _, err = melextraction.ExtractMessages(ctx, preState, header, recordingDAPReaders, delayedMsgRecordingDB, txsRecorder, receiptsRecorder, nil) + if err != nil { + return nil, 0, fmt.Errorf("error calling melextraction.ExtractMessages in recording mode: %w", err) + } + wantState, err := mv.messageExtractor.GetState(ctx, i) + if err != nil { + return nil, 0, err + } + if state.Hash() != wantState.Hash() { + return nil, 0, fmt.Errorf("calculated MEL state hash in recording mode doesn't match the one computed in native mode, parentchainBlocknumber: %d", i) + } + if err := receiptsRecorder.CollectTxIndicesPreimage(); err != nil { + return nil, 0, err + } + if state.MsgCount >= toValidateMsgExtractionCount { + break + } + preState = state + } + endGs := validator.GoGlobalState{ + // After MEL fields get added to GlobalState + // MELStateHash: state.Hash(), + // PositionInMEL: preState.MsgCount - 1, + } + return &validationEntry{ + Start: mv.latestValidatedGS, + End: endGs, + Preimages: preimages, + EndParentChainBlockHash: state.ParentChainBlockHash, + }, state.ParentChainBlockNumber, nil +} + +func (mv *MELValidator) SendValidationEntry(ctx context.Context, entry *validationEntry) (*validationDoneEntry, error) { + wasmRoots := mv.GetModuleRootsToValidate() + var runs []validator.ValidationRun + for _, moduleRoot := range wasmRoots { + chosenSpawner := mv.chosenValidator[moduleRoot] + spawner := retry_wrapper.NewValidationSpawnerRetryWrapper(chosenSpawner) + spawner.StopWaiter.Start(ctx, mv) + input, err := entry.ToInput(nil) + if err != nil && ctx.Err() == nil { + return nil, fmt.Errorf("%w: error preparing validation", err) + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + run := spawner.LaunchWithNAllowedAttempts(input, moduleRoot, mv.config().ValidationSpawningAllowedAttempts) + log.Trace("sendValidations: launched", "pos", entry.Pos, "moduleRoot", moduleRoot) + runs = append(runs, run) + } + for _, run := range runs { + runEnd, err := run.Await(ctx) + if err == nil && runEnd != entry.End { + err = fmt.Errorf("validation failed: got %v", runEnd) + } + if err != nil { + return nil, fmt.Errorf("MEL validator: error while validating: %w", err) + } + } + return &validationDoneEntry{ + Success: true, + Start: entry.Start, + End: entry.End, + WasmModuleRoots: wasmRoots, + }, nil +} + +func (mv *MELValidator) AdvanceValidations(ctx context.Context, doneEntry *validationDoneEntry) error { + info := GlobalStateValidatedInfo{ + GlobalState: doneEntry.End, + WasmRoots: doneEntry.WasmModuleRoots, + } + encoded, err := rlp.EncodeToBytes(info) + if err != nil { + return err + } + err = mv.arbDb.Put(lastMELGlobalStateValidatedInfoKey, encoded) + if err != nil { + return err + } + return nil +} diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index d4f392c1c0..d1debb322d 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -18,6 +18,7 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" + "github.com/offchainlabs/nitro/arbnode/mel" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/daprovider" @@ -72,6 +73,10 @@ type TransactionStreamerInterface interface { ChainConfig() *params.ChainConfig } +type MELValidatorInterface interface { + LatestValidatedMELState(context.Context) (*mel.State, error) +} + type InboxReaderInterface interface { GetSequencerMessageBytes(ctx context.Context, seqNum uint64) ([]byte, common.Hash, error) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) @@ -143,9 +148,10 @@ type validationEntry struct { // Has batch when created - others could be added on record BatchInfo []validator.BatchInfo // Valid since Ready - Preimages daprovider.PreimagesMap - UserWasms state.UserWasms - DelayedMsg []byte + Preimages daprovider.PreimagesMap + UserWasms state.UserWasms + DelayedMsg []byte + EndParentChainBlockHash common.Hash } func (e *validationEntry) ToInput(stylusArchs []rawdb.WasmTarget) (*validator.ValidationInput, error) { diff --git a/staker/block_validator_schema.go b/staker/validator_schema.go similarity index 53% rename from staker/block_validator_schema.go rename to staker/validator_schema.go index 91b0ac2dad..db37c8e2d7 100644 --- a/staker/block_validator_schema.go +++ b/staker/validator_schema.go @@ -21,6 +21,7 @@ type GlobalStateValidatedInfo struct { } var ( - lastGlobalStateValidatedInfoKey = []byte("_lastGlobalStateValidatedInfo") // contains a rlp encoded lastBlockValidatedDbInfo - legacyLastBlockValidatedInfoKey = []byte("_lastBlockValidatedInfo") // LEGACY - contains a rlp encoded lastBlockValidatedDbInfo + lastMELGlobalStateValidatedInfoKey = []byte("_lastMELGlobalStateValidatedInfo") // contains a rlp encoded GlobalStateValidatedInfo of the last validated MEL state + lastGlobalStateValidatedInfoKey = []byte("_lastGlobalStateValidatedInfo") // contains a rlp encoded lastBlockValidatedDbInfo + legacyLastBlockValidatedInfoKey = []byte("_lastBlockValidatedInfo") // LEGACY - contains a rlp encoded lastBlockValidatedDbInfo ) diff --git a/system_tests/message_extraction_layer_test.go b/system_tests/message_extraction_layer_test.go index e9e5c9bacd..4b8f2e33e9 100644 --- a/system_tests/message_extraction_layer_test.go +++ b/system_tests/message_extraction_layer_test.go @@ -19,7 +19,7 @@ import ( "github.com/offchainlabs/nitro/arbcompress" "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbnode/mel" - melrunner "github.com/offchainlabs/nitro/arbnode/mel/runner" + "github.com/offchainlabs/nitro/arbnode/mel/runner" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/cmd/chaininfo" @@ -27,7 +27,7 @@ import ( "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" "github.com/offchainlabs/nitro/solgen/go/rollupgen" - "github.com/offchainlabs/nitro/staker/bold" + "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/testhelpers" ) @@ -816,7 +816,7 @@ func createInitialMELState( Require(t, err) confirmedHash, err := rollup.LatestConfirmed(&bind.CallOpts{}) Require(t, err) - latestConfirmedAssertion, err := bold.ReadBoldAssertionCreationInfo( + latestConfirmedAssertion, err := staker.ReadBoldAssertionCreationInfo( ctx, rollup, client, diff --git a/system_tests/message_extraction_layer_validation_test.go b/system_tests/message_extraction_layer_validation_test.go new file mode 100644 index 0000000000..f33cc6d0d7 --- /dev/null +++ b/system_tests/message_extraction_layer_validation_test.go @@ -0,0 +1,127 @@ +package arbtest + +import ( + "context" + "fmt" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + + "github.com/offchainlabs/nitro/arbnode/mel/extraction" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/daprovider" + "github.com/offchainlabs/nitro/mel-replay" + "github.com/offchainlabs/nitro/staker" +) + +func TestMELValidator_Recording_Preimages(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.L2Info.GenerateAccount("User2") + builder.nodeConfig.BatchPoster.Post4844Blobs = true + builder.nodeConfig.BatchPoster.IgnoreBlobPrice = true + builder.nodeConfig.BatchPoster.MaxDelay = time.Hour // set high max-delay so we can test the delay buffer + builder.nodeConfig.BatchPoster.PollInterval = time.Hour // set a high poll interval to avoid continuous polling + cleanup := builder.Build(t) + defer cleanup() + + // Post a blob batch with a bunch of txs + startBlock, err := builder.L1.Client.BlockNumber(ctx) + Require(t, err) + testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{}) + defer cleanupB() + initialBatchCount := GetBatchCount(t, builder) + var txs types.Transactions + for i := 0; i < 20; i++ { + tx, _ := builder.L2.TransferBalance(t, "Faucet", "User2", big.NewInt(1e12), builder.L2Info) + txs = append(txs, tx) + } + builder.nodeConfig.BatchPoster.MaxDelay = 0 + builder.L2.ConsensusConfigFetcher.Set(builder.nodeConfig) + _, err = builder.L2.ConsensusNode.BatchPoster.MaybePostSequencerBatch(ctx) + Require(t, err) + for _, tx := range txs { + _, err := testClientB.EnsureTxSucceeded(tx) + Require(t, err, "tx not found on second node") + } + CheckBatchCount(t, builder, initialBatchCount+1) + + // Post delayed messages + forceDelayedBatchPosting(t, ctx, builder, testClientB, 10, 0) + + // MEL Validator: create validation entry + blobReaderRegistry := daprovider.NewDAProviderRegistry() + Require(t, blobReaderRegistry.SetupBlobReader(daprovider.NewReaderForBlobReader(builder.L1.L1BlobReader))) + config := func() *staker.MELValidatorConfig { return &staker.DefaultMELValidatorConfig } + Require(t, config().Validate()) + melValidator, err := staker.NewMELValidator(config, builder.L2.ConsensusNode.ConsensusDB, builder.L1.Client, builder.L1.Stack, builder.L2.ConsensusNode.MessageExtractor, blobReaderRegistry, common.MaxHash) + Require(t, err) + extractedMsgCount, err := builder.L2.ConsensusNode.TxStreamer.GetMessageCount() + Require(t, err) + entry, _, err := melValidator.CreateNextValidationEntry(ctx, startBlock, uint64(extractedMsgCount)) + Require(t, err) + + // Represents running of MEL validation using preimages in wasm mode. TODO: remove this once we have validation wired + preimageResolver := melreplay.NewTypeBasedPreimageResolver( + arbutil.Keccak256PreimageType, + entry.Preimages, + ) + state, err := builder.L2.ConsensusNode.MessageExtractor.GetState(ctx, startBlock) + Require(t, err) + preimagesBasedDelayedDb := melreplay.NewDelayedMessageDatabase(preimageResolver) + preimagesBasedDapReaders := daprovider.NewDAProviderRegistry() + blobReader := &blobPreimageReader{ + melreplay.NewTypeBasedPreimageResolver( + arbutil.EthVersionedHashPreimageType, + entry.Preimages, + ), + } + Require(t, preimagesBasedDapReaders.SetupBlobReader(daprovider.NewReaderForBlobReader(blobReader))) + for state.MsgCount < uint64(extractedMsgCount) { + header, err := builder.L1.Client.HeaderByNumber(ctx, new(big.Int).SetUint64(state.ParentChainBlockNumber+1)) + Require(t, err) + preimagesBasedTxsFetcher := melreplay.NewTransactionFetcher(header, preimageResolver) + preimagesBasedLogsFetcher := melreplay.NewLogsFetcher(header, preimageResolver) + postState, _, _, _, err := melextraction.ExtractMessages(ctx, state, header, preimagesBasedDapReaders, preimagesBasedDelayedDb, preimagesBasedTxsFetcher, preimagesBasedLogsFetcher, nil) + Require(t, err) + wantState, err := builder.L2.ConsensusNode.MessageExtractor.GetState(ctx, state.ParentChainBlockNumber+1) + Require(t, err) + if postState.Hash() != wantState.Hash() { + t.Fatalf("MEL state mismatch") + } + state = postState + } +} + +type blobPreimageReader struct { + preimageResolver melreplay.PreimageResolver +} + +func (b *blobPreimageReader) Initialize(ctx context.Context) error { return nil } + +func (b *blobPreimageReader) GetBlobs( + ctx context.Context, + batchBlockHash common.Hash, + versionedHashes []common.Hash, +) ([]kzg4844.Blob, error) { + var blobs []kzg4844.Blob + for _, h := range versionedHashes { + var blob kzg4844.Blob + preimage, err := b.preimageResolver.ResolveTypedPreimage(arbutil.EthVersionedHashPreimageType, h) + if err != nil { + return nil, err + } + if len(preimage) != len(blob) { + return nil, fmt.Errorf("for blob %v got back preimage of length %v but expected blob length %v", h, len(preimage), len(blob)) + } + copy(blob[:], preimage) + blobs = append(blobs, blob) + } + return blobs, nil +}