Skip to content

Commit a745d4e

Browse files
authored
Merge pull request #8021 from onflow/leo/refactor-index-tx-err-msg
[Storage] Refactor indexing transaction error message
2 parents 3510962 + b7918aa commit a745d4e

27 files changed

+706
-250
lines changed

admin/commands/storage/backfill_tx_error_messages_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func TestBackfillTxErrorMessages(t *testing.T) {
7777
func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
7878
suite.log = zerolog.New(os.Stderr)
7979

80+
lockManager := storage.NewTestingLockManager()
8081
suite.state = new(protocolmock.State)
8182
suite.headers = new(storagemock.Headers)
8283
suite.receipts = new(storagemock.ExecutionReceipts)
@@ -160,6 +161,7 @@ func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
160161
errorMessageProvider,
161162
suite.txErrorMessages,
162163
executionNodeIdentitiesProvider,
164+
lockManager,
163165
)
164166

165167
suite.command = NewBackfillTxErrorMessagesCommand(
@@ -532,7 +534,7 @@ func (suite *BackfillTxErrorMessagesSuite) mockStoreTxErrorMessages(
532534
}
533535
}
534536

535-
suite.txErrorMessages.On("Store", blockID, txErrorMessages).Return(nil).Once()
537+
suite.txErrorMessages.On("Store", mock.Anything, blockID, txErrorMessages).Return(nil).Once()
536538
}
537539

538540
// assertAllExpectations asserts that all the expectations set on various mocks are met,

cmd/access/node_builder/access_node_builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2217,6 +2217,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22172217
notNil(builder.txResultErrorMessageProvider),
22182218
builder.transactionResultErrorMessages,
22192219
notNil(builder.ExecNodeIdentitiesProvider),
2220+
node.StorageLockMgr,
22202221
)
22212222
}
22222223

engine/access/ingestion/tx_error_messages/tx_error_messages_core.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/jordanschalm/lockctx"
78
"github.com/rs/zerolog"
89

910
"github.com/onflow/flow-go/engine/access/rpc/backend/transactions/error_messages"
@@ -24,6 +25,7 @@ type TxErrorMessagesCore struct {
2425
txErrorMessageProvider error_messages.Provider
2526
transactionResultErrorMessages storage.TransactionResultErrorMessages
2627
execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
28+
lockManager storage.LockManager
2729
}
2830

2931
// NewTxErrorMessagesCore creates a new instance of TxErrorMessagesCore.
@@ -32,12 +34,14 @@ func NewTxErrorMessagesCore(
3234
txErrorMessageProvider error_messages.Provider,
3335
transactionResultErrorMessages storage.TransactionResultErrorMessages,
3436
execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider,
37+
lockManager storage.LockManager,
3538
) *TxErrorMessagesCore {
3639
return &TxErrorMessagesCore{
3740
log: log.With().Str("module", "tx_error_messages_core").Logger(),
3841
txErrorMessageProvider: txErrorMessageProvider,
3942
transactionResultErrorMessages: transactionResultErrorMessages,
4043
execNodeIdentitiesProvider: execNodeIdentitiesProvider,
44+
lockManager: lockManager,
4145
}
4246
}
4347

@@ -62,6 +66,15 @@ func (c *TxErrorMessagesCore) FetchErrorMessages(ctx context.Context, blockID fl
6266
return c.FetchErrorMessagesByENs(ctx, blockID, execNodes)
6367
}
6468

