Skip to content

Commit 3cb966e

Browse files
authored
Merge pull request #8005 from onflow/leo/refactor-index-result
[Storage] Refactor index execution result
2 parents ab06551 + 28e05fb commit 3cb966e

File tree

85 files changed

+1770
-1111
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+1770
-1111
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2267,6 +2267,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22672267
node.EngineRegistry,
22682268
node.State,
22692269
node.Me,
2270+
node.StorageLockMgr,
2271+
node.ProtocolDB,
22702272
node.Storage.Blocks,
22712273
node.Storage.Results,
22722274
node.Storage.Receipts,

cmd/util/cmd/execution-state-extract/execution_state_extract_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestExtractExecutionState(t *testing.T) {
6767
blockID := unittest.IdentifierFixture()
6868
stateCommitment := unittest.StateCommitmentFixture()
6969

70-
err := unittest.WithLock(t, lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error {
70+
err := unittest.WithLock(t, lockManager, storage.LockIndexStateCommitment, func(lctx lockctx.Context) error {
7171
return storageDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
7272
// Store the state commitment for the block ID
7373
return operation.IndexStateCommitment(lctx, rw, blockID, stateCommitment)
@@ -139,7 +139,7 @@ func TestExtractExecutionState(t *testing.T) {
139139
// generate random block and map it to state commitment
140140
blockID := unittest.IdentifierFixture()
141141

142-
err = unittest.WithLock(t, lockManager, storage.LockInsertOwnReceipt, func(lctx lockctx.Context) error {
142+
err = unittest.WithLock(t, lockManager, storage.LockIndexStateCommitment, func(lctx lockctx.Context) error {
143143
return storageDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
144144
return operation.IndexStateCommitment(lctx, rw, blockID, flow.StateCommitment(stateCommitment))
145145
})

cmd/util/cmd/reindex/cmd/results.go

Lines changed: 0 additions & 56 deletions
This file was deleted.

cmd/util/cmd/reindex/cmd/root.go

Lines changed: 0 additions & 40 deletions
This file was deleted.

cmd/util/cmd/reindex/main.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

cmd/util/cmd/root.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
read_execution_state "github.com/onflow/flow-go/cmd/util/cmd/read-execution-state"
4040
read_hotstuff "github.com/onflow/flow-go/cmd/util/cmd/read-hotstuff/cmd"
4141
read_protocol_state "github.com/onflow/flow-go/cmd/util/cmd/read-protocol-state/cmd"
42-
index_er "github.com/onflow/flow-go/cmd/util/cmd/reindex/cmd"
4342
rollback_executed_height "github.com/onflow/flow-go/cmd/util/cmd/rollback-executed-height/cmd"
4443
run_script "github.com/onflow/flow-go/cmd/util/cmd/run-script"
4544
"github.com/onflow/flow-go/cmd/util/cmd/snapshot"
@@ -111,7 +110,6 @@ func addCommands() {
111110
rootCmd.AddCommand(leaders.Cmd)
112111
rootCmd.AddCommand(epochs.RootCmd)
113112
rootCmd.AddCommand(edbs.RootCmd)
114-
rootCmd.AddCommand(index_er.RootCmd)
115113
rootCmd.AddCommand(rollback_executed_height.Cmd)
116114
rootCmd.AddCommand(read_execution_state.Cmd)
117115
rootCmd.AddCommand(snapshot.Cmd)

engine/access/access_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,7 @@ func (suite *Suite) TestGetBlockByIDAndHeight() {
545545

546546
func (suite *Suite) TestGetExecutionResultByBlockID() {
547547
suite.RunTest(func(handler *rpc.Handler, db storage.DB, all *store.All) {
548+
lockManager := storage.NewTestingLockManager()
548549

549550
// test block1 get by ID
550551
nonexistingID := unittest.IdentifierFixture()
@@ -554,8 +555,15 @@ func (suite *Suite) TestGetExecutionResultByBlockID() {
554555
unittest.WithExecutionResultBlockID(blockID),
555556
unittest.WithServiceEvents(3))
556557

557-
require.NoError(suite.T(), all.Results.Store(er))
558-
require.NoError(suite.T(), all.Results.Index(blockID, er.ID()))
558+
require.NoError(suite.T(), storage.WithLock(lockManager, storage.LockIndexExecutionResult, func(lctx lockctx.Context) error {
559+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
560+
err := all.Results.BatchStore(er, rw)
561+
if err != nil {
562+
return err
563+
}
564+
return all.Results.BatchIndex(lctx, rw, blockID, er.ID()) // requires storage.LockIndexExecutionResult
565+
})
566+
}))
559567

560568
assertResp := func(
561569
resp *accessproto.ExecutionResultForBlockIDResponse,
@@ -627,6 +635,7 @@ func (suite *Suite) TestGetExecutionResultByBlockID() {
627635
// is reported as sealed
628636
func (suite *Suite) TestGetSealedTransaction() {
629637
unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) {
638+
lockManager := storage.NewTestingLockManager()
630639
db := pebbleimpl.ToDB(pdb)
631640
all := store.InitAll(metrics.NewNoopCollector(), db)
632641
enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
@@ -751,6 +760,8 @@ func (suite *Suite) TestGetSealedTransaction() {
751760
suite.net,
752761
suite.state,
753762
suite.me,
763+
lockManager,
764+
db,
754765
all.Blocks,
755766
all.Results,
756767
all.Receipts,
@@ -841,6 +852,7 @@ func (suite *Suite) TestGetSealedTransaction() {
841852
// transaction ID, block ID, and collection ID.
842853
func (suite *Suite) TestGetTransactionResult() {
843854
unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) {
855+
lockManager := storage.NewTestingLockManager()
844856
db := pebbleimpl.ToDB(pdb)
845857
all := store.InitAll(metrics.NewNoopCollector(), db)
846858
originID := unittest.IdentifierFixture()
@@ -1010,6 +1022,8 @@ func (suite *Suite) TestGetTransactionResult() {
10101022
suite.net,
10111023
suite.state,
10121024
suite.me,
1025+
lockManager,
1026+
db,
10131027
all.Blocks,
10141028
all.Results,
10151029
all.Receipts,
@@ -1172,6 +1186,7 @@ func (suite *Suite) TestGetTransactionResult() {
11721186
// the correct block id
11731187
func (suite *Suite) TestExecuteScript() {
11741188
unittest.RunWithPebbleDB(suite.T(), func(pdb *pebble.DB) {
1189+
lockManager := storage.NewTestingLockManager()
11751190
db := pebbleimpl.ToDB(pdb)
11761191
all := store.InitAll(metrics.NewNoopCollector(), db)
11771192
identities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
@@ -1276,6 +1291,8 @@ func (suite *Suite) TestExecuteScript() {
12761291
suite.net,
12771292
suite.state,
12781293
suite.me,
1294+
lockManager,
1295+
db,
12791296
all.Blocks,
12801297
all.Results,
12811298
all.Receipts,

engine/access/ingestion/engine.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/jordanschalm/lockctx"
78
"github.com/rs/zerolog"
89

910
"github.com/onflow/flow-go/consensus/hotstuff/model"
@@ -58,6 +59,8 @@ type Engine struct {
5859

5960
// storage
6061
// FIX: remove direct DB access by substituting indexer module
62+
db storage.DB
63+
lockManager storage.LockManager
6164
blocks storage.Blocks
6265
executionReceipts storage.ExecutionReceipts
6366
maxReceiptHeight uint64
@@ -83,6 +86,8 @@ func New(
8386
net network.EngineRegistry,
8487
state protocol.State,
8588
me module.Local,
89+
lockManager storage.LockManager,
90+
db storage.DB,
8691
blocks storage.Blocks,
8792
executionResults storage.ExecutionResults,
8893
executionReceipts storage.ExecutionReceipts,
@@ -116,6 +121,8 @@ func New(
116121
log: log.With().Str("engine", "ingestion").Logger(),
117122
state: state,
118123
me: me,
124+
lockManager: lockManager,
125+
db: db,
119126
blocks: blocks,
120127
executionResults: executionResults,
121128
executionReceipts: executionReceipts,
@@ -355,19 +362,29 @@ func (e *Engine) processFinalizedBlock(block *flow.Block) error {
355362
// TODO: substitute an indexer module as layer between engine and storage
356363

357364
// index the block storage with each of the collection guarantee
358-
err := e.blocks.IndexBlockContainingCollectionGuarantees(block.ID(), flow.GetIDs(block.Payload.Guarantees))
365+
err := storage.WithLocks(e.lockManager, storage.LockGroupAccessFinalizingBlock, func(lctx lockctx.Context) error {
366+
return e.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
367+
// requires [storage.LockIndexBlockByPayloadGuarantees] lock
368+
err := e.blocks.BatchIndexBlockContainingCollectionGuarantees(lctx, rw, block.ID(), flow.GetIDs(block.Payload.Guarantees))
369+
if err != nil {
370+
return fmt.Errorf("could not index block for collections: %w", err)
371+
}
372+
373+
// loop through seals and index ID -> result ID
374+
for _, seal := range block.Payload.Seals {
375+
// requires [storage.LockIndexExecutionResult] lock
376+
err := e.executionResults.BatchIndex(lctx, rw, seal.BlockID, seal.ResultID)
377+
if err != nil {
378+
return fmt.Errorf("could not index block for execution result: %w", err)
379+
}
380+
}
381+
return nil
382+
})
383+
})
359384
if err != nil {
360385
return fmt.Errorf("could not index block for collections: %w", err)
361386
}
362387

363-
// loop through seals and index ID -> result ID
364-
for _, seal := range block.Payload.Seals {
365-
err := e.executionResults.Index(seal.BlockID, seal.ResultID)
366-
if err != nil {
367-
return fmt.Errorf("could not index block for execution result: %w", err)
368-
}
369-
}
370-
371388
err = e.collectionSyncer.RequestCollectionsForBlock(block.Height, block.Payload.Guarantees)
372389
if err != nil {
373390
return fmt.Errorf("could not request collections for block: %w", err)

engine/access/ingestion/engine_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func (s *Suite) SetupTest() {
125125
s.receipts = new(storagemock.ExecutionReceipts)
126126
s.transactions = new(storagemock.Transactions)
127127
s.results = new(storagemock.ExecutionResults)
128+
s.results.On("BatchIndex", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
128129
collectionsToMarkFinalized := stdmap.NewTimes(100)
129130
collectionsToMarkExecuted := stdmap.NewTimes(100)
130131
blocksToMarkExecuted := stdmap.NewTimes(100)
@@ -208,6 +209,8 @@ func (s *Suite) initEngineAndSyncer() (*Engine, *collections.Syncer, *collection
208209
s.net,
209210
s.proto.state,
210211
s.me,
212+
s.lockManager,
213+
s.db,
211214
s.blocks,
212215
s.results,
213216
s.receipts,
@@ -297,7 +300,7 @@ func (s *Suite) TestOnFinalizedBlockSingle() {
297300
}
298301

299302
// expect that the block storage is indexed with each of the collection guarantee
300-
s.blocks.On("IndexBlockContainingCollectionGuarantees", block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
303+
s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
301304
for _, seal := range block.Payload.Seals {
302305
s.results.On("Index", seal.BlockID, seal.ResultID).Return(nil).Once()
303306
}
@@ -324,7 +327,7 @@ func (s *Suite) TestOnFinalizedBlockSingle() {
324327
// assert that the block was retrieved and all collections were requested
325328
s.headers.AssertExpectations(s.T())
326329
s.request.AssertNumberOfCalls(s.T(), "EntityByID", len(block.Payload.Guarantees))
327-
s.results.AssertNumberOfCalls(s.T(), "Index", len(block.Payload.Seals))
330+
s.results.AssertNumberOfCalls(s.T(), "BatchIndex", len(block.Payload.Seals))
328331
}
329332

330333
// TestOnFinalizedBlockSeveralBlocksAhead checks OnFinalizedBlock with a block several blocks newer than the last block processed
@@ -384,7 +387,7 @@ func (s *Suite) TestOnFinalizedBlockSeveralBlocksAhead() {
384387

385388
// expected all new blocks after last block processed
386389
for _, block := range blocks {
387-
s.blocks.On("IndexBlockContainingCollectionGuarantees", block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
390+
s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).Return(nil).Once()
388391

389392
for _, cg := range block.Payload.Guarantees {
390393
s.request.On("EntityByID", cg.CollectionID, mock.Anything).Return().Run(func(args mock.Arguments) {
@@ -412,9 +415,9 @@ func (s *Suite) TestOnFinalizedBlockSeveralBlocksAhead() {
412415
}
413416

414417
s.headers.AssertExpectations(s.T())
415-
s.blocks.AssertNumberOfCalls(s.T(), "IndexBlockContainingCollectionGuarantees", newBlocksCount)
418+
s.blocks.AssertNumberOfCalls(s.T(), "BatchIndexBlockContainingCollectionGuarantees", newBlocksCount)
416419
s.request.AssertNumberOfCalls(s.T(), "EntityByID", expectedEntityByIDCalls)
417-
s.results.AssertNumberOfCalls(s.T(), "Index", expectedIndexCalls)
420+
s.results.AssertNumberOfCalls(s.T(), "BatchIndex", expectedIndexCalls)
418421
}
419422

420423
// TestExecutionReceiptsAreIndexed checks that execution receipts are properly indexed
@@ -517,9 +520,9 @@ func (s *Suite) TestCollectionSyncing() {
517520

518521
// setup finalized block indexer mocks
519522
guaranteeIDs := []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))
520-
s.blocks.On("IndexBlockContainingCollectionGuarantees", block.ID(), guaranteeIDs).Return(nil).Once()
523+
s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), guaranteeIDs).Return(nil).Once()
521524
for _, seal := range payload.Seals {
522-
s.results.On("Index", seal.BlockID, seal.ResultID).Return(nil).Once()
525+
s.results.On("BatchIndex", mock.Anything, mock.Anything, seal.BlockID, seal.ResultID).Return(nil).Once()
523526
}
524527

525528
// initialize the engine using the initial finalized block

0 commit comments

Comments
 (0)