diff --git a/core/blockchain.go b/core/blockchain.go index 0b92a94b6c6..071118af2a8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -112,6 +112,9 @@ var ( errChainStopped = errors.New("blockchain is stopped") errInvalidOldChain = errors.New("invalid old chain") errInvalidNewChain = errors.New("invalid new chain") + + avgAccessDepthInBlock = metrics.NewRegisteredGauge("trie/access/depth/avg", nil) + minAccessDepthInBlock = metrics.NewRegisteredGauge("trie/access/depth/min", nil) ) var ( @@ -331,6 +334,8 @@ type BlockChain struct { processor Processor // Block transaction processor interface logger *tracing.Hooks + stateSizeGen *state.StateSizeGenerator // State size tracking + lastForkReadyAlert time.Time // Last time there was a fork readiness print out } @@ -523,6 +528,11 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, if bc.cfg.TxLookupLimit >= 0 { bc.txIndexer = newTxIndexer(uint64(bc.cfg.TxLookupLimit), bc) } + + // Start state size tracker + bc.stateSizeGen = state.NewStateSizeGenerator(bc.statedb.DiskDB(), bc.triedb, head.Root) + log.Info("Started state size generator", "root", head.Root) + return bc, nil } @@ -1249,6 +1259,12 @@ func (bc *BlockChain) stopWithoutSaving() { // Signal shutdown to all goroutines. bc.InterruptInsert(true) + // Stop state size generator if running + if bc.stateSizeGen != nil { + bc.stateSizeGen.Stop() + log.Info("Stopped state size generator") + } + // Now wait for all chain modifications to end and persistent goroutines to exit. // // Note: Close waits for the mutex to become available, i.e. any running chain @@ -1310,6 +1326,7 @@ func (bc *BlockChain) Stop() { } } } + // Allow tracers to clean-up and release resources. if bc.logger != nil && bc.logger.OnClose != nil { bc.logger.OnClose() @@ -1583,10 +1600,15 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Crit("Failed to write block into disk", "err", err) } // Commit all cached state changes into underlying memory database. - root, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time())) + root, stateUpdate, err := statedb.CommitWithUpdate(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time())) if err != nil { return err } + + // Track state size changes if generator is running + if bc.stateSizeGen != nil && stateUpdate != nil { + bc.stateSizeGen.Track(stateUpdate) + } // If node is running in path mode, skip explicit gc operation // which is unnecessary in this mode. if bc.triedb.Scheme() == rawdb.PathScheme { @@ -2083,6 +2105,7 @@ func (bc *BlockChain) processBlock(parentRoot common.Hash, block *types.Block, s return nil, fmt.Errorf("stateless self-validation receipt root mismatch (cross: %x local: %x)", crossReceiptRoot, block.ReceiptHash()) } } + xvtime := time.Since(xvstart) proctime := time.Since(startTime) // processing + validation + cross validation @@ -2118,6 +2141,24 @@ func (bc *BlockChain) processBlock(parentRoot common.Hash, block *types.Block, s if err != nil { return nil, err } + + // If witness was generated, update metrics regarding the access paths. + if witness != nil { + paths := witness.Paths + totaldepth, pathnum, mindepth := 0, 0, -1 + if len(paths) > 0 { + for path, _ := range paths { + if len(path) < mindepth || mindepth < 0 { + mindepth = len(path) + } + totaldepth += len(path) + pathnum++ + } + avgAccessDepthInBlock.Update(int64(totaldepth) / int64(pathnum)) + minAccessDepthInBlock.Update(int64(mindepth)) + } + } + // Update the metrics touched during block commit accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index 859566f722f..63d0d0fff35 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -187,3 +187,16 @@ func WriteTransitionStatus(db ethdb.KeyValueWriter, data []byte) { log.Crit("Failed to store the eth2 transition status", "err", err) } } + +// WriteStateSizeMetrics writes the state size metrics to the database +func WriteStateSizeMetrics(db ethdb.KeyValueWriter, data []byte) { + if err := db.Put(stateSizeMetricsKey, data); err != nil { + log.Warn("Failed to store state size metrics", "err", err) + } +} + +// ReadStateSizeMetrics reads the state size metrics from the database +func ReadStateSizeMetrics(db ethdb.KeyValueReader) []byte { + data, _ := db.Get(stateSizeMetricsKey) + return data +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 72f9bd34eca..73a37900789 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -100,6 +100,9 @@ var ( // snapSyncStatusFlagKey flags that status of snap sync. snapSyncStatusFlagKey = []byte("SnapSyncStatus") + // stateSizeMetricsKey tracks the state size metrics. + stateSizeMetricsKey = []byte("state-size-metrics") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td (deprecated) diff --git a/core/state/database.go b/core/state/database.go index b46e5d500d6..1620841aa9f 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -124,6 +124,10 @@ type Trie interface { // The returned map could be nil if the witness is empty. Witness() map[string]struct{} + // WitnessPaths returns a set of paths for all trie nodes. For future reference, + // witness can be deprecated and used as a replacement to witness. + WitnessPaths() map[string]struct{} + // NodeIterator returns an iterator that returns nodes of the trie. Iteration // starts at the key after the given start key. And error will be returned // if fails to create node iterator. @@ -277,6 +281,11 @@ func (db *CachingDB) TrieDB() *triedb.Database { return db.triedb } +// DiskDB returns the underlying disk database for direct access. +func (db *CachingDB) DiskDB() ethdb.KeyValueStore { + return db.disk +} + // PointCache returns the cache of evaluated curve points. func (db *CachingDB) PointCache() *utils.PointCache { return db.pointCache diff --git a/core/state/metrics.go b/core/state/metrics.go index dd4b2e98381..57f6d4a4edd 100644 --- a/core/state/metrics.go +++ b/core/state/metrics.go @@ -29,4 +29,14 @@ var ( storageTriesUpdatedMeter = metrics.NewRegisteredMeter("state/update/storagenodes", nil) accountTrieDeletedMeter = metrics.NewRegisteredMeter("state/delete/accountnodes", nil) storageTriesDeletedMeter = metrics.NewRegisteredMeter("state/delete/storagenodes", nil) + + // State size metrics + accountCountGauge = metrics.NewRegisteredGauge("state/account/count", nil) + accountBytesGauge = metrics.NewRegisteredGauge("state/account/bytes", nil) + storageCountGauge = metrics.NewRegisteredGauge("state/storage/count", nil) + storageBytesGauge = metrics.NewRegisteredGauge("state/storage/bytes", nil) + trienodeCountGauge = metrics.NewRegisteredGauge("state/trienode/count", nil) + trienodeBytesGauge = metrics.NewRegisteredGauge("state/trienode/bytes", nil) + contractCountGauge = metrics.NewRegisteredGauge("state/contract/count", nil) + contractBytesGauge = metrics.NewRegisteredGauge("state/contract/bytes", nil) ) diff --git a/core/state/statedb.go b/core/state/statedb.go index efb09a08a02..cc95d448e4b 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -185,6 +185,7 @@ func NewWithReader(root common.Hash, db Database, reader Reader) (*StateDB, erro if db.TrieDB().IsVerkle() { sdb.accessEvents = NewAccessEvents(db.PointCache()) } + return sdb, nil } @@ -841,7 +842,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // If witness building is enabled and the state object has a trie, // gather the witnesses for its specific storage trie if s.witness != nil && obj.trie != nil { - s.witness.AddState(obj.trie.Witness()) + s.witness.AddState(obj.trie.Witness(), obj.trie.WitnessPaths()) } } return nil @@ -858,9 +859,9 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { continue } if trie := obj.getPrefetchedTrie(); trie != nil { - s.witness.AddState(trie.Witness()) + s.witness.AddState(trie.Witness(), trie.WitnessPaths()) } else if obj.trie != nil { - s.witness.AddState(obj.trie.Witness()) + s.witness.AddState(obj.trie.Witness(), obj.trie.WitnessPaths()) } } // Pull in only-read and non-destructed trie witnesses @@ -874,9 +875,9 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { continue } if trie := obj.getPrefetchedTrie(); trie != nil { - s.witness.AddState(trie.Witness()) + s.witness.AddState(trie.Witness(), trie.WitnessPaths()) } else if obj.trie != nil { - s.witness.AddState(obj.trie.Witness()) + s.witness.AddState(obj.trie.Witness(), obj.trie.WitnessPaths()) } } } @@ -942,7 +943,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // If witness building is enabled, gather the account trie witness if s.witness != nil { - s.witness.AddState(s.trie.Witness()) + s.witness.AddState(s.trie.Witness(), nil) } return hash } @@ -1356,6 +1357,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool, noStorageWiping return ret.root, nil } +// CommitWithUpdate writes the state mutations and returns both the root hash and the state update. +// This is useful for tracking state changes at the blockchain level. +func (s *StateDB) CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *stateUpdate, error) { + ret, err := s.commitAndFlush(block, deleteEmptyObjects, noStorageWiping) + if err != nil { + return common.Hash{}, nil, err + } + return ret.root, ret, nil +} + // Prepare handles the preparatory steps for executing a state transition with. // This method must be invoked before state transition. // diff --git a/core/state/statesize.go b/core/state/statesize.go new file mode 100644 index 00000000000..a696958064f --- /dev/null +++ b/core/state/statesize.go @@ -0,0 +1,519 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package state + +import ( + "context" + "encoding/json" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/triedb" + "golang.org/x/sync/errgroup" +) + +var ( + accountSnapKeySize = int64(len(rawdb.SnapshotAccountPrefix) + common.HashLength) + storageSnapKeySize = int64(len(rawdb.SnapshotStoragePrefix) + common.HashLength + common.HashLength) + accountTrieKeyPrefixSize = int64(len(rawdb.TrieNodeAccountPrefix)) + storageTrieKeyPrefixSize = int64(len(rawdb.TrieNodeStoragePrefix) + common.HashLength) + codeKeySize = int64(len(rawdb.CodePrefix) + common.HashLength) +) + +// stateSizeMetrics represents the current state size statistics +type stateSizeMetrics struct { + Root common.Hash // Root hash of the state trie + AccountCount int64 + AccountBytes int64 + StorageCount int64 + StorageBytes int64 + TrieNodeCount int64 + TrieNodeBytes int64 + ContractCount int64 + ContractBytes int64 +} + +// StateSizeGenerator handles the initialization and tracking of state size metrics +type StateSizeGenerator struct { + db ethdb.KeyValueStore + triedb *triedb.Database + abort chan struct{} + done chan struct{} + updateChan chan *stateUpdate // Async message channel for updates + metrics *stateSizeMetrics + buffered *stateSizeMetrics +} + +// NewStateSizeGenerator creates a new state size generator and starts it automatically +func NewStateSizeGenerator(db ethdb.KeyValueStore, triedb *triedb.Database, root common.Hash) *StateSizeGenerator { + g := &StateSizeGenerator{ + db: db, + triedb: triedb, + abort: make(chan struct{}), + done: make(chan struct{}), + updateChan: make(chan *stateUpdate, 1000), // Buffered channel for updates + metrics: &stateSizeMetrics{Root: root}, + buffered: &stateSizeMetrics{Root: root}, + } + + // Start the generator automatically + go g.generate() + + return g +} + +// Stop terminates the background generation and persists the metrics. +func (g *StateSizeGenerator) Stop() { + close(g.abort) + + <-g.done + + g.persistMetrics() +} + +// generate performs the state size initialization and handles updates +func (g *StateSizeGenerator) generate() { + defer close(g.done) + + var inited bool + + // Check if we already have existing metrics + if g.hasExistingMetrics() { + log.Info("State size metrics already initialized") + inited = true + } + + initDone := g.initialize() + + for { + select { + case update := <-g.updateChan: + g.handleUpdate(update, inited) + + case <-g.abort: + log.Info("State size generation aborted") + + // Wait for initialization to complete with timeout + if initDone != nil { + select { + case <-initDone: + log.Debug("Initialization completed before abort") + case <-time.After(5 * time.Second): + log.Warn("Initialization did not finish in time during abort") + } + } + return + + case <-initDone: + // Initialization completed, merge buffered metrics + if g.buffered != nil { + log.Info("Merging buffered metrics into main metrics") + g.metrics.Root = g.buffered.Root + g.metrics.AccountCount += g.buffered.AccountCount + g.metrics.AccountBytes += g.buffered.AccountBytes + g.metrics.StorageCount += g.buffered.StorageCount + g.metrics.StorageBytes += g.buffered.StorageBytes + g.metrics.TrieNodeCount += g.buffered.TrieNodeCount + g.metrics.TrieNodeBytes += g.buffered.TrieNodeBytes + g.metrics.ContractCount += g.buffered.ContractCount + g.metrics.ContractBytes += g.buffered.ContractBytes + + g.buffered = nil + } + + inited = true + initDone = nil // Clear the channel to prevent future selects + } + } +} + +// initialize starts the initialization process if not already initialized +func (g *StateSizeGenerator) initialize() chan struct{} { + done := make(chan struct{}) + + // Wait for snapshot completion and then initialize + go func() { + defer close(done) + + LOOP: + // Wait for snapshot generator to complete first + for { + root, done := g.triedb.SnapshotCompleted() + if done { + g.metrics.Root = root + g.buffered.Root = root + break LOOP + } + + select { + case <-g.abort: + log.Info("State size initialization aborted during snapshot wait") + return + case <-time.After(10 * time.Second): + // Continue checking for snapshot completion + } + } + + // Start actual initialization + start := time.Now() + log.Info("Starting state size initialization") + if err := g.initializeMetrics(); err != nil { + log.Error("Failed to initialize state size metrics", "err", err) + return + } + + log.Info("Completed state size initialization", "elapsed", time.Since(start)) + }() + + return done +} + +// handleUpdate processes a single update with proper root continuity checking +func (g *StateSizeGenerator) handleUpdate(update *stateUpdate, initialized bool) { + diff := g.calculateUpdateDiff(update) + + var targetMetrics *stateSizeMetrics + if initialized { + targetMetrics = g.metrics + } else { + targetMetrics = g.buffered + } + + // Check root continuity - the update should build on our current state + if targetMetrics.Root != (common.Hash{}) && targetMetrics.Root != update.originRoot { + log.Warn("State update root discontinuity detected", "current", targetMetrics.Root, "updateOrigin", update.originRoot, "updateNew", update.root) + } + + // Update to the new state root + targetMetrics.Root = update.root + targetMetrics.AccountCount += diff.AccountCount + targetMetrics.AccountBytes += diff.AccountBytes + targetMetrics.StorageCount += diff.StorageCount + targetMetrics.StorageBytes += diff.StorageBytes + targetMetrics.TrieNodeCount += diff.TrieNodeCount + targetMetrics.TrieNodeBytes += diff.TrieNodeBytes + targetMetrics.ContractCount += diff.ContractCount + targetMetrics.ContractBytes += diff.ContractBytes + + // Fire the metrics and persist only if initialization is done + if initialized { + g.updateMetrics() + g.persistMetrics() + } +} + +// calculateUpdateDiff calculates the diff for a state update +func (g *StateSizeGenerator) calculateUpdateDiff(update *stateUpdate) stateSizeMetrics { + var diff stateSizeMetrics + + // Calculate account changes + for addr, oldValue := range update.accountsOrigin { + addrHash := crypto.Keccak256Hash(addr.Bytes()) + newValue, exists := update.accounts[addrHash] + if !exists { + log.Warn("State update missing account", "address", addr) + continue + } + + oldLen, newLen := len(oldValue), len(newValue) + if oldLen > 0 && newLen == 0 { + // Account deletion + diff.AccountCount -= 1 + diff.AccountBytes -= accountSnapKeySize + int64(oldLen) + } else if oldLen == 0 && newLen > 0 { + // Account creation + diff.AccountCount += 1 + diff.AccountBytes += accountSnapKeySize + int64(newLen) + } else { + // Account update + diff.AccountBytes += int64(newLen - oldLen) + } + } + + // Calculate storage changes + for addr, slots := range update.storagesOrigin { + addrHash := crypto.Keccak256Hash(addr.Bytes()) + subset, exists := update.storages[addrHash] + if !exists { + log.Warn("State update missing storage", "address", addr) + continue + } + for key, oldValue := range slots { + var ( + exists bool + newValue []byte + ) + if update.rawStorageKey { + newValue, exists = subset[crypto.Keccak256Hash(key.Bytes())] + } else { + newValue, exists = subset[key] + } + if !exists { + log.Warn("State update missing storage slot", "address", addr, "key", key) + continue + } + + oldLen, newLen := len(oldValue), len(newValue) + if oldLen > 0 && newLen == 0 { + // Storage deletion + diff.StorageCount -= 1 + diff.StorageBytes -= storageSnapKeySize + int64(oldLen) + } else if oldLen == 0 && newLen > 0 { + // Storage creation + diff.StorageCount += 1 + diff.StorageBytes += storageSnapKeySize + int64(newLen) + } else { + // Storage update + diff.StorageBytes += int64(newLen - oldLen) + } + } + } + + // Calculate trie node changes + for owner, subset := range update.nodes.Sets { + isAccountTrie := owner == (common.Hash{}) + var keyPrefixSize int64 + if isAccountTrie { + keyPrefixSize = accountTrieKeyPrefixSize + } else { + keyPrefixSize = storageTrieKeyPrefixSize + } + + // Iterate over Origins since every modified node has an origin entry + for path, oldNode := range subset.Origins { + newNode, hasNew := subset.Nodes[path] + + keySize := keyPrefixSize + int64(len(path)) + + if len(oldNode) > 0 && (!hasNew || len(newNode.Blob) == 0) { + // Node deletion + diff.TrieNodeCount -= 1 + diff.TrieNodeBytes -= keySize + int64(len(oldNode)) + } else if len(oldNode) == 0 && hasNew && len(newNode.Blob) > 0 { + // Node creation + diff.TrieNodeCount += 1 + diff.TrieNodeBytes += keySize + int64(len(newNode.Blob)) + } else if len(oldNode) > 0 && hasNew && len(newNode.Blob) > 0 { + // Node update + diff.TrieNodeBytes += int64(len(newNode.Blob) - len(oldNode)) + } + } + } + + // Calculate code changes + for _, code := range update.codes { + diff.ContractCount += 1 + diff.ContractBytes += codeKeySize + int64(len(code.blob)) + } + + return diff +} + +// Track is an async method used to send the state update to the generator. +// It ignores empty updates (where no state changes occurred). +// If the channel is full, it drops the update to avoid blocking. +func (g *StateSizeGenerator) Track(update *stateUpdate) { + if update == nil || update.empty() { + return + } + + g.updateChan <- update +} + +// hasExistingMetrics checks if state size metrics already exist in the database +// and if they are continuous with the current root +func (g *StateSizeGenerator) hasExistingMetrics() bool { + data := rawdb.ReadStateSizeMetrics(g.db) + if data == nil { + return false + } + + var existed stateSizeMetrics + if err := json.Unmarshal(data, &existed); err != nil { + log.Warn("Failed to decode existed state size metrics", "err", err) + return false + } + + // Check if the existing metrics root matches our current root + if (g.metrics.Root != common.Hash{}) && existed.Root != g.metrics.Root { + log.Info("Existing state size metrics found but root mismatch", "existed", existed.Root, "current", g.metrics.Root) + return false + } + + // Root matches - load the existing metrics + log.Info("Loading existing state size metrics", "root", existed.Root) + g.metrics = &existed + return true +} + +// initializeMetrics performs the actual metrics initialization using errgroup +func (g *StateSizeGenerator) initializeMetrics() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + select { + case <-g.abort: + cancel() // Cancel context when abort is signaled + case <-ctx.Done(): + // Context already cancelled + } + }() + + // Create errgroup with context + group, ctx := errgroup.WithContext(ctx) + + // Metrics will be directly updated by each goroutine + var ( + accountSnapCount, accountSnapBytes int64 + storageSnapCount, storageSnapBytes int64 + accountTrieCount, accountTrieBytes int64 + storageTrieCount, storageTrieBytes int64 + contractCount, contractBytes int64 + ) + + // Start all table iterations concurrently with direct metric updates + group.Go(func() error { + count, bytes, err := g.iterateTable(ctx, rawdb.SnapshotAccountPrefix, "accountSnap") + if err != nil { + return err + } + accountSnapCount, accountSnapBytes = count, bytes + return nil + }) + + group.Go(func() error { + count, bytes, err := g.iterateTable(ctx, rawdb.SnapshotStoragePrefix, "storageSnap") + if err != nil { + return err + } + storageSnapCount, storageSnapBytes = count, bytes + return nil + }) + + group.Go(func() error { + count, bytes, err := g.iterateTable(ctx, rawdb.TrieNodeAccountPrefix, "accountTrie") + if err != nil { + return err + } + accountTrieCount, accountTrieBytes = count, bytes + return nil + }) + + group.Go(func() error { + count, bytes, err := g.iterateTable(ctx, rawdb.TrieNodeStoragePrefix, "storageTrie") + if err != nil { + return err + } + storageTrieCount, storageTrieBytes = count, bytes + return nil + }) + + group.Go(func() error { + count, bytes, err := g.iterateTable(ctx, rawdb.CodePrefix, "contract") + if err != nil { + return err + } + contractCount, contractBytes = count, bytes + return nil + }) + + // Wait for all goroutines to complete + if err := group.Wait(); err != nil { + return err + } + + g.metrics.AccountCount = accountSnapCount + g.metrics.AccountBytes = accountSnapBytes + g.metrics.StorageCount = storageSnapCount + g.metrics.StorageBytes = storageSnapBytes + g.metrics.TrieNodeCount = accountTrieCount + storageTrieCount + g.metrics.TrieNodeBytes = accountTrieBytes + storageTrieBytes + g.metrics.ContractCount = contractCount + g.metrics.ContractBytes = contractBytes + + g.updateMetrics() + g.persistMetrics() + + return nil +} + +// iterateTable performs iteration over a specific table and returns the results +func (g *StateSizeGenerator) iterateTable(ctx context.Context, prefix []byte, name string) (int64, int64, error) { + log.Info("Iterating over state size", "table", name) + start := time.Now() + + var count, bytes int64 + iter := g.db.NewIterator(prefix, nil) + defer iter.Release() + + for iter.Next() { + count++ + bytes += int64(len(iter.Key()) + len(iter.Value())) + + // Check for cancellation periodically for performance + if count%10000 == 0 { + select { + case <-ctx.Done(): + log.Info("State size iteration cancelled", "table", name, "count", count) + return 0, 0, ctx.Err() + default: + } + } + } + + // Check for iterator errors + if err := iter.Error(); err != nil { + log.Error("Iterator error during state size calculation", "table", name, "err", err) + return 0, 0, err + } + + log.Info("Finished iterating over state size", "table", name, "count", count, "bytes", bytes, "elapsed", common.PrettyDuration(time.Since(start))) + + return count, bytes, nil +} + +func (g *StateSizeGenerator) updateMetrics() { + accountCountGauge.Update(g.metrics.AccountCount) + accountBytesGauge.Update(g.metrics.AccountBytes) + storageCountGauge.Update(g.metrics.StorageCount) + storageBytesGauge.Update(g.metrics.StorageBytes) + trienodeCountGauge.Update(g.metrics.TrieNodeCount) + trienodeBytesGauge.Update(g.metrics.TrieNodeBytes) + contractCountGauge.Update(g.metrics.ContractCount) + contractBytesGauge.Update(g.metrics.ContractBytes) +} + +// persistMetrics saves the current metrics to the database +func (g *StateSizeGenerator) persistMetrics() { + // RLP doesn't support int64, so we use JSON for simplicity + data, err := json.Marshal(*g.metrics) + if err != nil { + log.Error("Failed to encode state size metrics", "err", err) + return + } + + batch := g.db.NewBatch() + rawdb.WriteStateSizeMetrics(batch, data) + if err := batch.Write(); err != nil { + log.Error("Failed to persist state size metrics", "err", err) + } +} diff --git a/core/stateless/database.go b/core/stateless/database.go index f54c123ddaa..b2d3efb0b19 100644 --- a/core/stateless/database.go +++ b/core/stateless/database.go @@ -63,5 +63,6 @@ func (w *Witness) MakeHashDB() ethdb.Database { rawdb.WriteLegacyTrieNode(memdb, common.BytesToHash(hash), blob) } + return memdb } diff --git a/core/stateless/witness.go b/core/stateless/witness.go index aecfad1d52f..671adcd37bb 100644 --- a/core/stateless/witness.go +++ b/core/stateless/witness.go @@ -41,6 +41,7 @@ type Witness struct { Headers []*types.Header // Past headers in reverse order (0=parent, 1=parent's-parent, etc). First *must* be set. Codes map[string]struct{} // Set of bytecodes ran or accessed State map[string]struct{} // Set of MPT state trie nodes (account and storage together) + Paths map[string]struct{} // Set of MPT trie paths (i.e. all accessed nodes, not just the ones in state) chain HeaderReader // Chain reader to convert block hash ops to header proofs lock sync.Mutex // Lock to allow concurrent state insertions @@ -58,13 +59,15 @@ func NewWitness(context *types.Header, chain HeaderReader) (*Witness, error) { } headers = append(headers, parent) } - // Create the wtness with a reconstructed gutted out block + // Create the witness with a reconstructed gutted out block return &Witness{ context: context, Headers: headers, Codes: make(map[string]struct{}), State: make(map[string]struct{}), - chain: chain, + Paths: make(map[string]struct{}), + + chain: chain, }, nil } @@ -88,7 +91,7 @@ func (w *Witness) AddCode(code []byte) { } // AddState inserts a batch of MPT trie nodes into the witness. -func (w *Witness) AddState(nodes map[string]struct{}) { +func (w *Witness) AddState(nodes map[string]struct{}, paths map[string]struct{}) { if len(nodes) == 0 { return } @@ -96,6 +99,9 @@ func (w *Witness) AddState(nodes map[string]struct{}) { defer w.lock.Unlock() maps.Copy(w.State, nodes) + if paths != nil { + maps.Copy(w.Paths, paths) + } } // Copy deep-copies the witness object. Witness.Block isn't deep-copied as it @@ -105,6 +111,7 @@ func (w *Witness) Copy() *Witness { Headers: slices.Clone(w.Headers), Codes: maps.Clone(w.Codes), State: maps.Clone(w.State), + Paths: maps.Clone(w.Paths), chain: w.chain, } if w.context != nil { diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 0424ecb6e51..fcbd846ff78 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -257,6 +257,11 @@ func (t *StateTrie) Witness() map[string]struct{} { return t.trie.Witness() } +// Witness returns a set containing all trie nodes that have been accessed. +func (t *StateTrie) WitnessPaths() map[string]struct{} { + return t.trie.WitnessPaths() +} + // Commit collects all dirty nodes in the trie and replaces them with the // corresponding node hash. All collected nodes (including dirty leaves if // collectLeaf is true) will be encapsulated into a nodeset for return. diff --git a/trie/tracer.go b/trie/tracer.go index 206e8aa20d8..8355e4c6b0e 100644 --- a/trie/tracer.go +++ b/trie/tracer.go @@ -140,6 +140,10 @@ func (t *prevalueTracer) values() [][]byte { return slices.Collect(maps.Values(t.data)) } +func (t *prevalueTracer) keys() []string { + return slices.Collect(maps.Keys(t.data)) +} + // reset resets the cached content in the prevalueTracer. func (t *prevalueTracer) reset() { clear(t.data) diff --git a/trie/trie.go b/trie/trie.go index 57c47b8ac9b..93a91b3d0d4 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -717,11 +717,26 @@ func (t *Trie) Witness() map[string]struct{} { if len(values) == 0 { return nil } - witness := make(map[string]struct{}, len(values)) + witnessStates := make(map[string]struct{}, len(values)) for _, val := range values { - witness[string(val)] = struct{}{} + witnessStates[string(val)] = struct{}{} } - return witness + + return witnessStates +} + +func (t *Trie) WitnessPaths() map[string]struct{} { + // Return the paths of all nodes that have been accessed. + // The paths are the keys of the prevalue tracer. + keys := t.prevalueTracer.keys() + if len(keys) == 0 { + return nil + } + witnessPaths := make(map[string]struct{}, len(keys)) + for _, key := range keys { + witnessPaths[string(key)] = struct{}{} + } + return witnessPaths } // Reset drops the referenced root node and cleans all internal state. diff --git a/trie/verkle.go b/trie/verkle.go index c89a8f1d368..89eb0591936 100644 --- a/trie/verkle.go +++ b/trie/verkle.go @@ -433,3 +433,8 @@ func (t *VerkleTrie) nodeResolver(path []byte) ([]byte, error) { func (t *VerkleTrie) Witness() map[string]struct{} { panic("not implemented") } + +// Witness returns a set containing all trie nodes that have been accessed. +func (t *VerkleTrie) WitnessPaths() map[string]struct{} { + panic("not implemented") +} diff --git a/triedb/database.go b/triedb/database.go index e2f4334d6e0..cba35a67e9c 100644 --- a/triedb/database.go +++ b/triedb/database.go @@ -375,3 +375,12 @@ func (db *Database) IsVerkle() bool { func (db *Database) Disk() ethdb.Database { return db.disk } + +// SnapshotCompleted returns the indicator if the snapshot is completed. +func (db *Database) SnapshotCompleted() (common.Hash, bool) { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return common.Hash{}, false + } + return pdb.SnapshotCompleted() +} diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index e323a7449ee..708c07492d0 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -764,3 +764,15 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek } return newFastStorageIterator(db, root, account, seek) } + +// SnapshotCompleted returns the snapshot root if the snapshot generation is completed. +func (db *Database) SnapshotCompleted() (common.Hash, bool) { + if db.waitSync { + return common.Hash{}, false + } + dl := db.tree.bottom() + if dl.genComplete() { + return dl.rootHash(), true + } + return common.Hash{}, false +}