Skip to content

Commit bc4ee71

Browse files
authored
triedb/pathdb: add recovery mechanism in state indexer (ethereum#32447)
Alternative of ethereum#32335, enhancing the history indexer recovery after unclean shutdown.
1 parent c4ec450 commit bc4ee71

File tree

4 files changed

+210
-14
lines changed

4 files changed

+210
-14
lines changed

triedb/pathdb/database_test.go

Lines changed: 144 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ func (ctx *genctx) storageOriginSet(rawStorageKey bool, t *tester) map[common.Ad
121121
type tester struct {
122122
db *Database
123123
roots []common.Hash
124+
nodes []*trienode.MergedNodeSet
125+
states []*StateSetWithOrigin
124126
preimages map[common.Hash][]byte
125127

126128
// current state set
@@ -135,12 +137,38 @@ type tester struct {
135137
snapNodes map[common.Hash]*trienode.MergedNodeSet
136138
}
137139

140+
// testerConfig holds configuration parameters for running a test scenario.
138141
type testerConfig struct {
139-
stateHistory uint64
140-
isVerkle bool
141-
layers int
142-
enableIndex bool
143-
journalDir string
142+
stateHistory uint64 // Number of historical states to retain
143+
layers int // Number of state transitions to generate for
144+
enableIndex bool // Enable state history indexing or not
145+
journalDir string // Directory path for persisting journal files
146+
isVerkle bool // Enables Verkle trie mode if true
147+
148+
writeBuffer *int // Optional, the size of memory allocated for write buffer
149+
trieCache *int // Optional, the size of memory allocated for trie cache
150+
stateCache *int // Optional, the size of memory allocated for state cache
151+
}
152+
153+
func (c *testerConfig) trieCacheSize() int {
154+
if c.trieCache != nil {
155+
return *c.trieCache
156+
}
157+
return 256 * 1024
158+
}
159+
160+
func (c *testerConfig) stateCacheSize() int {
161+
if c.stateCache != nil {
162+
return *c.stateCache
163+
}
164+
return 256 * 1024
165+
}
166+
167+
func (c *testerConfig) writeBufferSize() int {
168+
if c.writeBuffer != nil {
169+
return *c.writeBuffer
170+
}
171+
return 256 * 1024
144172
}
145173

146174
func newTester(t *testing.T, config *testerConfig) *tester {
@@ -149,9 +177,9 @@ func newTester(t *testing.T, config *testerConfig) *tester {
149177
db = New(disk, &Config{
150178
StateHistory: config.stateHistory,
151179
EnableStateIndexing: config.enableIndex,
152-
TrieCleanSize: 256 * 1024,
153-
StateCleanSize: 256 * 1024,
154-
WriteBufferSize: 256 * 1024,
180+
TrieCleanSize: config.trieCacheSize(),
181+
StateCleanSize: config.stateCacheSize(),
182+
WriteBufferSize: config.writeBufferSize(),
155183
NoAsyncFlush: true,
156184
JournalDirectory: config.journalDir,
157185
}, config.isVerkle)
@@ -177,6 +205,8 @@ func newTester(t *testing.T, config *testerConfig) *tester {
177205
panic(fmt.Errorf("failed to update state changes, err: %w", err))
178206
}
179207
obj.roots = append(obj.roots, root)
208+
obj.nodes = append(obj.nodes, nodes)
209+
obj.states = append(obj.states, states)
180210
}
181211
return obj
182212
}
@@ -200,6 +230,8 @@ func (t *tester) extend(layers int) {
200230
panic(fmt.Errorf("failed to update state changes, err: %w", err))
201231
}
202232
t.roots = append(t.roots, root)
233+
t.nodes = append(t.nodes, nodes)
234+
t.states = append(t.states, states)
203235
}
204236
}
205237

@@ -885,3 +917,107 @@ func copyStorages(set map[common.Hash]map[common.Hash][]byte) map[common.Hash]ma
885917
}
886918
return copied
887919
}
920+
921+
func TestDatabaseIndexRecovery(t *testing.T) {
922+
maxDiffLayers = 4
923+
defer func() {
924+
maxDiffLayers = 128
925+
}()
926+
927+
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
928+
writeBuffer := 512 * 1024
929+
config := &testerConfig{
930+
layers: 64,
931+
enableIndex: true,
932+
writeBuffer: &writeBuffer,
933+
}
934+
env := newTester(t, config)
935+
defer env.release()
936+
937+
// Ensure the buffer in disk layer is not empty
938+
var (
939+
bRoot = env.db.tree.bottom().rootHash()
940+
dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil))
941+
)
942+
for dRoot == bRoot {
943+
env.extend(1)
944+
945+
bRoot = env.db.tree.bottom().rootHash()
946+
dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil))
947+
}
948+
waitIndexing(env.db)
949+
950+
var (
951+
dIndex int
952+
roots = env.roots
953+
hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer)
954+
)
955+
for i, root := range roots {
956+
if root == dRoot {
957+
dIndex = i
958+
}
959+
if root == bRoot {
960+
break
961+
}
962+
if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil {
963+
t.Fatal(err)
964+
}
965+
}
966+
967+
// Terminate the database and mutate the journal, it's for simulating
968+
// the unclean shutdown
969+
env.db.Journal(env.lastHash())
970+
env.db.Close()
971+
972+
// Mutate the journal in disk, it should be regarded as invalid
973+
blob := rawdb.ReadTrieJournal(env.db.diskdb)
974+
blob[0] = 0xa
975+
rawdb.WriteTrieJournal(env.db.diskdb, blob)
976+
977+
// Reload the database, the extra state histories should be removed
978+
env.db = New(env.db.diskdb, env.db.config, false)
979+
980+
for i := range roots {
981+
_, err := readStateHistory(env.db.stateFreezer, uint64(i+1))
982+
if i <= dIndex && err != nil {
983+
t.Fatalf("State history is not found, %d", i)
984+
}
985+
if i > dIndex && err == nil {
986+
t.Fatalf("Unexpected state history found, %d", i)
987+
}
988+
}
989+
remain, err := env.db.IndexProgress()
990+
if err != nil {
991+
t.Fatalf("Failed to obtain the progress, %v", err)
992+
}
993+
if remain == 0 {
994+
t.Fatalf("Unexpected progress remain, %d", remain)
995+
}
996+
997+
// Apply new states on top, ensuring state indexing can respond correctly
998+
for i := dIndex + 1; i < len(roots); i++ {
999+
if err := env.db.Update(roots[i], roots[i-1], uint64(i), env.nodes[i], env.states[i]); err != nil {
1000+
panic(fmt.Errorf("failed to update state changes, err: %w", err))
1001+
}
1002+
}
1003+
remain, err = env.db.IndexProgress()
1004+
if err != nil {
1005+
t.Fatalf("Failed to obtain the progress, %v", err)
1006+
}
1007+
if remain != 0 {
1008+
t.Fatalf("Unexpected progress remain, %d", remain)
1009+
}
1010+
waitIndexing(env.db)
1011+
1012+
// Ensure the truncated state histories become accessible
1013+
bRoot = env.db.tree.bottom().rootHash()
1014+
hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer)
1015+
for i, root := range roots {
1016+
if root == bRoot {
1017+
break
1018+
}
1019+
if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil {
1020+
t.Fatal(err)
1021+
}
1022+
}
1023+
}

