Skip to content

Commit 9fc2bbe

Browse files
authored
core/filtermaps: allow log search while head indexing (ethereum#31429)
This PR changes the matcher syncing conditions so that it is possible to run a search while head indexing is in progress. Previously it was a requirement to have the head indexed in order to perform matcher sync before and after a search. This was unnecessarily strict as the purpose was just to avoid syncing the valid range with the temporary shortened indexed range applied while updating existing head maps. Now the sync condition explicitly checks whether the indexer has a temporary indexed range with some head maps being partially updated. It also fixes a deadlock that happened when matcher synchronization was attempted in the event handler called from the `writeFinishedMaps` periodical callback.
1 parent 0a8f41e commit 9fc2bbe

File tree

4 files changed

+22
-12
lines changed

4 files changed

+22
-12
lines changed

core/filtermaps/filtermaps.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type FilterMaps struct {
7070
indexLock sync.RWMutex
7171
indexedRange filterMapsRange
7272
indexedView *ChainView // always consistent with the log index
73+
hasTempRange bool
7374

7475
// also accessed by indexer and matcher backend but no locking needed.
7576
filterMapCache *lru.Cache[uint32, filterMap]
@@ -94,7 +95,7 @@ type FilterMaps struct {
9495
ptrTailUnindexMap uint32
9596

9697
targetView *ChainView
97-
matcherSyncRequest *FilterMapsMatcherBackend
98+
matcherSyncRequests []*FilterMapsMatcherBackend
9899
historyCutoff uint64
99100
finalBlock, lastFinal uint64
100101
lastFinalEpoch uint32
@@ -330,7 +331,7 @@ func (f *FilterMaps) init() error {
330331
fmr.blocks = common.NewRange(cp.BlockNumber+1, 0)
331332
fmr.maps = common.NewRange(uint32(bestLen)<<f.logMapsPerEpoch, 0)
332333
}
333-
f.setRange(batch, f.targetView, fmr)
334+
f.setRange(batch, f.targetView, fmr, false)
334335
return batch.Write()
335336
}
336337

@@ -373,9 +374,10 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
373374
// setRange updates the indexed chain view and covered range and also adds the
374375
// changes to the given batch.
375376
// Note that this function assumes that the index write lock is being held.
376-
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange) {
377+
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange, isTempRange bool) {
377378
f.indexedView = newView
378379
f.indexedRange = newRange
380+
f.hasTempRange = isTempRange
379381
f.updateMatchersValidRange()
380382
if newRange.initialized {
381383
rs := rawdb.FilterMapsRange{
@@ -666,7 +668,7 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) error {
666668
} else {
667669
return errors.New("invalid tail epoch number")
668670
}
669-
f.setRange(f.db, f.indexedView, fmr)
671+
f.setRange(f.db, f.indexedView, fmr, false)
670672
first := f.mapRowIndex(firstMap, 0)
671673
count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
672674
rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count))

core/filtermaps/indexer.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,18 @@ func (f *FilterMaps) processEvents() {
136136
// processSingleEvent processes a single event either in a blocking or
137137
// non-blocking manner.
138138
func (f *FilterMaps) processSingleEvent(blocking bool) bool {
139-
if f.matcherSyncRequest != nil && f.targetHeadIndexed() {
140-
f.matcherSyncRequest.synced()
141-
f.matcherSyncRequest = nil
139+
if !f.hasTempRange {
140+
for _, mb := range f.matcherSyncRequests {
141+
mb.synced()
142+
}
143+
f.matcherSyncRequests = nil
142144
}
143145
if blocking {
144146
select {
145147
case target := <-f.targetCh:
146148
f.setTarget(target)
147-
case f.matcherSyncRequest = <-f.matcherSyncCh:
149+
case mb := <-f.matcherSyncCh:
150+
f.matcherSyncRequests = append(f.matcherSyncRequests, mb)
148151
case f.blockProcessing = <-f.blockProcessingCh:
149152
case <-f.closeCh:
150153
f.stop = true
@@ -160,7 +163,8 @@ func (f *FilterMaps) processSingleEvent(blocking bool) bool {
160163
select {
161164
case target := <-f.targetCh:
162165
f.setTarget(target)
163-
case f.matcherSyncRequest = <-f.matcherSyncCh:
166+
case mb := <-f.matcherSyncCh:
167+
f.matcherSyncRequests = append(f.matcherSyncRequests, mb)
164168
case f.blockProcessing = <-f.blockProcessingCh:
165169
case <-f.closeCh:
166170
f.stop = true

core/filtermaps/map_renderer.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,12 +392,16 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
392392
}
393393
// do not exit while in partially written state but do allow processing
394394
// events and pausing while block processing is in progress
395+
r.f.indexLock.Unlock()
395396
pauseCb()
397+
r.f.indexLock.Lock()
396398
batch = r.f.db.NewBatch()
397399
}
398400
}
399401

400-
r.f.setRange(batch, r.f.indexedView, tempRange)
402+
if tempRange != r.f.indexedRange {
403+
r.f.setRange(batch, r.f.indexedView, tempRange, true)
404+
}
401405
// add or update filter rows
402406
for rowIndex := uint32(0); rowIndex < r.f.mapHeight; rowIndex++ {
403407
var (
@@ -469,7 +473,7 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
469473
}
470474
r.finishedMaps = make(map[uint32]*renderedMap)
471475
r.finished.SetFirst(r.finished.AfterLast())
472-
r.f.setRange(batch, renderedView, newRange)
476+
r.f.setRange(batch, renderedView, newRange, false)
473477
if err := batch.Write(); err != nil {
474478
log.Crit("Error writing log index update batch", "error", err)
475479
}

core/filtermaps/matcher_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (fm *FilterMapsMatcherBackend) synced() {
125125
indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block
126126
}
127127
fm.syncCh <- SyncRange{
128-
HeadNumber: fm.f.indexedView.headNumber,
128+
HeadNumber: fm.f.targetView.headNumber,
129129
ValidBlocks: fm.validBlocks,
130130
IndexedBlocks: indexedBlocks,
131131
}

0 commit comments

Comments
 (0)