Skip to content

Commit 0ec3b3a

Browse files
authored
Merge branch 'master' into jord/7397-storage-readme
2 parents f3cbe93 + dba1b3c commit 0ec3b3a

File tree

270 files changed

+10764
-6293
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

270 files changed

+10764
-6293
lines changed

admin/commands/storage/backfill_tx_error_messages.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/rs/zerolog"
8+
79
"github.com/onflow/flow-go/admin"
810
"github.com/onflow/flow-go/admin/commands"
911
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
1012
"github.com/onflow/flow-go/model/flow"
1113
"github.com/onflow/flow-go/model/flow/filter"
14+
"github.com/onflow/flow-go/module/util"
1215
"github.com/onflow/flow-go/state/protocol"
1316
)
1417

@@ -25,16 +28,19 @@ type backfillTxErrorMessagesRequest struct {
2528
// BackfillTxErrorMessagesCommand executes a command to backfill
2629
// transaction error messages by fetching them from execution nodes.
2730
type BackfillTxErrorMessagesCommand struct {
31+
log zerolog.Logger
2832
state protocol.State
2933
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
3034
}
3135

3236
// NewBackfillTxErrorMessagesCommand creates a new instance of BackfillTxErrorMessagesCommand
3337
func NewBackfillTxErrorMessagesCommand(
38+
log zerolog.Logger,
3439
state protocol.State,
3540
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
3641
) commands.AdminCommand {
3742
return &BackfillTxErrorMessagesCommand{
43+
log: log.With().Str("command", "backfill-tx-error-messages").Logger(),
3844
state: state,
3945
txErrorMessagesCore: txErrorMessagesCore,
4046
}
@@ -147,6 +153,17 @@ func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *a
147153

148154
data := request.ValidatorData.(*backfillTxErrorMessagesRequest)
149155

156+
total := data.endHeight - data.startHeight + 1
157+
progress := util.LogProgress(b.log,
158+
util.DefaultLogProgressConfig("backfilling", int(total)),
159+
)
160+
161+
b.log.Info().
162+
Uint64("start_height", data.startHeight).
163+
Uint64("end_height", data.endHeight).
164+
Uint64("blocks", total).
165+
Msgf("starting backfill")
166+
150167
for height := data.startHeight; height <= data.endHeight; height++ {
151168
header, err := b.state.AtHeight(height).Head()
152169
if err != nil {
@@ -158,6 +175,8 @@ func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *a
158175
if err != nil {
159176
return nil, fmt.Errorf("error encountered while processing transaction result error message for block: %d, %w", height, err)
160177
}
178+
179+
progress(1)
161180
}
162181

163182
return nil, nil
@@ -170,13 +189,18 @@ func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *a
170189
// - admin.InvalidAdminReqParameterError - if execution-node-ids is empty or has an invalid format.
171190
func (b *BackfillTxErrorMessagesCommand) parseExecutionNodeIds(executionNodeIdsIn interface{}, allIdentities flow.IdentityList) (flow.IdentitySkeletonList, error) {
172191
var ids flow.IdentityList
173-
174192
switch executionNodeIds := executionNodeIdsIn.(type) {
175-
case []string:
193+
case []any:
176194
if len(executionNodeIds) == 0 {
177195
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", "must be a non empty list of strings", executionNodeIdsIn)
178196
}
179-
requestedENIdentifiers, err := flow.IdentifierListFromHex(executionNodeIds)
197+
198+
idStrings := make([]string, len(executionNodeIds))
199+
for i, id := range executionNodeIds {
200+
idStrings[i] = id.(string)
201+
}
202+
203+
requestedENIdentifiers, err := flow.IdentifierListFromHex(idStrings)
180204
if err != nil {
181205
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", err.Error(), executionNodeIdsIn)
182206
}

admin/commands/storage/backfill_tx_error_messages_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func TestBackfillTxErrorMessages(t *testing.T) {
7777
func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
7878
suite.log = zerolog.New(os.Stderr)
7979

80+
lockManager := storage.NewTestingLockManager()
8081
suite.state = new(protocolmock.State)
8182
suite.headers = new(storagemock.Headers)
8283
suite.receipts = new(storagemock.ExecutionReceipts)
@@ -160,9 +161,11 @@ func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
160161
errorMessageProvider,
161162
suite.txErrorMessages,
162163
executionNodeIdentitiesProvider,
164+
lockManager,
163165
)
164166

165167
suite.command = NewBackfillTxErrorMessagesCommand(
168+
suite.log,
166169
suite.state,
167170
suite.txResultErrorMessagesCore,
168171
)
@@ -237,7 +240,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestValidateInvalidFormat() {
237240
Data: map[string]interface{}{
238241
"start-height": float64(1), // raw json parses to float64
239242
"end-height": float64(endHeight), // raw json parses to float64
240-
"execution-node-ids": []string{suite.allENIDs[0].NodeID.String()},
243+
"execution-node-ids": []any{suite.allENIDs[0].NodeID.String()},
241244
},
242245
})
243246
suite.Error(err)
@@ -290,7 +293,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestValidateInvalidFormat() {
290293
Data: map[string]interface{}{
291294
"start-height": float64(1), // raw json parses to float64
292295
"end-height": float64(4), // raw json parses to float64
293-
"execution-node-ids": []string{invalidENID.String()},
296+
"execution-node-ids": []any{invalidENID.String()},
294297
},
295298
})
296299
suite.Error(err)
@@ -321,7 +324,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestValidateValidFormat() {
321324
Data: map[string]interface{}{
322325
"start-height": float64(1), // raw json parses to float64
323326
"end-height": float64(3), // raw json parses to float64
324-
"execution-node-ids": []string{suite.allENIDs[0].NodeID.String()},
327+
"execution-node-ids": []any{suite.allENIDs[0].NodeID.String()},
325328
},
326329
})
327330
suite.NoError(err)
@@ -389,7 +392,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestHandleBackfillTxErrorMessages() {
389392
Data: map[string]interface{}{
390393
"start-height": float64(startHeight), // raw json parses to float64
391394
"end-height": float64(endHeight), // raw json parses to float64
392-
"execution-node-ids": []string{executorID.String()},
395+
"execution-node-ids": []any{executorID.String()},
393396
},
394397
}
395398
suite.Require().NoError(suite.command.Validator(req))
@@ -432,6 +435,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestHandleBackfillTxErrorMessagesErro
432435
suite.Run("error when txErrorMessagesCore is nil", func() {
433436
req := &admin.CommandRequest{Data: map[string]interface{}{}}
434437
command := NewBackfillTxErrorMessagesCommand(
438+
suite.log,
435439
suite.state,
436440
nil,
437441
)
@@ -530,7 +534,7 @@ func (suite *BackfillTxErrorMessagesSuite) mockStoreTxErrorMessages(
530534
}
531535
}
532536

533-
suite.txErrorMessages.On("Store", blockID, txErrorMessages).Return(nil).Once()
537+
suite.txErrorMessages.On("Store", mock.Anything, blockID, txErrorMessages).Return(nil).Once()
534538
}
535539

536540
// assertAllExpectations asserts that all the expectations set on various mocks are met,

cmd/access/node_builder/access_node_builder.go

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ type FlowAccessNodeBuilder struct {
351351
transactionResultErrorMessages storage.TransactionResultErrorMessages
352352
transactions storage.Transactions
353353
collections storage.Collections
354+
scheduledTransactions storage.ScheduledTransactions
354355

355356
// The sync engine participants provider is the libp2p peer store for the access node
356357
// which is not available until after the network has started.
@@ -852,6 +853,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
852853
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
853854
return nil
854855
}).
856+
Module("scheduled transactions storage", func(node *cmd.NodeConfig) error {
857+
builder.scheduledTransactions = store.NewScheduledTransactions(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
858+
return nil
859+
}).
855860
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
856861
// Note: using a DependableComponent here to ensure that the indexer does not block
857862
// other components from starting while bootstrapping the register db since it may
@@ -937,7 +942,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
937942
return nil, fmt.Errorf("could not create derived chain data: %w", err)
938943
}
939944

940-
indexerCore, err := indexer.New(
945+
builder.ExecutionIndexerCore = indexer.New(
941946
builder.Logger,
942947
metrics.NewExecutionStateIndexerCollector(),
943948
notNil(builder.ProtocolDB),
@@ -947,22 +952,19 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
947952
notNil(builder.collections),
948953
notNil(builder.transactions),
949954
notNil(builder.lightTransactionResults),
950-
builder.RootChainID.Chain(),
955+
notNil(builder.scheduledTransactions),
956+
builder.RootChainID,
951957
indexerDerivedChainData,
952958
notNil(builder.collectionExecutedMetric),
953959
node.StorageLockMgr,
954960
)
955-
if err != nil {
956-
return nil, err
957-
}
958-
builder.ExecutionIndexerCore = indexerCore
959961

960962
// execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
961963
builder.ExecutionIndexer, err = indexer.NewIndexer(
962964
builder.Logger,
963965
registers.FirstHeight(),
964966
registers,
965-
indexerCore,
967+
builder.ExecutionIndexerCore,
966968
executionDataStoreCache,
967969
builder.ExecutionDataRequester.HighestConsecutiveHeight,
968970
indexedBlockHeight,
@@ -1601,6 +1603,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
16011603
return errors.New("rpc-max-execution-response-message-size must be greater than 0")
16021604
}
16031605

1606+
// indexing tx error messages is only supported when tx results are also indexed
1607+
if builder.storeTxResultErrorMessages && !builder.executionDataIndexingEnabled {
1608+
return errors.New("execution-data-indexing-enabled must be set if store-tx-result-error-messages is enabled")
1609+
}
1610+
16041611
return nil
16051612
})
16061613
}
@@ -1956,13 +1963,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
19561963

