
Commit 14d576c

zsfelfoldi, rjl493456442 and fjl authored
core/filtermaps: hashdb safe delete range (#31525)
This PR adds `rawdb.SafeDeleteRange` and uses it for range deletion in `core/filtermaps`. This includes deleting the old bloombits database, resetting the log index database and removing index data for unindexed tail epochs (which previously weren't properly implemented for the fallback case). `SafeDeleteRange` either calls `ethdb.DeleteRange` if the node uses the new path based state scheme or uses an iterator based fallback method that safely skips trie nodes in the range if the old hash based state scheme is used. Note that `ethdb.DeleteRange` also has its own iterator based fallback implementation in `ethdb/leveldb`. If a path based state scheme is used and the backing db is pebble (as it is on the majority of new nodes) then `rawdb.SafeDeleteRange` uses the fast native range delete. Also note that `rawdb.SafeDeleteRange` has different semantics from `ethdb.DeleteRange`, it does not automatically return if the operation takes a long time. Instead it receives a `stopCallback` that can interrupt the process if necessary. This is because in the safe mode potentially a lot of entries are iterated without being deleted (this is definitely the case when deleting the old bloombits database which has a single byte prefix) and therefore restarting the process every time a fixed number of entries have been iterated would result in a quadratic run time in the number of skipped entries. When running in safe mode, unindexing an epoch takes about a second, removing bloombits takes around 10s while resetting a full log index might take a few minutes. If a range delete operation takes a significant amount of time then log messages are printed. Also, any range delete operation can be interrupted by shutdown (tail uinindexing can also be interrupted by head indexing, similarly to how tail indexing works). If the last unindexed epoch might have "dirty" index data left then the indexed map range points to the first valid epoch and `cleanedEpochsBefore` points to the previous, potentially dirty one. At startup it is always assumed that the epoch before the first fully indexed one might be dirty. New tail maps are never rendered and also no further maps are unindexed before the previous unindexing is properly cleaned up. --------- Co-authored-by: Gary Rong <[email protected]> Co-authored-by: Felix Lange <[email protected]>
1 parent ffa315f commit 14d576c

File tree

7 files changed: +256 -117 lines changed

core/filtermaps/filtermaps.go

Lines changed: 135 additions & 75 deletions
@@ -29,7 +29,6 @@ import (
     "github.com/ethereum/go-ethereum/core/rawdb"
     "github.com/ethereum/go-ethereum/core/types"
     "github.com/ethereum/go-ethereum/ethdb"
-    "github.com/ethereum/go-ethereum/ethdb/leveldb"
     "github.com/ethereum/go-ethereum/log"
 )

@@ -59,6 +58,7 @@ type FilterMaps struct {
     closeCh        chan struct{}
     closeWg        sync.WaitGroup
     history        uint64
+    hashScheme     bool // use hashdb-safe delete range method
     exportFileName string
     Params

@@ -67,10 +67,11 @@ type FilterMaps struct {
     // fields written by the indexer and read by matcher backend. Indexer can
     // read them without a lock and write them under indexLock write lock.
     // Matcher backend can read them under indexLock read lock.
-    indexLock    sync.RWMutex
-    indexedRange filterMapsRange
-    indexedView  *ChainView // always consistent with the log index
-    hasTempRange bool
+    indexLock           sync.RWMutex
+    indexedRange        filterMapsRange
+    cleanedEpochsBefore uint32 // all unindexed data cleaned before this point
+    indexedView         *ChainView // always consistent with the log index
+    hasTempRange        bool

     // also accessed by indexer and matcher backend but no locking needed.
     filterMapCache *lru.Cache[uint32, filterMap]

@@ -180,6 +181,10 @@ type Config struct {
     // This option enables the checkpoint JSON file generator.
     // If set, the given file will be updated with checkpoint information.
     ExportFileName string
+
+    // expect trie nodes of hash based state scheme in the filtermaps key range;
+    // use safe iterator based implementation of DeleteRange that skips them
+    HashScheme bool
 }

 // NewFilterMaps creates a new FilterMaps and starts the indexer.

@@ -197,6 +202,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
         blockProcessingCh: make(chan bool, 1),
         history:           config.History,
         disabled:          config.Disabled,
+        hashScheme:        config.HashScheme,
         disabledCh:        make(chan struct{}),
         exportFileName:    config.ExportFileName,
         Params:            params,

@@ -208,15 +214,17 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
             maps:             common.NewRange(rs.MapsFirst, rs.MapsAfterLast-rs.MapsFirst),
             tailPartialEpoch: rs.TailPartialEpoch,
         },
-        historyCutoff:   historyCutoff,
-        finalBlock:      finalBlock,
-        matcherSyncCh:   make(chan *FilterMapsMatcherBackend),
-        matchers:        make(map[*FilterMapsMatcherBackend]struct{}),
-        filterMapCache:  lru.NewCache[uint32, filterMap](cachedFilterMaps),
-        lastBlockCache:  lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
-        lvPointerCache:  lru.NewCache[uint64, uint64](cachedLvPointers),
-        baseRowsCache:   lru.NewCache[uint64, [][]uint32](cachedBaseRows),
-        renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
+        // deleting last unindexed epoch might have been interrupted by shutdown
+        cleanedEpochsBefore: max(rs.MapsFirst>>params.logMapsPerEpoch, 1) - 1,
+        historyCutoff:       historyCutoff,
+        finalBlock:          finalBlock,
+        matcherSyncCh:       make(chan *FilterMapsMatcherBackend),
+        matchers:            make(map[*FilterMapsMatcherBackend]struct{}),
+        filterMapCache:      lru.NewCache[uint32, filterMap](cachedFilterMaps),
+        lastBlockCache:      lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
+        lvPointerCache:      lru.NewCache[uint64, uint64](cachedLvPointers),
+        baseRowsCache:       lru.NewCache[uint64, [][]uint32](cachedBaseRows),
+        renderSnapshots:     lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
     }

     // Set initial indexer target.

@@ -301,14 +309,24 @@ func (f *FilterMaps) reset() {
     // deleting the range first ensures that resetDb will be called again at next
     // startup and any leftover data will be removed even if it cannot finish now.
     rawdb.DeleteFilterMapsRange(f.db)
-    f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
+    f.safeDeleteWithLogs(rawdb.DeleteFilterMapsDb, "Resetting log index database", f.isShuttingDown)
+}
+
+// isShuttingDown returns true if FilterMaps is shutting down.
+func (f *FilterMaps) isShuttingDown() bool {
+    select {
+    case <-f.closeCh:
+        return true
+    default:
+        return false
+    }
 }

 // init initializes an empty log index according to the current targetView.
 func (f *FilterMaps) init() error {
     // ensure that there is no remaining data in the filter maps key range
-    if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") {
-        return errors.New("could not reset log index database")
+    if err := f.safeDeleteWithLogs(rawdb.DeleteFilterMapsDb, "Resetting log index database", f.isShuttingDown); err != nil {
+        return err
     }

     f.indexLock.Lock()

@@ -358,38 +376,37 @@ func (f *FilterMaps) init() error {

 // removeBloomBits removes old bloom bits data from the database.
 func (f *FilterMaps) removeBloomBits() {
-    f.safeDeleteRange(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database")
+    f.safeDeleteWithLogs(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database", f.isShuttingDown)
     f.closeWg.Done()
 }

-// safeDeleteRange calls the specified database range deleter function
-// repeatedly as long as it returns leveldb.ErrTooManyKeys.
-// This wrapper is necessary because of the leveldb fallback implementation
-// of DeleteRange.
-func (f *FilterMaps) safeDeleteRange(removeFn func(ethdb.KeyValueRangeDeleter) error, action string) bool {
-    start := time.Now()
-    var retry bool
-    for {
-        err := removeFn(f.db)
-        if err == nil {
-            if retry {
-                log.Info(action+" finished", "elapsed", time.Since(start))
-            }
-            return true
-        }
-        if err != leveldb.ErrTooManyKeys {
-            log.Error(action+" failed", "error", err)
-            return false
+// safeDeleteWithLogs is a wrapper for a function that performs a safe range
+// delete operation using rawdb.SafeDeleteRange. It emits log messages if the
+// process takes long enough to call the stop callback.
+func (f *FilterMaps) safeDeleteWithLogs(deleteFn func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error, action string, stopCb func() bool) error {
+    var (
+        start          = time.Now()
+        logPrinted     bool
+        lastLogPrinted = start
+    )
+    switch err := deleteFn(f.db, f.hashScheme, func(deleted bool) bool {
+        if deleted && !logPrinted || time.Since(lastLogPrinted) > time.Second*10 {
+            log.Info(action+" in progress...", "elapsed", common.PrettyDuration(time.Since(start)))
+            logPrinted, lastLogPrinted = true, time.Now()
         }
-        select {
-        case <-f.closeCh:
-            return false
-        default:
-        }
-        if !retry {
-            log.Info(action+" in progress...", "elapsed", time.Since(start))
-            retry = true
+        return stopCb()
+    }); {
+    case err == nil:
+        if logPrinted {
+            log.Info(action+" finished", "elapsed", common.PrettyDuration(time.Since(start)))
         }
+        return nil
+    case errors.Is(err, rawdb.ErrDeleteRangeInterrupted):
+        log.Warn(action+" interrupted", "elapsed", common.PrettyDuration(time.Since(start)))
+        return err
+    default:
+        log.Error(action+" failed", "error", err)
+        return err
     }
 }

@@ -658,54 +675,97 @@ func (f *FilterMaps) deleteLastBlockOfMap(batch ethdb.Batch, mapIndex uint32) {
     rawdb.DeleteFilterMapLastBlock(batch, mapIndex)
 }

-// deleteTailEpoch deletes index data from the earliest, either fully or partially
-// indexed epoch. The last block pointer for the last map of the epoch and the
-// corresponding block log value pointer are retained as these are always assumed
-// to be available for each epoch.
-func (f *FilterMaps) deleteTailEpoch(epoch uint32) error {
+// deleteTailEpoch deletes index data from the specified epoch. The last block
+// pointer for the last map of the epoch and the corresponding block log value
+// pointer are retained as these are always assumed to be available for each
+// epoch as boundary markers.
+// The function returns true if all index data related to the epoch (except for
+// the boundary markers) has been fully removed.
+func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
     f.indexLock.Lock()
     defer f.indexLock.Unlock()

+    // determine epoch boundaries
     firstMap := epoch << f.logMapsPerEpoch
     lastBlock, _, err := f.getLastBlockOfMap(firstMap + f.mapsPerEpoch - 1)
     if err != nil {
-        return fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err)
+        return false, fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err)
     }
     var firstBlock uint64
     if epoch > 0 {
         firstBlock, _, err = f.getLastBlockOfMap(firstMap - 1)
         if err != nil {
-            return fmt.Errorf("failed to retrieve last block before deleted epoch %d: %v", epoch, err)
+            return false, fmt.Errorf("failed to retrieve last block before deleted epoch %d: %v", epoch, err)
         }
         firstBlock++
     }
-    fmr := f.indexedRange
-    if f.indexedRange.maps.First() == firstMap &&
-        f.indexedRange.maps.AfterLast() > firstMap+f.mapsPerEpoch &&
-        f.indexedRange.tailPartialEpoch == 0 {
-        fmr.maps.SetFirst(firstMap + f.mapsPerEpoch)
-        fmr.blocks.SetFirst(lastBlock + 1)
-    } else if f.indexedRange.maps.First() == firstMap+f.mapsPerEpoch {
+    // update rendered range if necessary
+    var (
+        fmr            = f.indexedRange
+        firstEpoch     = f.indexedRange.maps.First() >> f.logMapsPerEpoch
+        afterLastEpoch = (f.indexedRange.maps.AfterLast() + f.mapsPerEpoch - 1) >> f.logMapsPerEpoch
+    )
+    if f.indexedRange.tailPartialEpoch != 0 && firstEpoch > 0 {
+        firstEpoch--
+    }
+    switch {
+    case epoch < firstEpoch:
+        // cleanup of already unindexed epoch; range not affected
+    case epoch == firstEpoch && epoch+1 < afterLastEpoch:
+        // first fully or partially rendered epoch and there is at least one
+        // rendered map in the next epoch; remove from indexed range
         fmr.tailPartialEpoch = 0
+        fmr.maps.SetFirst((epoch + 1) << f.logMapsPerEpoch)
+        fmr.blocks.SetFirst(lastBlock + 1)
+        f.setRange(f.db, f.indexedView, fmr, false)
+    default:
+        // cannot be cleaned or unindexed; return with error
+        return false, errors.New("invalid tail epoch number")
+    }
+    // remove index data
+    if err := f.safeDeleteWithLogs(func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error {
+        first := f.mapRowIndex(firstMap, 0)
+        count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
+        if err := rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count), hashScheme, stopCb); err != nil {
+            return err
+        }
+        for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch; mapIndex++ {
+            f.filterMapCache.Remove(mapIndex)
+        }
+        delMapRange := common.NewRange(firstMap, f.mapsPerEpoch-1) // keep last entry
+        if err := rawdb.DeleteFilterMapLastBlocks(f.db, delMapRange, hashScheme, stopCb); err != nil {
+            return err
+        }
+        for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch-1; mapIndex++ {
+            f.lastBlockCache.Remove(mapIndex)
+        }
+        delBlockRange := common.NewRange(firstBlock, lastBlock-firstBlock) // keep last entry
+        if err := rawdb.DeleteBlockLvPointers(f.db, delBlockRange, hashScheme, stopCb); err != nil {
+            return err
+        }
+        for blockNumber := firstBlock; blockNumber < lastBlock; blockNumber++ {
+            f.lvPointerCache.Remove(blockNumber)
+        }
+        return nil
+    }, fmt.Sprintf("Deleting tail epoch #%d", epoch), func() bool {
+        f.processEvents()
+        return f.stop || !f.targetHeadIndexed()
+    }); err == nil {
+        // everything removed; mark as cleaned and report success
+        if f.cleanedEpochsBefore == epoch {
+            f.cleanedEpochsBefore = epoch + 1
+        }
+        return true, nil
     } else {
-        return errors.New("invalid tail epoch number")
-    }
-    f.setRange(f.db, f.indexedView, fmr, false)
-    first := f.mapRowIndex(firstMap, 0)
-    count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
-    rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count))
-    for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch; mapIndex++ {
-        f.filterMapCache.Remove(mapIndex)
-    }
-    rawdb.DeleteFilterMapLastBlocks(f.db, common.NewRange(firstMap, f.mapsPerEpoch-1)) // keep last enrty
-    for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch-1; mapIndex++ {
-        f.lastBlockCache.Remove(mapIndex)
-    }
-    rawdb.DeleteBlockLvPointers(f.db, common.NewRange(firstBlock, lastBlock-firstBlock)) // keep last enrty
-    for blockNumber := firstBlock; blockNumber < lastBlock; blockNumber++ {
-        f.lvPointerCache.Remove(blockNumber)
+        // more data left in epoch range; mark as dirty and report unfinished
+        if f.cleanedEpochsBefore > epoch {
+            f.cleanedEpochsBefore = epoch
+        }
+        if errors.Is(err, rawdb.ErrDeleteRangeInterrupted) {
+            return false, nil
+        }
+        return false, err
     }
-    return nil
 }

 // exportCheckpoints exports epoch checkpoints in the format used by checkpoints.go.

core/filtermaps/indexer.go

Lines changed: 17 additions & 15 deletions
@@ -67,14 +67,17 @@ func (f *FilterMaps) indexerLoop() {
             }
             f.lastFinal = f.finalBlock
         }
-        if done, err := f.tryIndexTail(); err != nil {
-            f.disableForError("tail rendering", err)
+        // always attempt unindexing before indexing the tail in order to
+        // ensure that a potentially dirty previously unindexed epoch is
+        // always cleaned up before any new maps are rendered.
+        if done, err := f.tryUnindexTail(); err != nil {
+            f.disableForError("tail unindexing", err)
             return
         } else if !done {
             continue
         }
-        if done, err := f.tryUnindexTail(); err != nil {
-            f.disableForError("tail unindexing", err)
+        if done, err := f.tryIndexTail(); err != nil {
+            f.disableForError("tail rendering", err)
             return
         } else if !done {
             continue

@@ -349,25 +352,24 @@ func (f *FilterMaps) tryIndexTail() (bool, error) {
 // Note that unindexing is very quick as it only removes continuous ranges of
 // data from the database and is also called while running head indexing.
 func (f *FilterMaps) tryUnindexTail() (bool, error) {
-    for {
-        firstEpoch := (f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch) >> f.logMapsPerEpoch
-        if f.needTailEpoch(firstEpoch) {
-            break
-        }
-        f.processEvents()
-        if f.stop {
-            return false, nil
-        }
+    firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
+    if f.indexedRange.tailPartialEpoch > 0 && firstEpoch > 0 {
+        firstEpoch--
+    }
+    for epoch := min(firstEpoch, f.cleanedEpochsBefore); !f.needTailEpoch(epoch); epoch++ {
         if !f.startedTailUnindex {
             f.startedTailUnindexAt = time.Now()
             f.startedTailUnindex = true
             f.ptrTailUnindexMap = f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch
             f.ptrTailUnindexBlock = f.indexedRange.blocks.First() - f.tailPartialBlocks()
         }
-        if err := f.deleteTailEpoch(firstEpoch); err != nil {
-            log.Error("Log index tail epoch unindexing failed", "error", err)
+        if done, err := f.deleteTailEpoch(epoch); !done {
             return false, err
         }
+        f.processEvents()
+        if f.stop || !f.targetHeadIndexed() {
+            return false, nil
+        }
     }
     if f.startedTailUnindex && f.indexedRange.hasIndexedBlocks() {
         log.Info("Log index tail unindexing finished",

0 commit comments
