Skip to content

Commit cbe902d

Browse files
authored
core/filtermaps: fix log indexer init conditions (#31455)
This PR adds an extra condition to the log indexer initialization in order to avoid initializing with block 0 as target head. Previously this caused the indexer to initialize without a checkpoint. Later, when the real chain head was set, it indexed the entire history, then unindexed most of it if only the recent history was supposed to be indexed. Now the init only happens when there is an actual synced chain head and therefore the index is initialized at the most recent checkpoint and only the last year is indexed according to the default parameters. During checkpoint initialization the best available checkpoint is also checked against the history cutoff point and fails if the indexing would have to start from a block older than the cutoff. If initialization fails then the indexer reverts to unindexed mode instead of retrying because the failure conditions cannot be expected to recover later.
1 parent fd4049d commit cbe902d

File tree

4 files changed

+45
-14
lines changed

4 files changed

+45
-14
lines changed

core/filtermaps/filtermaps.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type FilterMaps struct {
5353
// This is configured by the --history.logs.disable Geth flag.
5454
// We chose to implement disabling this way because it requires less special
5555
// case logic in eth/filters.
56-
disabled bool
56+
disabled bool
57+
disabledCh chan struct{} // closed by indexer if disabled
5758

5859
closeCh chan struct{}
5960
closeWg sync.WaitGroup
@@ -196,6 +197,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
196197
blockProcessingCh: make(chan bool, 1),
197198
history: config.History,
198199
disabled: config.Disabled,
200+
disabledCh: make(chan struct{}),
199201
exportFileName: config.ExportFileName,
200202
Params: params,
201203
indexedRange: filterMapsRange{
@@ -206,6 +208,8 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
206208
maps: common.NewRange(rs.MapsFirst, rs.MapsAfterLast-rs.MapsFirst),
207209
tailPartialEpoch: rs.TailPartialEpoch,
208210
},
211+
historyCutoff: historyCutoff,
212+
finalBlock: finalBlock,
209213
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
210214
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
211215
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
@@ -278,8 +282,13 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
278282
}
279283

280284
// reset un-initializes the FilterMaps structure and removes all related data from
281-
// the database. The function returns true if everything was successfully removed.
282-
func (f *FilterMaps) reset() bool {
285+
// the database.
286+
// Note that in case of leveldb database the fallback implementation of DeleteRange
287+
// might take a long time to finish and deleting the entire database may be
288+
// interrupted by a shutdown. Deleting the filterMapsRange entry first does
289+
// guarantee though that the next init() will not return successfully until the
290+
// entire database has been cleaned.
291+
func (f *FilterMaps) reset() {
283292
f.indexLock.Lock()
284293
f.indexedRange = filterMapsRange{}
285294
f.indexedView = nil
@@ -292,11 +301,16 @@ func (f *FilterMaps) reset() bool {
292301
// deleting the range first ensures that resetDb will be called again at next
293302
// startup and any leftover data will be removed even if it cannot finish now.
294303
rawdb.DeleteFilterMapsRange(f.db)
295-
return f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
304+
f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
296305
}
297306

298307
// init initializes an empty log index according to the current targetView.
299308
func (f *FilterMaps) init() error {
309+
// ensure that there is no remaining data in the filter maps key range
310+
if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") {
311+
return errors.New("could not reset log index database")
312+
}
313+
300314
f.indexLock.Lock()
301315
defer f.indexLock.Unlock()
302316

@@ -317,6 +331,13 @@ func (f *FilterMaps) init() error {
317331
bestIdx, bestLen = idx, max
318332
}
319333
}
334+
var initBlockNumber uint64
335+
if bestLen > 0 {
336+
initBlockNumber = checkpoints[bestIdx][bestLen-1].BlockNumber
337+
}
338+
if initBlockNumber < f.historyCutoff {
339+
return errors.New("cannot start indexing before history cutoff point")
340+
}
320341
batch := f.db.NewBatch()
321342
for epoch := range bestLen {
322343
cp := checkpoints[bestIdx][epoch]

core/filtermaps/indexer.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,25 @@ func (f *FilterMaps) indexerLoop() {
3636

3737
if f.disabled {
3838
f.reset()
39+
close(f.disabledCh)
3940
return
4041
}
4142
log.Info("Started log indexer")
4243

4344
for !f.stop {
4445
if !f.indexedRange.initialized {
45-
if err := f.init(); err != nil {
46-
log.Error("Error initializing log index", "error", err)
47-
// unexpected error; there is not a lot we can do here, maybe it
48-
// recovers, maybe not. Calling event processing here ensures
49-
// that we can still properly shutdown in case of an infinite loop.
46+
if f.targetView.headNumber == 0 {
47+
// initialize when chain head is available
5048
f.processSingleEvent(true)
5149
continue
5250
}
51+
if err := f.init(); err != nil {
52+
log.Error("Error initializing log index; reverting to unindexed mode", "error", err)
53+
f.reset()
54+
f.disabled = true
55+
close(f.disabledCh)
56+
return
57+
}
5358
}
5459
if !f.targetHeadIndexed() {
5560
if !f.tryIndexHead() {

core/filtermaps/matcher_backend.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,6 @@ func (fm *FilterMapsMatcherBackend) synced() {
141141
// range that has not been changed and has been consistent with all states of the
142142
// chain since the previous SyncLogIndex or the creation of the matcher backend.
143143
func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) {
144-
if fm.f.disabled {
145-
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
146-
}
147-
148144
syncCh := make(chan SyncRange, 1)
149145
fm.f.matchersLock.Lock()
150146
fm.syncCh = syncCh
@@ -154,12 +150,16 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
154150
case fm.f.matcherSyncCh <- fm:
155151
case <-ctx.Done():
156152
return SyncRange{}, ctx.Err()
153+
case <-fm.f.disabledCh:
154+
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
157155
}
158156
select {
159157
case vr := <-syncCh:
160158
return vr, nil
161159
case <-ctx.Done():
162160
return SyncRange{}, ctx.Err()
161+
case <-fm.f.disabledCh:
162+
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
163163
}
164164
}
165165

eth/backend.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
241241
}
242242
fmConfig := filtermaps.Config{History: config.LogHistory, Disabled: config.LogNoHistory, ExportFileName: config.LogExportCheckpoints}
243243
chainView := eth.newChainView(eth.blockchain.CurrentBlock())
244-
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, 0, 0, filtermaps.DefaultParams, fmConfig)
244+
historyCutoff := eth.blockchain.HistoryPruningCutoff()
245+
var finalBlock uint64
246+
if fb := eth.blockchain.CurrentFinalBlock(); fb != nil {
247+
finalBlock = fb.Number.Uint64()
248+
}
249+
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig)
245250
eth.closeFilterMaps = make(chan chan struct{})
246251

247252
if config.BlobPool.Datadir != "" {

0 commit comments

Comments
 (0)