Skip to content

Commit 2192020

Browse files
authored
triedb/pathdb, eth: use double-buffer mechanism in pathdb (#30464)
Previously, PathDB used a single buffer to aggregate database writes, which needed to be flushed atomically. However, flushing large amounts of data (e.g., 256MB) caused significant overhead, often blocking the system for around 3 seconds during the flush. To mitigate this overhead and reduce performance spikes, a double-buffer mechanism is introduced. When the active buffer fills up, it is marked as frozen and a background flushing process is triggered. Meanwhile, a new buffer is allocated for incoming writes, allowing operations to continue uninterrupted. This approach reduces system blocking times and provides flexibility in adjusting buffer parameters for improved performance.
1 parent 338d754 commit 2192020

17 files changed

+383
-177
lines changed

core/blockchain.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ const (
162162
// BlockChainConfig contains the configuration of the BlockChain object.
163163
type BlockChainConfig struct {
164164
// Trie database related options
165-
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
166-
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
167-
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
165+
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
166+
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
167+
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
168+
TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed
168169

169170
Preimages bool // Whether to store preimage of trie key to the disk
170171
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
@@ -210,7 +211,7 @@ func DefaultConfig() *BlockChainConfig {
210211
}
211212
}
212213

213-
// WithArchive enabled/disables archive mode on the config.
214+
// WithArchive enables/disables archive mode on the config.
214215
func (cfg BlockChainConfig) WithArchive(on bool) *BlockChainConfig {
215216
cfg.ArchiveMode = on
216217
return &cfg
@@ -222,6 +223,12 @@ func (cfg BlockChainConfig) WithStateScheme(scheme string) *BlockChainConfig {
222223
return &cfg
223224
}
224225

226+
// WithNoAsyncFlush enables/disables asynchronous buffer flushing mode on the config.
227+
func (cfg BlockChainConfig) WithNoAsyncFlush(on bool) *BlockChainConfig {
228+
cfg.TrieNoAsyncFlush = on
229+
return &cfg
230+
}
231+
225232
// triedbConfig derives the configures for trie database.
226233
func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
227234
config := &triedb.Config{
@@ -243,6 +250,7 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
243250
// for flushing both trie data and state data to disk. The config name
244251
// should be updated to eliminate the confusion.
245252
WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024,
253+
NoAsyncFlush: cfg.TrieNoAsyncFlush,
246254
}
247255
}
248256
return config

core/blockchain_snapshot_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
8181
}
8282
engine = ethash.NewFullFaker()
8383
)
84-
chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme))
84+
chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme).WithNoAsyncFlush(true))
8585
if err != nil {
8686
t.Fatalf("Failed to create chain: %v", err)
8787
}
@@ -572,7 +572,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
572572
//
573573
// Expected head header : C8
574574
// Expected head fast block: C8
575-
// Expected head block : G (Hash mode), C6 (Hash mode)
575+
// Expected head block : G (Hash mode), C6 (Path mode)
576576
// Expected snapshot disk : C4 (Hash mode)
577577
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
578578
expHead := uint64(0)

core/genesis_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ func newDbConfig(scheme string) *triedb.Config {
256256
if scheme == rawdb.HashScheme {
257257
return triedb.HashDefaults
258258
}
259-
return &triedb.Config{PathDB: pathdb.Defaults}
259+
config := *pathdb.Defaults
260+
config.NoAsyncFlush = true
261+
return &triedb.Config{PathDB: &config}
260262
}
261263

