@@ -29,7 +29,6 @@ import (
2929 "github.com/ethereum/go-ethereum/core/rawdb"
3030 "github.com/ethereum/go-ethereum/core/types"
3131 "github.com/ethereum/go-ethereum/ethdb"
32- "github.com/ethereum/go-ethereum/ethdb/leveldb"
3332 "github.com/ethereum/go-ethereum/log"
3433)
3534
@@ -59,6 +58,7 @@ type FilterMaps struct {
5958 closeCh chan struct {}
6059 closeWg sync.WaitGroup
6160 history uint64
61+ hashScheme bool // use hashdb-safe delete range method
6262 exportFileName string
6363 Params
6464
@@ -67,10 +67,11 @@ type FilterMaps struct {
6767 // fields written by the indexer and read by matcher backend. Indexer can
6868 // read them without a lock and write them under indexLock write lock.
6969 // Matcher backend can read them under indexLock read lock.
70- indexLock sync.RWMutex
71- indexedRange filterMapsRange
72- indexedView * ChainView // always consistent with the log index
73- hasTempRange bool
70+ indexLock sync.RWMutex
71+ indexedRange filterMapsRange
72+ cleanedEpochsBefore uint32 // all unindexed data cleaned before this point
73+ indexedView * ChainView // always consistent with the log index
74+ hasTempRange bool
7475
7576 // also accessed by indexer and matcher backend but no locking needed.
7677 filterMapCache * lru.Cache [uint32 , filterMap ]
@@ -180,6 +181,10 @@ type Config struct {
180181 // This option enables the checkpoint JSON file generator.
181182 // If set, the given file will be updated with checkpoint information.
182183 ExportFileName string
184+
185+ // expect trie nodes of hash based state scheme in the filtermaps key range;
186+ // use safe iterator based implementation of DeleteRange that skips them
187+ HashScheme bool
183188}
184189
185190// NewFilterMaps creates a new FilterMaps and starts the indexer.
@@ -197,6 +202,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
197202 blockProcessingCh : make (chan bool , 1 ),
198203 history : config .History ,
199204 disabled : config .Disabled ,
205+ hashScheme : config .HashScheme ,
200206 disabledCh : make (chan struct {}),
201207 exportFileName : config .ExportFileName ,
202208 Params : params ,
@@ -208,15 +214,17 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
208214 maps : common .NewRange (rs .MapsFirst , rs .MapsAfterLast - rs .MapsFirst ),
209215 tailPartialEpoch : rs .TailPartialEpoch ,
210216 },
211- historyCutoff : historyCutoff ,
212- finalBlock : finalBlock ,
213- matcherSyncCh : make (chan * FilterMapsMatcherBackend ),
214- matchers : make (map [* FilterMapsMatcherBackend ]struct {}),
215- filterMapCache : lru.NewCache [uint32 , filterMap ](cachedFilterMaps ),
216- lastBlockCache : lru.NewCache [uint32 , lastBlockOfMap ](cachedLastBlocks ),
217- lvPointerCache : lru.NewCache [uint64 , uint64 ](cachedLvPointers ),
218- baseRowsCache : lru.NewCache [uint64 , [][]uint32 ](cachedBaseRows ),
219- renderSnapshots : lru.NewCache [uint64 , * renderedMap ](cachedRenderSnapshots ),
217+ // deleting last unindexed epoch might have been interrupted by shutdown
218+ cleanedEpochsBefore : max (rs .MapsFirst >> params .logMapsPerEpoch , 1 ) - 1 ,
219+ historyCutoff : historyCutoff ,
220+ finalBlock : finalBlock ,
221+ matcherSyncCh : make (chan * FilterMapsMatcherBackend ),
222+ matchers : make (map [* FilterMapsMatcherBackend ]struct {}),
223+ filterMapCache : lru.NewCache [uint32 , filterMap ](cachedFilterMaps ),
224+ lastBlockCache : lru.NewCache [uint32 , lastBlockOfMap ](cachedLastBlocks ),
225+ lvPointerCache : lru.NewCache [uint64 , uint64 ](cachedLvPointers ),
226+ baseRowsCache : lru.NewCache [uint64 , [][]uint32 ](cachedBaseRows ),
227+ renderSnapshots : lru.NewCache [uint64 , * renderedMap ](cachedRenderSnapshots ),
220228 }
221229
222230 // Set initial indexer target.
@@ -301,14 +309,24 @@ func (f *FilterMaps) reset() {
301309 // deleting the range first ensures that resetDb will be called again at next
302310 // startup and any leftover data will be removed even if it cannot finish now.
303311 rawdb .DeleteFilterMapsRange (f .db )
304- f .safeDeleteRange (rawdb .DeleteFilterMapsDb , "Resetting log index database" )
312+ f .safeDeleteWithLogs (rawdb .DeleteFilterMapsDb , "Resetting log index database" , f .isShuttingDown )
313+ }
314+
315+ // isShuttingDown returns true if FilterMaps is shutting down.
316+ func (f * FilterMaps ) isShuttingDown () bool {
317+ select {
318+ case <- f .closeCh :
319+ return true
320+ default :
321+ return false
322+ }
305323}
306324
307325// init initializes an empty log index according to the current targetView.
308326func (f * FilterMaps ) init () error {
309327 // ensure that there is no remaining data in the filter maps key range
310- if ! f . safeDeleteRange (rawdb .DeleteFilterMapsDb , "Resetting log index database" ) {
311- return errors . New ( "could not reset log index database" )
328+ if err := f . safeDeleteWithLogs (rawdb .DeleteFilterMapsDb , "Resetting log index database" , f . isShuttingDown ); err != nil {
329+ return err
312330 }
313331
314332 f .indexLock .Lock ()
@@ -358,38 +376,37 @@ func (f *FilterMaps) init() error {
358376
359377// removeBloomBits removes old bloom bits data from the database.
360378func (f * FilterMaps ) removeBloomBits () {
361- f .safeDeleteRange (rawdb .DeleteBloomBitsDb , "Removing old bloom bits database" )
379+ f .safeDeleteWithLogs (rawdb .DeleteBloomBitsDb , "Removing old bloom bits database" , f . isShuttingDown )
362380 f .closeWg .Done ()
363381}
364382
365- // safeDeleteRange calls the specified database range deleter function
366- // repeatedly as long as it returns leveldb.ErrTooManyKeys.
367- // This wrapper is necessary because of the leveldb fallback implementation
368- // of DeleteRange.
369- func (f * FilterMaps ) safeDeleteRange (removeFn func (ethdb.KeyValueRangeDeleter ) error , action string ) bool {
370- start := time .Now ()
371- var retry bool
372- for {
373- err := removeFn (f .db )
374- if err == nil {
375- if retry {
376- log .Info (action + " finished" , "elapsed" , time .Since (start ))
377- }
378- return true
379- }
380- if err != leveldb .ErrTooManyKeys {
381- log .Error (action + " failed" , "error" , err )
382- return false
383+ // safeDeleteWithLogs is a wrapper for a function that performs a safe range
384+ // delete operation using rawdb.SafeDeleteRange. It emits log messages if the
385+ // process takes long enough to call the stop callback.
386+ func (f * FilterMaps ) safeDeleteWithLogs (deleteFn func (db ethdb.KeyValueStore , hashScheme bool , stopCb func (bool ) bool ) error , action string , stopCb func () bool ) error {
387+ var (
388+ start = time .Now ()
389+ logPrinted bool
390+ lastLogPrinted = start
391+ )
392+ switch err := deleteFn (f .db , f .hashScheme , func (deleted bool ) bool {
393+ if deleted && ! logPrinted || time .Since (lastLogPrinted ) > time .Second * 10 {
394+ log .Info (action + " in progress..." , "elapsed" , common .PrettyDuration (time .Since (start )))
395+ logPrinted , lastLogPrinted = true , time .Now ()
383396 }
384- select {
385- case <- f .closeCh :
386- return false
387- default :
388- }
389- if ! retry {
390- log .Info (action + " in progress..." , "elapsed" , time .Since (start ))
391- retry = true
397+ return stopCb ()
398+ }); {
399+ case err == nil :
400+ if logPrinted {
401+ log .Info (action + " finished" , "elapsed" , common .PrettyDuration (time .Since (start )))
392402 }
403+ return nil
404+ case errors .Is (err , rawdb .ErrDeleteRangeInterrupted ):
405+ log .Warn (action + " interrupted" , "elapsed" , common .PrettyDuration (time .Since (start )))
406+ return err
407+ default :
408+ log .Error (action + " failed" , "error" , err )
409+ return err
393410 }
394411}
395412
@@ -658,54 +675,97 @@ func (f *FilterMaps) deleteLastBlockOfMap(batch ethdb.Batch, mapIndex uint32) {
658675 rawdb .DeleteFilterMapLastBlock (batch , mapIndex )
659676}
660677
661- // deleteTailEpoch deletes index data from the earliest, either fully or partially
662- // indexed epoch. The last block pointer for the last map of the epoch and the
663- // corresponding block log value pointer are retained as these are always assumed
664- // to be available for each epoch.
665- func (f * FilterMaps ) deleteTailEpoch (epoch uint32 ) error {
678+ // deleteTailEpoch deletes index data from the specified epoch. The last block
679+ // pointer for the last map of the epoch and the corresponding block log value
680+ // pointer are retained as these are always assumed to be available for each
681+ // epoch as boundary markers.
682+ // The function returns true if all index data related to the epoch (except for
683+ // the boundary markers) has been fully removed.
684+ func (f * FilterMaps ) deleteTailEpoch (epoch uint32 ) (bool , error ) {
666685 f .indexLock .Lock ()
667686 defer f .indexLock .Unlock ()
668687
688+ // determine epoch boundaries
669689 firstMap := epoch << f .logMapsPerEpoch
670690 lastBlock , _ , err := f .getLastBlockOfMap (firstMap + f .mapsPerEpoch - 1 )
671691 if err != nil {
672- return fmt .Errorf ("failed to retrieve last block of deleted epoch %d: %v" , epoch , err )
692+ return false , fmt .Errorf ("failed to retrieve last block of deleted epoch %d: %v" , epoch , err )
673693 }
674694 var firstBlock uint64
675695 if epoch > 0 {
676696 firstBlock , _ , err = f .getLastBlockOfMap (firstMap - 1 )
677697 if err != nil {
678- return fmt .Errorf ("failed to retrieve last block before deleted epoch %d: %v" , epoch , err )
698+ return false , fmt .Errorf ("failed to retrieve last block before deleted epoch %d: %v" , epoch , err )
679699 }
680700 firstBlock ++
681701 }
682- fmr := f .indexedRange
683- if f .indexedRange .maps .First () == firstMap &&
684- f .indexedRange .maps .AfterLast () > firstMap + f .mapsPerEpoch &&
685- f .indexedRange .tailPartialEpoch == 0 {
686- fmr .maps .SetFirst (firstMap + f .mapsPerEpoch )
687- fmr .blocks .SetFirst (lastBlock + 1 )
688- } else if f .indexedRange .maps .First () == firstMap + f .mapsPerEpoch {
702+ // update rendered range if necessary
703+ var (
704+ fmr = f .indexedRange
705+ firstEpoch = f .indexedRange .maps .First () >> f .logMapsPerEpoch
706+ afterLastEpoch = (f .indexedRange .maps .AfterLast () + f .mapsPerEpoch - 1 ) >> f .logMapsPerEpoch
707+ )
708+ if f .indexedRange .tailPartialEpoch != 0 && firstEpoch > 0 {
709+ firstEpoch --
710+ }
711+ switch {
712+ case epoch < firstEpoch :
713+ // cleanup of already unindexed epoch; range not affected
714+ case epoch == firstEpoch && epoch + 1 < afterLastEpoch :
715+ // first fully or partially rendered epoch and there is at least one
716+ // rendered map in the next epoch; remove from indexed range
689717 fmr .tailPartialEpoch = 0
718+ fmr .maps .SetFirst ((epoch + 1 ) << f .logMapsPerEpoch )
719+ fmr .blocks .SetFirst (lastBlock + 1 )
720+ f .setRange (f .db , f .indexedView , fmr , false )
721+ default :
722+ // cannot be cleaned or unindexed; return with error
723+ return false , errors .New ("invalid tail epoch number" )
724+ }
725+ // remove index data
726+ if err := f .safeDeleteWithLogs (func (db ethdb.KeyValueStore , hashScheme bool , stopCb func (bool ) bool ) error {
727+ first := f .mapRowIndex (firstMap , 0 )
728+ count := f .mapRowIndex (firstMap + f .mapsPerEpoch , 0 ) - first
729+ if err := rawdb .DeleteFilterMapRows (f .db , common .NewRange (first , count ), hashScheme , stopCb ); err != nil {
730+ return err
731+ }
732+ for mapIndex := firstMap ; mapIndex < firstMap + f .mapsPerEpoch ; mapIndex ++ {
733+ f .filterMapCache .Remove (mapIndex )
734+ }
735+ delMapRange := common .NewRange (firstMap , f .mapsPerEpoch - 1 ) // keep last entry
736+ if err := rawdb .DeleteFilterMapLastBlocks (f .db , delMapRange , hashScheme , stopCb ); err != nil {
737+ return err
738+ }
739+ for mapIndex := firstMap ; mapIndex < firstMap + f .mapsPerEpoch - 1 ; mapIndex ++ {
740+ f .lastBlockCache .Remove (mapIndex )
741+ }
742+ delBlockRange := common .NewRange (firstBlock , lastBlock - firstBlock ) // keep last entry
743+ if err := rawdb .DeleteBlockLvPointers (f .db , delBlockRange , hashScheme , stopCb ); err != nil {
744+ return err
745+ }
746+ for blockNumber := firstBlock ; blockNumber < lastBlock ; blockNumber ++ {
747+ f .lvPointerCache .Remove (blockNumber )
748+ }
749+ return nil
750+ }, fmt .Sprintf ("Deleting tail epoch #%d" , epoch ), func () bool {
751+ f .processEvents ()
752+ return f .stop || ! f .targetHeadIndexed ()
753+ }); err == nil {
754+ // everything removed; mark as cleaned and report success
755+ if f .cleanedEpochsBefore == epoch {
756+ f .cleanedEpochsBefore = epoch + 1
757+ }
758+ return true , nil
690759 } else {
691- return errors .New ("invalid tail epoch number" )
692- }
693- f .setRange (f .db , f .indexedView , fmr , false )
694- first := f .mapRowIndex (firstMap , 0 )
695- count := f .mapRowIndex (firstMap + f .mapsPerEpoch , 0 ) - first
696- rawdb .DeleteFilterMapRows (f .db , common .NewRange (first , count ))
697- for mapIndex := firstMap ; mapIndex < firstMap + f .mapsPerEpoch ; mapIndex ++ {
698- f .filterMapCache .Remove (mapIndex )
699- }
700- rawdb .DeleteFilterMapLastBlocks (f .db , common .NewRange (firstMap , f .mapsPerEpoch - 1 )) // keep last enrty
701- for mapIndex := firstMap ; mapIndex < firstMap + f .mapsPerEpoch - 1 ; mapIndex ++ {
702- f .lastBlockCache .Remove (mapIndex )
703- }
704- rawdb .DeleteBlockLvPointers (f .db , common .NewRange (firstBlock , lastBlock - firstBlock )) // keep last enrty
705- for blockNumber := firstBlock ; blockNumber < lastBlock ; blockNumber ++ {
706- f .lvPointerCache .Remove (blockNumber )
760+ // more data left in epoch range; mark as dirty and report unfinished
761+ if f .cleanedEpochsBefore > epoch {
762+ f .cleanedEpochsBefore = epoch
763+ }
764+ if errors .Is (err , rawdb .ErrDeleteRangeInterrupted ) {
765+ return false , nil
766+ }
767+ return false , err
707768 }
708- return nil
709769}
710770
711771// exportCheckpoints exports epoch checkpoints in the format used by checkpoints.go.
0 commit comments