Skip to content

Commit fe062d1

Browse files
authored
Merge pull request #8165 from onflow/peter/fix-access-reindex
[Access] Allow reindexing last block's protocol data
2 parents ae50cb4 + c0a8871 commit fe062d1

File tree

4 files changed

+75
-16
lines changed

4 files changed

+75
-16
lines changed

module/state_synchronization/indexer/indexer_core.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,9 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
145145
return fmt.Errorf("must index block data with the next height %d, but got %d", latest+1, header.Height)
146146
}
147147

148-
// allow rerunning the indexer for same height since we are fetching height from register storage, but there are other storages
149-
// for indexing resources which might fail to update the values, so this enables rerunning and reindexing those resources
148+
// Data for the block is stored into both the protocol and registers databases. This creates a
149+
// race condition where it's possible only one completes if the node crashes at an inopportune time.
150+
// In this case, allow reindexing the last block. Both databases should treat this as a no-op.
150151
if header.Height == latest {
151152
lg.Warn().Msg("reindexing block data")
152153
c.metrics.BlockReindexed()
@@ -183,7 +184,6 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
183184
return fmt.Errorf("could not index events at height %d: %w", header.Height, err)
184185
}
185186

186-
// requires the [storage.LockInsertLightTransactionResult] lock
187187
err = c.results.BatchStore(lctx, rw, data.BlockID, results)
188188
if err != nil {
189189
return fmt.Errorf("could not index transaction results at height %d: %w", header.Height, err)
@@ -201,6 +201,10 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
201201
})
202202

203203
if err != nil {
204+
if errors.Is(err, storage.ErrAlreadyExists) {
205+
// Since reindexing is a no-op, return early without an error
206+
return nil
207+
}
204208
return fmt.Errorf("could not commit block data: %w", err)
205209
}
206210

module/state_synchronization/indexer/indexer_core_test.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/rs/zerolog"
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/mock"
13-
mocks "github.com/stretchr/testify/mock"
1413
"github.com/stretchr/testify/require"
1514

1615
collectionsmock "github.com/onflow/flow-go/engine/access/ingestion/collections/mock"
@@ -31,6 +30,7 @@ import (
3130
pebbleStorage "github.com/onflow/flow-go/storage/pebble"
3231
"github.com/onflow/flow-go/utils/unittest"
3332
"github.com/onflow/flow-go/utils/unittest/fixtures"
33+
"github.com/onflow/flow-go/utils/unittest/mocks"
3434
)
3535