triedb/pathdb/history_indexer.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,15 +322,22 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID
322322
closed: make(chan struct{}),
323323
}
324324
// Load indexing progress
325+
var recover bool
325326
initer.last.Store(lastID)
326327
metadata := loadIndexMetadata(disk)
327328
if metadata != nil {
328329
initer.indexed.Store(metadata.Last)
330+
recover = metadata.Last > lastID
329331
}
330332

331333
// Launch background indexer
332334
initer.wg.Add(1)
333-
go initer.run(lastID)
335+
if recover {
336+
log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
337+
go initer.recover(lastID)
338+
} else {
339+
go initer.run(lastID)
340+
}
334341
return initer
335342
}
336343

@@ -364,8 +371,8 @@ func (i *indexIniter) remain() uint64 {
364371
default:
365372
last, indexed := i.last.Load(), i.indexed.Load()
366373
if last < indexed {
367-
log.Error("Invalid state indexing range", "last", last, "indexed", indexed)
368-
return 0
374+
log.Warn("State indexer is in recovery", "indexed", indexed, "last", last)
375+
return indexed - last
369376
}
370377
return last - indexed
371378
}
@@ -569,6 +576,49 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
569576
log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
570577
}
571578

579+
// recover handles unclean shutdown recovery. After an unclean shutdown, any
580+
// extra histories are typically truncated, while the corresponding history index
581+
// entries may still have been written. Ideally, we would unindex these histories
582+
// in reverse order, but there is no guarantee that the required histories will
583+
// still be available.
584+
//
585+
// As a workaround, indexIniter waits until the missing histories are regenerated
586+
// by chain recovery, under the assumption that the recovered histories will be
587+
// identical to the lost ones. Fork-awareness should be added in the future to
588+
// correctly handle histories affected by reorgs.
589+
func (i *indexIniter) recover(lastID uint64) {
590+
defer i.wg.Done()
591+
592+
for {
593+
select {
594+
case signal := <-i.interrupt:
595+
newLastID := signal.newLastID
596+
if newLastID != lastID+1 && newLastID != lastID-1 {
597+
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
598+
continue
599+
}
600+
601+
// Update the last indexed flag
602+
lastID = newLastID
603+
signal.result <- nil
604+
i.last.Store(newLastID)
605+
log.Debug("Updated history index flag", "last", lastID)
606+
607+
// Terminate the recovery routine once the histories are fully aligned
608+
// with the index data, indicating that index initialization is complete.
609+
metadata := loadIndexMetadata(i.disk)
610+
if metadata != nil && metadata.Last == lastID {
611+
close(i.done)
612+
log.Info("History indexer is recovered", "last", lastID)
613+
return
614+
}
615+
616+
case <-i.closed:
617+
return
618+
}
619+
}
620+
}
621+
572622
// historyIndexer manages the indexing and unindexing of state histories,
573623
// providing access to historical states.
574624
//