69+
// FetchErrorMessagesByENs requests the transaction result error messages for the specified block ID from
70+
// any of the given execution nodes and persists them once retrieved. This function blocks until ingesting
71+
// the tx error messages has completed or failed.
72+
//
73+
// Note that transaction error messages are auxiliary data provided by the Execution Nodes on a goodwill basis and
74+
// not protected by the protocol. Execution Error messages might be non-deterministic, i.e. potentially different
75+
// for different execution nodes. Hence, we also persist which execution node (`execNode`) provided the error message.
76+
//
77+
// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist.
6578
func (c *TxErrorMessagesCore) FetchErrorMessagesByENs(
6679
ctx context.Context,
6780
blockID flow.Identifier,
@@ -71,7 +84,6 @@ func (c *TxErrorMessagesCore) FetchErrorMessagesByENs(
7184
if err != nil {
7285
return fmt.Errorf("could not check existance of transaction result error messages: %w", err)
7386
}
74-
7587
if exists {
7688
return nil
7789
}
@@ -100,14 +112,10 @@ func (c *TxErrorMessagesCore) FetchErrorMessagesByENs(
100112
return nil
101113
}
102114

103-
// storeTransactionResultErrorMessages stores the transaction result error messages for a given block ID.
115+
// storeTransactionResultErrorMessages persists and indexes all transaction result error messages for the given blockID.
116+
// This function acquires [storage.LockInsertTransactionResultErrMessage] itself (via storage.WithLock) for the duration of the store call, so the caller does not need to hold it.
104117
//
105-
// Parameters:
106-
// - blockID: The identifier of the block for which the error messages are to be stored.
107-
// - errorMessagesResponses: A slice of responses containing the error messages to be stored.
108-
// - execNode: The execution node associated with the error messages.
109-
//
110-
// No errors are expected during normal operation.
118+
// It returns [storage.ErrAlreadyExists] if tx result error messages for the block already exist.
111119
func (c *TxErrorMessagesCore) storeTransactionResultErrorMessages(
112120
blockID flow.Identifier,
113121
errorMessagesResponses []*execproto.GetTransactionErrorMessagesResponse_Result,
@@ -124,7 +132,9 @@ func (c *TxErrorMessagesCore) storeTransactionResultErrorMessages(
124132
errorMessages = append(errorMessages, errorMessage)
125133
}
126134

127-
err := c.transactionResultErrorMessages.Store(blockID, errorMessages)
135+
err := storage.WithLock(c.lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error {
136+
return c.transactionResultErrorMessages.Store(lctx, blockID, errorMessages)
137+
})
128138
if err != nil {
129139
return fmt.Errorf("failed to store transaction error messages: %w", err)
130140
}

engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"testing"
77

8+
"github.com/jordanschalm/lockctx"
89
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
910
"github.com/rs/zerolog"
1011
"github.com/stretchr/testify/mock"
@@ -21,7 +22,8 @@ import (
2122
"github.com/onflow/flow-go/module/irrecoverable"
2223
syncmock "github.com/onflow/flow-go/module/state_synchronization/mock"
2324
protocol "github.com/onflow/flow-go/state/protocol/mock"
24-
storage "github.com/onflow/flow-go/storage/mock"
25+
"github.com/onflow/flow-go/storage"
26+
storagemock "github.com/onflow/flow-go/storage/mock"
2527
"github.com/onflow/flow-go/utils/unittest"
2628
)
2729

@@ -37,13 +39,14 @@ type TxErrorMessagesCoreSuite struct {
3739
params *protocol.Params
3840
}
3941

40-
receipts *storage.ExecutionReceipts
41-
txErrorMessages *storage.TransactionResultErrorMessages
42-
lightTxResults *storage.LightTransactionResults
42+
receipts *storagemock.ExecutionReceipts
43+
txErrorMessages *storagemock.TransactionResultErrorMessages
44+
lightTxResults *storagemock.LightTransactionResults
4345

4446
reporter *syncmock.IndexReporter
4547
indexReporter *index.Reporter
4648
txResultsIndex *index.TransactionResultsIndex
49+
lockManager storage.LockManager
4750

4851
enNodeIDs flow.IdentityList
4952
execClient *accessmock.ExecutionAPIClient
@@ -79,18 +82,21 @@ func (s *TxErrorMessagesCoreSuite) SetupTest() {
7982
s.proto.params = protocol.NewParams(s.T())
8083
s.execClient = accessmock.NewExecutionAPIClient(s.T())
8184
s.connFactory = connectionmock.NewConnectionFactory(s.T())
82-
s.receipts = storage.NewExecutionReceipts(s.T())
83-
s.txErrorMessages = storage.NewTransactionResultErrorMessages(s.T())
85+
s.receipts = storagemock.NewExecutionReceipts(s.T())
86+
s.txErrorMessages = storagemock.NewTransactionResultErrorMessages(s.T())
8487
s.rootBlock = unittest.Block.Genesis(flow.Emulator)
8588
s.finalizedBlock = unittest.BlockWithParentFixture(s.rootBlock.ToHeader()).ToHeader()
8689

87-
s.lightTxResults = storage.NewLightTransactionResults(s.T())
90+
s.lightTxResults = storagemock.NewLightTransactionResults(s.T())
8891
s.reporter = syncmock.NewIndexReporter(s.T())
8992
s.indexReporter = index.NewReporter()
9093
err := s.indexReporter.Initialize(s.reporter)
9194
s.Require().NoError(err)
9295
s.txResultsIndex = index.NewTransactionResultsIndex(s.indexReporter, s.lightTxResults)
9396

97+
// Initialize lock manager for tests
98+
s.lockManager = storage.NewTestingLockManager()
99+
94100
s.proto.state.On("Params").Return(s.proto.params)
95101

96102
// Mock the finalized root block header with height 0.
@@ -143,8 +149,11 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages() {
143149
expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])
144150

145151
// Mock the storage of the fetched error messages into the protocol database.
146-
s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages).
147-
Return(nil).Once()
152+
s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages).
153+
Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error {
154+
require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
155+
return nil
156+
}).Once()
148157