3636
type indexCoreTest struct {
@@ -80,7 +80,7 @@ func newIndexCoreTest(
8080

8181
func (i *indexCoreTest) useDefaultBlockByHeight() *indexCoreTest {
8282
i.headers.
83-
On("BlockIDByHeight", mocks.AnythingOfType("uint64")).
83+
On("BlockIDByHeight", mock.AnythingOfType("uint64")).
8484
Return(func(height uint64) (flow.Identifier, error) {
8585
for _, b := range i.blocks {
8686
if b.Height == height {
@@ -91,7 +91,7 @@ func (i *indexCoreTest) useDefaultBlockByHeight() *indexCoreTest {
9191
})
9292

9393
i.headers.
94-
On("ByHeight", mocks.AnythingOfType("uint64")).
94+
On("ByHeight", mock.AnythingOfType("uint64")).
9595
Return(func(height uint64) (*flow.Header, error) {
9696
for _, b := range i.blocks {
9797
if b.Height == height {
@@ -268,17 +268,15 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
268268
t.Run("Index AllTheThings", func(t *testing.T) {
269269
test := newIndexCoreTest(t, g, blocks, tf.ExecutionDataEntity()).initIndexer()
270270

271-
test.events.On("BatchStore",
272-
mock.MatchedBy(func(lctx lockctx.Proof) bool { return lctx.HoldsLock(storage.LockInsertEvent) }),
273-
blockID, []flow.EventsList{tf.ExpectedEvents}, mock.Anything).
271+
test.events.
272+
On("BatchStore", mocks.MatchLock(storage.LockInsertEvent), blockID, []flow.EventsList{tf.ExpectedEvents}, mock.Anything).
274273
Return(func(lctx lockctx.Proof, blockID flow.Identifier, events []flow.EventsList, batch storage.ReaderBatchWriter) error {
275274
require.True(t, lctx.HoldsLock(storage.LockInsertEvent))
276275
require.NotNil(t, batch)
277276
return nil
278277
})
279-
test.results.On("BatchStore",
280-
mock.MatchedBy(func(lctx lockctx.Proof) bool { return lctx.HoldsLock(storage.LockInsertLightTransactionResult) }),
281-
mock.Anything, blockID, tf.ExpectedResults).
278+
test.results.
279+
On("BatchStore", mocks.MatchLock(storage.LockInsertLightTransactionResult), mock.Anything, blockID, tf.ExpectedResults).
282280
Return(func(lctx lockctx.Proof, batch storage.ReaderBatchWriter, blockID flow.Identifier, results []flow.LightTransactionResult) error {
283281
require.True(t, lctx.HoldsLock(storage.LockInsertLightTransactionResult))
284282
require.NotNil(t, batch)
@@ -294,9 +292,8 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
294292
Return(nil)
295293
test.collectionIndexer.On("IndexCollections", tf.ExpectedCollections).Return(nil).Once()
296294
for txID, scheduledTxID := range tf.ExpectedScheduledTransactions {
297-
test.scheduledTransactions.On("BatchIndex",
298-
mock.MatchedBy(func(lctx lockctx.Proof) bool { return lctx.HoldsLock(storage.LockIndexScheduledTransaction) }),
299-
blockID, txID, scheduledTxID, mock.Anything).
295+
test.scheduledTransactions.
296+
On("BatchIndex", mocks.MatchLock(storage.LockIndexScheduledTransaction), blockID, txID, scheduledTxID, mock.Anything).
300297
Return(func(lctx lockctx.Proof, blockID flow.Identifier, txID flow.Identifier, scheduledTxID uint64, batch storage.ReaderBatchWriter) error {
301298
require.True(t, lctx.HoldsLock(storage.LockIndexScheduledTransaction))
302299
require.NotNil(t, batch)
@@ -343,6 +340,39 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
343340
assert.ErrorIs(t, err, storage.ErrNotFound)
344341
})
345342

343+
// test that reindexing the last block does not return an error and does not write any data
344+
t.Run("Reindexing last block", func(t *testing.T) {
345+
test := newIndexCoreTest(t, g, blocks, tf.ExecutionDataEntity()).
346+
initIndexer().
347+
setLastHeight(func(t *testing.T) uint64 {
348+
return tf.Block.Height
349+
})
350+
351+
// reset all mocks to avoid false positives
352+
test.events.On("BatchStore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset()
353+
test.results.On("BatchStore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset()
354+
test.scheduledTransactions.On("BatchIndex", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset()
355+
test.registers.On("Store", mock.Anything, mock.Anything).Unset()
356+
test.collectionIndexer.On("IndexCollections", mock.Anything).Unset()
357+
358+
// setup mocks to behave as they would if the block was already indexed.
359+
// tx results and scheduled transactions will not be called since events returned an error.
360+
test.events.
361+
On("BatchStore", mocks.MatchLock(storage.LockInsertEvent), blockID, []flow.EventsList{tf.ExpectedEvents}, mock.Anything).
362+
Return(storage.ErrAlreadyExists).
363+
Once()
364+
test.collectionIndexer.
365+
On("IndexCollections", tf.ExpectedCollections).
366+
Return(nil).
367+
Once()
368+
test.registers.
369+
On("Store", mock.Anything, tf.Block.Height).
370+
Return(nil).
371+
Once()
372+
373+
err := test.indexer.IndexBlockData(tf.ExecutionDataEntity())
374+
assert.NoError(t, err)
375+
})
346376
}
347377

348378
func TestExecutionState_RegisterValues(t *testing.T) {

storage/light_transaction_results.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type LightTransactionResultsReader interface {
2222

2323
// ByBlockID gets all transaction results for a block, ordered by transaction index
2424
// CAUTION: this function returns the empty list in case for block IDs without known results.
25+
//
2526
// No error returns are expected during normal operations.
2627
ByBlockID(id flow.Identifier) ([]flow.LightTransactionResult, error)
2728
}
@@ -33,6 +34,8 @@ type LightTransactionResults interface {
3334
// BatchStore persists and indexes all transaction results (light representation) for the given blockID
3435
// as part of the provided batch. The caller must acquire [storage.LockInsertLightTransactionResult] and
3536
// hold it until the write batch has been committed.
36-
// It returns [storage.ErrAlreadyExists] if light transaction results for the block already exist.
37+
//
38+
// Expected error returns during normal operation:
39+
// - [storage.ErrAlreadyExists] if light transaction results for the block already exist.
3740
BatchStore(lctx lockctx.Proof, rw ReaderBatchWriter, blockID flow.Identifier, transactionResults []flow.LightTransactionResult) error
3841
}

utils/unittest/mocks/matchers.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package mocks
2+
3+
import (
4+
"github.com/jordanschalm/lockctx"
5+
"github.com/stretchr/testify/mock"
6+
)
7+
8+
// MatchLock returns an argument matcher that checks if the argument is a `lockctx.Proof` that holds
9+
// the provided lock.
10+
//
11+
// Example:
12+
//
13+
// events.
14+
// On("BatchStore", mocks.MatchLock(storage.LockInsertEvent), blockID, expectedEvents, mock.Anything).
15+
// Return(func(lctx lockctx.Proof, blockID flow.Identifier, events []flow.EventsList, batch storage.ReaderBatchWriter) error {
16+
// require.NotNil(t, batch)
17+
// return nil
18+
// }).
19+
// Once()
20+
func MatchLock(lock string) interface{} {
21+
return mock.MatchedBy(func(lctx lockctx.Proof) bool { return lctx.HoldsLock(lock) })
22+
}

0 commit comments

Comments
 (0)