triedb/pathdb/history_reader_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,13 @@ func testHistoryReader(t *testing.T, historyLimit uint64) {
144144
maxDiffLayers = 128
145145
}()
146146

147-
env := newTester(t, &testerConfig{stateHistory: historyLimit, layers: 64, enableIndex: true})
147+
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
148+
config := &testerConfig{
149+
stateHistory: historyLimit,
150+
layers: 64,
151+
enableIndex: true,
152+
}
153+
env := newTester(t, config)
148154
defer env.release()
149155
waitIndexing(env.db)
150156

@@ -183,7 +189,11 @@ func TestHistoricalStateReader(t *testing.T) {
183189
}()
184190

185191
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
186-
config := &testerConfig{stateHistory: 0, layers: 64, enableIndex: true}
192+
config := &testerConfig{
193+
stateHistory: 0,
194+
layers: 64,
195+
enableIndex: true,
196+
}
187197
env := newTester(t, config)
188198
defer env.release()
189199
waitIndexing(env.db)

triedb/pathdb/journal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (dl *diskLayer) journal(w io.Writer) error {
267267
if err := dl.buffer.states.encode(w); err != nil {
268268
return err
269269
}
270-
log.Debug("Journaled pathdb disk layer", "root", dl.root)
270+
log.Debug("Journaled pathdb disk layer", "root", dl.root, "id", dl.id)
271271
return nil
272272
}
273273

0 commit comments

Comments
 (0)