149158
core := s.initCore()
150159
err := core.FetchErrorMessages(irrecoverableCtx, blockId)
@@ -228,8 +237,11 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages_Erro
228237

229238
// Simulate an error when attempting to store the fetched transaction error messages in storage.
230239
expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])
231-
s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages).
232-
Return(fmt.Errorf("storage error")).Once()
240+
s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages).
241+
Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error {
242+
require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
243+
return fmt.Errorf("storage error")
244+
}).Once()
233245

234246
core := s.initCore()
235247
err := core.FetchErrorMessages(irrecoverableCtx, blockId)
@@ -268,6 +280,7 @@ func (s *TxErrorMessagesCoreSuite) initCore() *TxErrorMessagesCore {
268280
errorMessageProvider,
269281
s.txErrorMessages,
270282
execNodeIdentitiesProvider,
283+
s.lockManager,
271284
)
272285
return core
273286
}
@@ -311,7 +324,7 @@ func mockTransactionResultsByBlock(count int) []flow.LightTransactionResult {
311324

312325
// setupReceiptsForBlock sets up mock execution receipts for the given block on the provided receipts mock,
313326
// attributing a receipt to the execution node identified by eNodeID. It does not return a value.
314-
func setupReceiptsForBlock(receipts *storage.ExecutionReceipts, block *flow.Block, eNodeID flow.Identifier) {
327+
func setupReceiptsForBlock(receipts *storagemock.ExecutionReceipts, block *flow.Block, eNodeID flow.Identifier) {
315328
receipt1 := unittest.ReceiptForBlockFixture(block)
316329
receipt1.ExecutorID = eNodeID
317330
receipt2 := unittest.ReceiptForBlockFixture(block)
@@ -328,7 +341,7 @@ func setupReceiptsForBlock(receipts *storage.ExecutionReceipts, block *flow.Bloc
328341
}
329342

330343
// setupReceiptsForBlockWithResult sets up mock execution receipts for a block with a specific execution result
331-
func setupReceiptsForBlockWithResult(receipts *storage.ExecutionReceipts, executionResult *flow.ExecutionResult, executorIDs ...flow.Identifier) {
344+
func setupReceiptsForBlockWithResult(receipts *storagemock.ExecutionReceipts, executionResult *flow.ExecutionResult, executorIDs ...flow.Identifier) {
332345
receiptList := make(flow.ExecutionReceiptList, 0, len(executorIDs))
333346
for _, enID := range executorIDs {
334347
receiptList = append(receiptList, unittest.ExecutionReceiptFixture(

engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/jordanschalm/lockctx"
1011
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
1112
"github.com/rs/zerolog"
1213
"github.com/stretchr/testify/mock"
@@ -55,6 +56,7 @@ type TxErrorMessagesEngineSuite struct {
5556
reporter *syncmock.IndexReporter
5657
indexReporter *index.Reporter
5758
txResultsIndex *index.TransactionResultsIndex
59+
lockManager storage.LockManager
5860

5961
enNodeIDs flow.IdentityList
6062
execClient *accessmock.ExecutionAPIClient
@@ -87,9 +89,13 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() {
8789
s.log = unittest.Logger()
8890
s.metrics = metrics.NewNoopCollector()
8991
s.ctx, s.cancel = context.WithCancel(context.Background())
92+
93+
// Initialize database and lock manager
9094
pdb, dbDir := unittest.TempPebbleDB(s.T())
9195
s.db = pebbleimpl.ToDB(pdb)
9296
s.dbDir = dbDir
97+
s.lockManager = storage.NewTestingLockManager()
98+
9399
// mock out protocol state
94100
s.proto.state = protocol.NewFollowerState(s.T())
95101
s.proto.snapshot = protocol.NewSnapshot(s.T())
@@ -177,6 +183,7 @@ func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContex
177183
errorMessageProvider,
178184
s.txErrorMessages,
179185
execNodeIdentitiesProvider,
186+
s.lockManager,
180187
)
181188

182189
eng, err := New(
@@ -245,10 +252,12 @@ func (s *TxErrorMessagesEngineSuite) TestOnFinalizedBlockHandleTxErrorMessages()
245252
expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])
246253

247254
// Mock the storage of the fetched error messages into the protocol database.
248-
s.txErrorMessages.On("Store", blockID, expectedStoreTxErrorMessages).Return(nil).
255+
s.txErrorMessages.On("Store", mock.Anything, blockID, expectedStoreTxErrorMessages).Return(nil).
249256
Run(func(args mock.Arguments) {
250-
// Ensure the test does not complete its work faster than necessary
251-
wg.Done()
257+
lctx, ok := args[0].(lockctx.Proof)
258+
require.True(s.T(), ok, "expecting lock proof, but cast failed")
259+
require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
260+
wg.Done() // Ensure the test does not complete its work faster than necessary
252261
}).Once()
253262
}
254263

module/executiondatasync/optimistic_sync/core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func (c *CoreImpl) Persist() error {
335335
stores.NewEventsStore(indexerData.Events, c.workingData.persistentEvents, c.executionResult.BlockID),
336336
stores.NewResultsStore(indexerData.Results, c.workingData.persistentResults, c.executionResult.BlockID),
337337
stores.NewCollectionsStore(indexerData.Collections, c.workingData.persistentCollections),
338-
stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID),
338+
stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID, c.workingData.lockManager),
339339
stores.NewLatestSealedResultStore(c.workingData.latestPersistedSealedResult, c.executionResult.ID(), c.block.Height),
340340
}
341341
blockPersister := persisters.NewBlockPersister(

module/executiondatasync/optimistic_sync/core_impl_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/jordanschalm/lockctx"
910
"github.com/stretchr/testify/mock"
1011
"github.com/stretchr/testify/suite"
1112

@@ -446,9 +447,21 @@ func (c *CoreImplSuite) TestCoreImpl_Persist() {
446447
indexerData := core.workingData.indexerData
447448
c.persistentRegisters.On("Store", flow.RegisterEntries(indexerData.Registers), tf.block.Height).Return(nil)
448449
c.persistentEvents.On("BatchStore", blockID, []flow.EventsList{indexerData.Events}, mock.Anything).Return(nil)
449-
c.persistentCollections.On("BatchStoreAndIndexByTransaction", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
450-
c.persistentResults.On("BatchStore", blockID, indexerData.Results, mock.Anything).Return(nil)
451-
c.persistentTxResultErrMsg.On("BatchStore", blockID, core.workingData.txResultErrMsgsData, mock.Anything).Return(nil)
450+
c.persistentCollections.On("BatchStoreAndIndexByTransaction",
451+
mock.MatchedBy(func(lctx lockctx.Proof) bool {
452+
return lctx.HoldsLock(storage.LockInsertCollection)
453+
}),
454+
mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
455+
c.persistentResults.On("BatchStore",
456+
mock.MatchedBy(func(lctx lockctx.Proof) bool {
457+
return lctx.HoldsLock(storage.LockInsertLightTransactionResult)
458+
}),
459+
mock.Anything, blockID, indexerData.Results).Return(nil)
460+
c.persistentTxResultErrMsg.On("BatchStore",
461+
mock.MatchedBy(func(lctx lockctx.Proof) bool {
462+
return lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)
463+
}),
464+
mock.Anything, blockID, core.workingData.txResultErrMsgsData).Return(nil)
452465
c.latestPersistedSealedResult.On("BatchSet", tf.exeResult.ID(), tf.block.Height, mock.Anything).Return(nil)
453466

454467
err = core.Persist()

module/executiondatasync/optimistic_sync/persisters/block.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,19 @@ func (p *BlockPersister) Persist() error {
6060
p.log.Debug().Msg("started to persist execution data")
6161
start := time.Now()
6262

63-
lctx := p.lockManager.NewContext()
64-
err := lctx.AcquireLock(storage.LockInsertCollection)
65-
if err != nil {
66-
return fmt.Errorf("could not acquire lock for inserting light collections: %w", err)
67-
}
68-
defer lctx.Release()
69-
70-
err = p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
71-
for _, persister := range p.persisterStores {
72-
if err := persister.Persist(lctx, batch); err != nil {
73-
return err
63+
err := storage.WithLocks(p.lockManager, []string{
64+
storage.LockInsertCollection,
65+
storage.LockInsertLightTransactionResult,
66+
storage.LockInsertTransactionResultErrMessage,
67+
}, func(lctx lockctx.Context) error {
68+
return p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error {
69+
for _, persister := range p.persisterStores {
70+
if err := persister.Persist(lctx, batch); err != nil {
71+
return err
72+
}
7473
}
75-
}
76-
return nil
74+
return nil
75+
})
7776
})
7877

7978
if err != nil {

module/executiondatasync/optimistic_sync/persisters/block_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (p *PersisterSuite) testWithDatabase() {
111111
stores.NewEventsStore(p.indexerData.Events, events, p.executionResult.BlockID),
112112
stores.NewResultsStore(p.indexerData.Results, results, p.executionResult.BlockID),
113113
stores.NewCollectionsStore(p.indexerData.Collections, collections),
114-
stores.NewTxResultErrMsgStore(p.txErrMsgs, txResultErrMsg, p.executionResult.BlockID),
114+
stores.NewTxResultErrMsgStore(p.txErrMsgs, txResultErrMsg, p.executionResult.BlockID, lockManager),
115115
stores.NewLatestSealedResultStore(latestPersistedSealedResult, p.executionResult.ID(), p.header.Height),
116116
},
117117
)

module/executiondatasync/optimistic_sync/persisters/stores/events.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package stores
22

33
import (
4+
"errors"
45
"fmt"
56

67
"github.com/jordanschalm/lockctx"
@@ -34,8 +35,16 @@ func NewEventsStore(
3435
//
3536
// No error returns are expected during normal operations
3637
func (e *EventsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error {
37-
if err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch); err != nil {
38+
err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch)
39+
if err != nil {
40+
if errors.Is(err, storage.ErrAlreadyExists) {
41+
// CAUTION: here we assume that if something is already stored for our blockID, then the data is identical.
42+
// This only holds true for sealed execution results, whose consistency has previously been verified by
43+
// comparing the data's hash to commitments in the execution result.
44+
return nil
45+
}
3846
return fmt.Errorf("could not add events to batch: %w", err)
3947
}
48+
4049
return nil
4150
}

0 commit comments

Comments
 (0)