19571964
return nil
19581965
}).
1959-
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
1960-
if builder.storeTxResultErrorMessages {
1961-
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
1962-
}
1963-
1964-
return nil
1965-
}).
19661966
Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
19671967
if !builder.versionControlEnabled {
19681968
noop := &module.NoopReadyDoneAware{}
@@ -2217,6 +2217,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22172217
notNil(builder.txResultErrorMessageProvider),
22182218
builder.transactionResultErrorMessages,
22192219
notNil(builder.ExecNodeIdentitiesProvider),
2220+
node.StorageLockMgr,
22202221
)
22212222
}
22222223

@@ -2262,34 +2263,45 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22622263
}).
22632264
AdminCommand("backfill-tx-error-messages", func(config *cmd.NodeConfig) commands.AdminCommand {
22642265
return storageCommands.NewBackfillTxErrorMessagesCommand(
2266+
builder.Logger,
22652267
builder.State,
22662268
builder.TxResultErrorMessagesCore,
22672269
)
22682270
})
22692271

22702272
if builder.storeTxResultErrorMessages {
2271-
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
2272-
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
2273-
builder.ProtocolDB,
2274-
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
2275-
)
2276-
return nil
2277-
})
2278-
builder.Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
2279-
engine, err := tx_error_messages.New(
2280-
node.Logger,
2281-
node.State,
2282-
node.Storage.Headers,
2283-
processedTxErrorMessagesBlockHeight,
2284-
builder.TxResultErrorMessagesCore,
2285-
)
2286-
if err != nil {
2287-
return nil, err
2288-
}
2289-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
2273+
builder.
2274+
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
2275+
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(
2276+
node.Metrics.Cache,
2277+
node.ProtocolDB,
2278+
bstorage.DefaultCacheSize,
2279+
)
2280+
return nil
2281+
}).
2282+
Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
2283+
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
2284+
builder.ProtocolDB,
2285+
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
2286+
)
2287+
return nil
2288+
}).
2289+
Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
2290+
engine, err := tx_error_messages.New(
2291+
node.Logger,
2292+
metrics.NewTransactionErrorMessagesCollector(),
2293+
node.State,
2294+
node.Storage.Headers,
2295+
processedTxErrorMessagesBlockHeight,
2296+
builder.TxResultErrorMessagesCore,
2297+
)
2298+
if err != nil {
2299+
return nil, err
2300+
}
2301+
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
22902302