262264
func TestVerkleGenesisCommit(t *testing.T) {
@@ -313,7 +315,14 @@ func TestVerkleGenesisCommit(t *testing.T) {
313315
}
314316

315317
db := rawdb.NewMemoryDatabase()
316-
triedb := triedb.NewDatabase(db, triedb.VerkleDefaults)
318+
319+
config := *pathdb.Defaults
320+
config.NoAsyncFlush = true
321+
322+
triedb := triedb.NewDatabase(db, &triedb.Config{
323+
IsVerkle: true,
324+
PathDB: &config,
325+
})
317326
block := genesis.MustCommit(db, triedb)
318327
if !bytes.Equal(block.Root().Bytes(), expected) {
319328
t.Fatalf("invalid genesis state root, expected %x, got %x", expected, block.Root())

core/state/snapshot/generate_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ func newHelper(scheme string) *testHelper {
168168
if scheme == rawdb.PathScheme {
169169
config.PathDB = &pathdb.Config{
170170
SnapshotNoBuild: true,
171+
NoAsyncFlush: true,
171172
} // disable caching
172173
} else {
173174
config.HashDB = &hashdb.Config{} // disable caching

core/state/statedb_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,7 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
982982
TrieCleanSize: 0,
983983
StateCleanSize: 0,
984984
WriteBufferSize: 0,
985+
NoAsyncFlush: true,
985986
}}) // disable caching
986987
} else {
987988
tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
@@ -1004,18 +1005,25 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
10041005
// force-flush
10051006
tdb.Commit(root, false)
10061007
}
1007-
// Create a new state on the old root
1008-
state, _ = New(root, db)
10091008
// Now we clear out the memdb
10101009
it := memDb.NewIterator(nil, nil)
10111010
for it.Next() {
10121011
k := it.Key()
1013-
// Leave the root intact
1014-
if !bytes.Equal(k, root[:]) {
1015-
t.Logf("key: %x", k)
1016-
memDb.Delete(k)
1012+
if scheme == rawdb.HashScheme {
1013+
if !bytes.Equal(k, root[:]) {
1014+
t.Logf("key: %x", k)
1015+
memDb.Delete(k)
1016+
}
1017+
}
1018+
if scheme == rawdb.PathScheme {
1019+
rk := k[len(rawdb.TrieNodeAccountPrefix):]
1020+
if len(rk) != 0 {
1021+
t.Logf("key: %x", k)
1022+
memDb.Delete(k)
1023+
}
10171024
}
10181025
}
1026+
state, _ = New(root, db)
10191027
balance := state.GetBalance(addr)
10201028
// The removed elem should lead to it returning zero balance
10211029
if exp, got := uint64(0), balance.Uint64(); got != exp {

core/state/sync_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ func makeTestState(scheme string) (ethdb.Database, Database, *triedb.Database, c
4646
// Create an empty state
4747
config := &triedb.Config{Preimages: true}
4848
if scheme == rawdb.PathScheme {
49-
config.PathDB = pathdb.Defaults
49+
pconfig := *pathdb.Defaults
50+
pconfig.NoAsyncFlush = true
51+
config.PathDB = &pconfig
5052
} else {
5153
config.HashDB = hashdb.Defaults
5254
}

triedb/pathdb/buffer.go

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package pathdb
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"time"
2223

@@ -37,6 +38,13 @@ type buffer struct {
3738
limit uint64 // The maximum memory allowance in bytes
3839
nodes *nodeSet // Aggregated trie node set
3940
states *stateSet // Aggregated state set
41+
42+
// done is the notifier whether the content in buffer has been flushed or not.
43+
// This channel is nil if the buffer is not frozen.
44+
done chan struct{}
45+
46+
// flushErr memorizes the error if any exception occurs during flushing
47+
flushErr error
4048
}
4149

4250
// newBuffer initializes the buffer with the provided states and trie nodes.
@@ -61,7 +69,7 @@ func (b *buffer) account(hash common.Hash) ([]byte, bool) {
6169
return b.states.account(hash)
6270
}
6371

64-
// storage retrieves the storage slot with account address hash and slot key.
72+
// storage retrieves the storage slot with account address hash and slot key hash.
6573
func (b *buffer) storage(addrHash common.Hash, storageHash common.Hash) ([]byte, bool) {
6674
return b.states.storage(addrHash, storageHash)
6775
}
@@ -124,43 +132,78 @@ func (b *buffer) size() uint64 {
124132

125133
// flush persists the in-memory dirty trie node into the disk if the configured
126134
// memory threshold is reached. Note, all data must be written atomically.
127-
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error {
128-
// Ensure the target state id is aligned with the internal counter.
129-
head := rawdb.ReadPersistentStateID(db)
130-
if head+b.layers != id {
131-
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
135+
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) {
136+
if b.done != nil {
137+
panic("duplicated flush operation")
132138
}
133-
// Terminate the state snapshot generation if it's active
134-
var (
135-
start = time.Now()
136-
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
137-
)
138-
// Explicitly sync the state freezer to ensure all written data is persisted to disk
139-
// before updating the key-value store.
140-
//
141-
// This step is crucial to guarantee that the corresponding state history remains
142-
// available for state rollback.
143-
if freezer != nil {
144-
if err := freezer.SyncAncient(); err != nil {
145-
return err
139+
b.done = make(chan struct{}) // allocate the channel for notification
140+
141+
// Schedule the background thread to construct the batch, which usually
142+
// take a few seconds.
143+
go func() {
144+
defer func() {
145+
if postFlush != nil {
146+
postFlush()
147+
}
148+
close(b.done)
149+
}()
150+
151+
// Ensure the target state id is aligned with the internal counter.
152+
head := rawdb.ReadPersistentStateID(db)
153+
if head+b.layers != id {
154+
b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
155+
return
146156
}
157+
158+
// Terminate the state snapshot generation if it's active
159+
var (
160+
start = time.Now()
161+
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
162+
)
163+
// Explicitly sync the state freezer to ensure all written data is persisted to disk
164+
// before updating the key-value store.
165+
//
166+
// This step is crucial to guarantee that the corresponding state history remains
167+
// available for state rollback.
168+
if freezer != nil {
169+
if err := freezer.SyncAncient(); err != nil {
170+
b.flushErr = err
171+
return
172+
}
173+
}
174+
nodes := b.nodes.write(batch, nodesCache)
175+
accounts, slots := b.states.write(batch, progress, statesCache)
176+
rawdb.WritePersistentStateID(batch, id)
177+
rawdb.WriteSnapshotRoot(batch, root)
178+
179+
// Flush all mutations in a single batch
180+
size := batch.ValueSize()
181+
if err := batch.Write(); err != nil {
182+
b.flushErr = err
183+
return
184+
}
185+
commitBytesMeter.Mark(int64(size))
186+
commitNodesMeter.Mark(int64(nodes))
187+
commitAccountsMeter.Mark(int64(accounts))
188+
commitStoragesMeter.Mark(int64(slots))
189+
commitTimeTimer.UpdateSince(start)
190+
191+
// The content in the frozen buffer is kept for consequent state access,
192+
// TODO (rjl493456442) measure the gc overhead for holding this struct.
193+
// TODO (rjl493456442) can we somehow get rid of it after flushing??
194+
// TODO (rjl493456442) buffer itself is not thread-safe, add the lock
195+
// protection if try to reset the buffer here.
196+
// b.reset()
197+
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
198+
}()
199+
}
200+
201+
// waitFlush blocks until the buffer has been fully flushed and returns any
202+
// stored errors that occurred during the process.
203+
func (b *buffer) waitFlush() error {
204+
if b.done == nil {
205+
return errors.New("the buffer is not frozen")
147206
}
148-
nodes := b.nodes.write(batch, nodesCache)
149-
accounts, slots := b.states.write(batch, progress, statesCache)
150-
rawdb.WritePersistentStateID(batch, id)
151-
rawdb.WriteSnapshotRoot(batch, root)
152-
153-
// Flush all mutations in a single batch
154-
size := batch.ValueSize()
155-
if err := batch.Write(); err != nil {
156-
return err
157-
}
158-
commitBytesMeter.Mark(int64(size))
159-
commitNodesMeter.Mark(int64(nodes))
160-
commitAccountsMeter.Mark(int64(accounts))
161-
commitStoragesMeter.Mark(int64(slots))
162-
commitTimeTimer.UpdateSince(start)
163-
b.reset()
164-
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
165-
return nil
207+
<-b.done
208+
return b.flushErr
166209
}

triedb/pathdb/database.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,11 @@ type Config struct {
119119
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
120120
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
121121
ReadOnly bool // Flag whether the database is opened in read only mode
122-
SnapshotNoBuild bool // Flag Whether the background generation is allowed
122+
123+
// Testing configurations
124+
SnapshotNoBuild bool // Flag Whether the state generation is allowed
125+
NoAsyncFlush bool // Flag whether the background buffer flushing is allowed
126+
NoAsyncGeneration bool // Flag whether the background generation is allowed
123127
}
124128

125129
// sanitize checks the provided user configurations and changes anything that's
@@ -366,6 +370,12 @@ func (db *Database) setStateGenerator() error {
366370
}
367371
stats.log("Starting snapshot generation", root, generator.Marker)
368372
dl.generator.run(root)
373+
374+
// Block until the generation completes. It's the feature used in
375+
// unit tests.
376+
if db.config.NoAsyncGeneration {
377+
<-dl.generator.done
378+
}
369379
return nil
370380
}
371381

@@ -434,8 +444,8 @@ func (db *Database) Disable() error {
434444
// Terminate the state generator if it's active and mark the disk layer
435445
// as stale to prevent access to persistent state.
436446
disk := db.tree.bottom()
437-
if disk.generator != nil {
438-
disk.generator.stop()
447+
if err := disk.terminate(); err != nil {
448+
return err
439449
}
440450
disk.markStale()
441451

@@ -592,12 +602,14 @@ func (db *Database) Close() error {
592602
// following mutations.
593603
db.readOnly = true
594604

595-
// Terminate the background generation if it's active
596-
disk := db.tree.bottom()
597-
if disk.generator != nil {
598-
disk.generator.stop()
605+
// Block until the background flushing is finished. It must
606+
// be done before terminating the potential background snapshot
607+
// generator.
608+
dl := db.tree.bottom()
609+
if err := dl.terminate(); err != nil {
610+
return err
599611
}
600-
disk.resetCache() // release the memory held by clean cache
612+
dl.resetCache() // release the memory held by clean cache
601613

602614
// Close the attached state history freezer.
603615
if db.freezer == nil {
@@ -662,16 +674,6 @@ func (db *Database) HistoryRange() (uint64, uint64, error) {
662674
return historyRange(db.freezer)
663675
}
664676

665-
// waitGeneration waits until the background generation is finished. It assumes
666-
// that the generation is permitted; otherwise, it will block indefinitely.
667-
func (db *Database) waitGeneration() {
668-
gen := db.tree.bottom().generator
669-
if gen == nil || gen.completed() {
670-
return
671-
}
672-
<-gen.done
673-
}
674-
675677
// AccountIterator creates a new account iterator for the specified root hash and
676678
// seeks to a starting account hash.
677679
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
@@ -681,7 +683,7 @@ func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (Account
681683
if wait {
682684
return nil, errDatabaseWaitSync
683685
}
684-
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
686+
if !db.tree.bottom().genComplete() {
685687
return nil, errNotConstructed
686688
}
687689
return newFastAccountIterator(db, root, seek)
@@ -696,7 +698,7 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek
696698
if wait {
697699
return nil, errDatabaseWaitSync
698700
}
699-
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
701+
if !db.tree.bottom().genComplete() {
700702
return nil, errNotConstructed
701703
}
702704
return newFastStorageIterator(db, root, account, seek)

triedb/pathdb/database_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *te
129129
TrieCleanSize: 256 * 1024,
130130
StateCleanSize: 256 * 1024,
131131
WriteBufferSize: 256 * 1024,
132+
NoAsyncFlush: true,
132133
}, isVerkle)
133134

134135
obj = &tester{

0 commit comments

Comments
 (0)