2291-
return engine, nil
2292-
})
2303+
return engine, nil
2304+
})
22932305
}
22942306

22952307
if builder.supportsObserver {

cmd/collection/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func main() {
636636
node.Logger,
637637
node.Me,
638638
node.ProtocolDB,
639+
node.StorageLockMgr,
639640
node.State,
640641
node.Metrics.Engine,
641642
node.Metrics.Mempool,

cmd/consensus/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ func main() {
396396
multipleReceiptsFilterMempool,
397397
consensusMempools.LogForkAndCrash(node.Logger),
398398
node.ProtocolDB,
399+
node.StorageLockMgr,
399400
node.Logger,
400401
)
401402
if err != nil {
@@ -621,7 +622,7 @@ func main() {
621622
notifier.AddFollowerConsumer(followerDistributor)
622623

623624
// initialize the persister
624-
persist, err := persister.New(node.ProtocolDB, node.RootChainID)
625+
persist, err := persister.New(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
625626
if err != nil {
626627
return nil, err
627628
}

cmd/execution_builder.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ func (exeNode *ExecutionNode) LoadSyncCore(node *NodeConfig) error {
334334
func (exeNode *ExecutionNode) LoadExecutionStorage(
335335
node *NodeConfig,
336336
) error {
337+
var err error
337338
db := node.ProtocolDB
338339

339340
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
@@ -342,7 +343,10 @@ func (exeNode *ExecutionNode) LoadExecutionStorage(
342343
exeNode.results = store.NewExecutionResults(node.Metrics.Cache, db)
343344
exeNode.receipts = store.NewExecutionReceipts(node.Metrics.Cache, db, exeNode.results, storage.DefaultCacheSize)
344345
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)
345-
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
346+
exeNode.txResults, err = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
347+
if err != nil {
348+
return err
349+
}
346350
exeNode.eventsReader = exeNode.events
347351
exeNode.commitsReader = exeNode.commits
348352
exeNode.resultsReader = exeNode.results
@@ -758,8 +762,12 @@ func (exeNode *ExecutionNode) LoadExecutionState(
758762
}
759763
return nil
760764
})
765+
766+
chunkDB := pebbleimpl.ToDB(chunkDataPackDB)
767+
storedChunkDataPacks := store.NewStoredChunkDataPacks(
768+
node.Metrics.Cache, chunkDB, exeNode.exeConf.chunkDataPackCacheSize)
761769
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
762-
pebbleimpl.ToDB(chunkDataPackDB), exeNode.collections, exeNode.exeConf.chunkDataPackCacheSize)
770+
chunkDB, storedChunkDataPacks, exeNode.collections, exeNode.exeConf.chunkDataPackCacheSize)
763771

764772
getLatestFinalized := func() (uint64, error) {
765773
final, err := node.State.Final().Head()

0 commit comments

Comments
 (0)