Skip to content

Commit c046245

Browse files
committed
Merge branch 'master' into leo/refactor-execution-fork-evidence
2 parents e400653 + da3c9e1 commit c046245

File tree

108 files changed

+3488
-3218
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+3488
-3218
lines changed

admin/commands/storage/backfill_tx_error_messages.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/rs/zerolog"
8+
79
"github.com/onflow/flow-go/admin"
810
"github.com/onflow/flow-go/admin/commands"
911
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
1012
"github.com/onflow/flow-go/model/flow"
1113
"github.com/onflow/flow-go/model/flow/filter"
14+
"github.com/onflow/flow-go/module/util"
1215
"github.com/onflow/flow-go/state/protocol"
1316
)
1417

@@ -25,16 +28,19 @@ type backfillTxErrorMessagesRequest struct {
2528
// BackfillTxErrorMessagesCommand executes a command to backfill
2629
// transaction error messages by fetching them from execution nodes.
2730
type BackfillTxErrorMessagesCommand struct {
31+
log zerolog.Logger
2832
state protocol.State
2933
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
3034
}
3135

3236
// NewBackfillTxErrorMessagesCommand creates a new instance of BackfillTxErrorMessagesCommand
3337
func NewBackfillTxErrorMessagesCommand(
38+
log zerolog.Logger,
3439
state protocol.State,
3540
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
3641
) commands.AdminCommand {
3742
return &BackfillTxErrorMessagesCommand{
43+
log: log.With().Str("command", "backfill-tx-error-messages").Logger(),
3844
state: state,
3945
txErrorMessagesCore: txErrorMessagesCore,
4046
}
@@ -147,6 +153,17 @@ func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *a
147153

148154
data := request.ValidatorData.(*backfillTxErrorMessagesRequest)
149155

156+
total := data.endHeight - data.startHeight + 1
157+
progress := util.LogProgress(b.log,
158+
util.DefaultLogProgressConfig("backfilling", int(total)),
159+
)
160+
161+
b.log.Info().
162+
Uint64("start_height", data.startHeight).
163+
Uint64("end_height", data.endHeight).
164+
Uint64("blocks", total).
165+
Msgf("starting backfill")
166+
150167
for height := data.startHeight; height <= data.endHeight; height++ {
151168
header, err := b.state.AtHeight(height).Head()
152169
if err != nil {
@@ -158,6 +175,8 @@ func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *a
158175
if err != nil {
159176
return nil, fmt.Errorf("error encountered while processing transaction result error message for block: %d, %w", height, err)
160177
}
178+
179+
progress(1)
161180
}
162181

163182
return nil, nil
@@ -170,13 +189,18 @@ func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *a
170189
// - admin.InvalidAdminReqParameterError - if execution-node-ids is empty or has an invalid format.
171190
func (b *BackfillTxErrorMessagesCommand) parseExecutionNodeIds(executionNodeIdsIn interface{}, allIdentities flow.IdentityList) (flow.IdentitySkeletonList, error) {
172191
var ids flow.IdentityList
173-
174192
switch executionNodeIds := executionNodeIdsIn.(type) {
175-
case []string:
193+
case []any:
176194
if len(executionNodeIds) == 0 {
177195
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", "must be a non empty list of strings", executionNodeIdsIn)
178196
}
179-
requestedENIdentifiers, err := flow.IdentifierListFromHex(executionNodeIds)
197+
198+
idStrings := make([]string, len(executionNodeIds))
199+
for i, id := range executionNodeIds {
200+
idStrings[i] = id.(string)
201+
}
202+
203+
requestedENIdentifiers, err := flow.IdentifierListFromHex(idStrings)
180204
if err != nil {
181205
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", err.Error(), executionNodeIdsIn)
182206
}

admin/commands/storage/backfill_tx_error_messages_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (suite *BackfillTxErrorMessagesSuite) SetupTest() {
163163
)
164164

165165
suite.command = NewBackfillTxErrorMessagesCommand(
166+
suite.log,
166167
suite.state,
167168
suite.txResultErrorMessagesCore,
168169
)
@@ -237,7 +238,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestValidateInvalidFormat() {
237238
Data: map[string]interface{}{
238239
"start-height": float64(1), // raw json parses to float64
239240
"end-height": float64(endHeight), // raw json parses to float64
240-
"execution-node-ids": []string{suite.allENIDs[0].NodeID.String()},
241+
"execution-node-ids": []any{suite.allENIDs[0].NodeID.String()},
241242
},
242243
})
243244
suite.Error(err)
@@ -290,7 +291,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestValidateInvalidFormat() {
290291
Data: map[string]interface{}{
291292
"start-height": float64(1), // raw json parses to float64
292293
"end-height": float64(4), // raw json parses to float64
293-
"execution-node-ids": []string{invalidENID.String()},
294+
"execution-node-ids": []any{invalidENID.String()},
294295
},
295296
})
296297
suite.Error(err)
@@ -321,7 +322,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestValidateValidFormat() {
321322
Data: map[string]interface{}{
322323
"start-height": float64(1), // raw json parses to float64
323324
"end-height": float64(3), // raw json parses to float64
324-
"execution-node-ids": []string{suite.allENIDs[0].NodeID.String()},
325+
"execution-node-ids": []any{suite.allENIDs[0].NodeID.String()},
325326
},
326327
})
327328
suite.NoError(err)
@@ -389,7 +390,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestHandleBackfillTxErrorMessages() {
389390
Data: map[string]interface{}{
390391
"start-height": float64(startHeight), // raw json parses to float64
391392
"end-height": float64(endHeight), // raw json parses to float64
392-
"execution-node-ids": []string{executorID.String()},
393+
"execution-node-ids": []any{executorID.String()},
393394
},
394395
}
395396
suite.Require().NoError(suite.command.Validator(req))
@@ -432,6 +433,7 @@ func (suite *BackfillTxErrorMessagesSuite) TestHandleBackfillTxErrorMessagesErro
432433
suite.Run("error when txErrorMessagesCore is nil", func() {
433434
req := &admin.CommandRequest{Data: map[string]interface{}{}}
434435
command := NewBackfillTxErrorMessagesCommand(
436+
suite.log,
435437
suite.state,
436438
nil,
437439
)

cmd/access/node_builder/access_node_builder.go

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,6 +1601,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
16011601
return errors.New("rpc-max-execution-response-message-size must be greater than 0")
16021602
}
16031603

1604+
// indexing tx error messages is only supported when tx results are also indexed
1605+
if builder.storeTxResultErrorMessages && !builder.executionDataIndexingEnabled {
1606+
return errors.New("execution-data-indexing-enabled must be set if store-tx-result-error-messages is enabled")
1607+
}
1608+
16041609
return nil
16051610
})
16061611
}
@@ -1956,13 +1961,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
19561961

19571962
return nil
19581963
}).
1959-
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
1960-
if builder.storeTxResultErrorMessages {
1961-
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
1962-
}
1963-
1964-
return nil
1965-
}).
19661964
Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
19671965
if !builder.versionControlEnabled {
19681966
noop := &module.NoopReadyDoneAware{}
@@ -2262,34 +2260,45 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22622260
}).
22632261
AdminCommand("backfill-tx-error-messages", func(config *cmd.NodeConfig) commands.AdminCommand {
22642262
return storageCommands.NewBackfillTxErrorMessagesCommand(
2263+
builder.Logger,
22652264
builder.State,
22662265
builder.TxResultErrorMessagesCore,
22672266
)
22682267
})
22692268

22702269
if builder.storeTxResultErrorMessages {
2271-
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
2272-
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
2273-
builder.ProtocolDB,
2274-
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
2275-
)
2276-
return nil
2277-
})
2278-
builder.Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
2279-
engine, err := tx_error_messages.New(
2280-
node.Logger,
2281-
node.State,
2282-
node.Storage.Headers,
2283-
processedTxErrorMessagesBlockHeight,
2284-
builder.TxResultErrorMessagesCore,
2285-
)
2286-
if err != nil {
2287-
return nil, err
2288-
}
2289-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
2270+
builder.
2271+
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
2272+
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(
2273+
node.Metrics.Cache,
2274+
node.ProtocolDB,
2275+
bstorage.DefaultCacheSize,
2276+
)
2277+
return nil
2278+
}).
2279+
Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
2280+
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
2281+
builder.ProtocolDB,
2282+
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
2283+
)
2284+
return nil
2285+
}).
2286+
Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
2287+
engine, err := tx_error_messages.New(
2288+
node.Logger,
2289+
metrics.NewTransactionErrorMessagesCollector(),
2290+
node.State,
2291+
node.Storage.Headers,
2292+
processedTxErrorMessagesBlockHeight,
2293+
builder.TxResultErrorMessagesCore,
2294+
)
2295+
if err != nil {
2296+
return nil, err
2297+
}
2298+
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
22902299

2291-
return engine, nil
2292-
})
2300+
return engine, nil
2301+
})
22932302
}
22942303

22952304
if builder.supportsObserver {

cmd/collection/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func main() {
636636
node.Logger,
637637
node.Me,
638638
node.ProtocolDB,
639+
node.StorageLockMgr,
639640
node.State,
640641
node.Metrics.Engine,
641642
node.Metrics.Mempool,

cmd/consensus/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ func main() {
622622
notifier.AddFollowerConsumer(followerDistributor)
623623

624624
// initialize the persister
625-
persist, err := persister.New(node.ProtocolDB, node.RootChainID)
625+
persist, err := persister.New(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
626626
if err != nil {
627627
return nil, err
628628
}

cmd/scaffold.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func (fnb *FlowNodeBuilder) EnqueuePingService() {
303303
var hotstuffViewFunc func() (uint64, error)
304304
// Setup consensus nodes to report their HotStuff view
305305
if fnb.BaseConfig.NodeRole == flow.RoleConsensus.String() {
306-
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID)
306+
hotstuffReader, err := persister.NewReader(node.ProtocolDB, node.RootChainID, node.StorageLockMgr)
307307
if err != nil {
308308
return nil, err
309309
}

cmd/util/cmd/read-hotstuff/cmd/get_liveness.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ func init() {
2222

2323
func runGetLivenessData(*cobra.Command, []string) {
2424
err := common.WithStorage(flagDatadir, func(db storage.DB) error {
25+
lockManager := storage.NewTestingLockManager()
26+
2527
chainID := flow.ChainID(flagChain)
26-
reader, err := persister.NewReader(db, chainID)
28+
reader, err := persister.NewReader(db, chainID, lockManager)
2729
if err != nil {
2830
log.Fatal().Err(err).Msg("could not create reader from db")
2931
}

cmd/util/cmd/read-hotstuff/cmd/get_safety.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ func init() {
2222

2323
func runGetSafetyData(*cobra.Command, []string) {
2424
err := common.WithStorage(flagDatadir, func(db storage.DB) error {
25+
lockManager := storage.NewTestingLockManager()
2526

2627
chainID := flow.ChainID(flagChain)
27-
reader, err := persister.NewReader(db, chainID)
28+
reader, err := persister.NewReader(db, chainID, lockManager)
2829
if err != nil {
2930
log.Fatal().Err(err).Msg("could not create reader from db")
3031
}

cmd/util/cmd/read-light-block/read_light_block_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/onflow/flow-go/storage"
1111
"github.com/onflow/flow-go/storage/operation"
1212
"github.com/onflow/flow-go/storage/operation/dbtest"
13-
"github.com/onflow/flow-go/storage/procedure"
1413
"github.com/onflow/flow-go/storage/store"
1514
"github.com/onflow/flow-go/utils/unittest"
1615
)
@@ -41,14 +40,14 @@ func TestReadClusterRange(t *testing.T) {
4140
for _, block := range blocks {
4241
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
4342
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
44-
return procedure.InsertClusterBlock(lctx, rw, unittest.ClusterProposalFromBlock(block))
43+
return operation.InsertClusterBlock(lctx, rw, unittest.ClusterProposalFromBlock(block))
4544
})
4645
if err != nil {
4746
return err
4847
}
4948

5049
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
51-
return procedure.FinalizeClusterBlock(lctx, rw, block.ID())
50+
return operation.FinalizeClusterBlock(lctx, rw, block.ID())
5251
})
5352
})
5453
require.NoError(t, err)

cmd/util/cmd/verify_execution_result/cmd.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ func run(*cobra.Command, []string) {
8282
Bool("stop_on_mismatch", flagStopOnMismatch).
8383
Logger()
8484

85+
// Log configuration before starting verification so users can cancel and restart with different values if needed
86+
if !flagStopOnMismatch {
87+
lg.Info().Msgf("note flag --stop_on_mismatch is false, so mismatches (if any) are logged but do not stop the verification")
88+
lg.Info().Msgf("look for 'could not verify' in the log for any mismatch, or try again with --stop_on_mismatch true to stop on first mismatch")
89+
}
90+
8591
if flagFromTo != "" {
8692
from, to, err := parseFromTo(flagFromTo)
8793
if err != nil {
@@ -93,16 +99,15 @@ func run(*cobra.Command, []string) {
9399
if err != nil {
94100
lg.Fatal().Err(err).Msgf("could not verify range from %d to %d", from, to)
95101
}
96-
lg.Info().Msgf("successfully verified range from %d to %d", from, to)
97-
102+
lg.Info().Msgf("finished verified range from %d to %d", from, to)
98103
} else {
99104
lg.Info().Msgf("verifying last %d sealed blocks", flagLastK)
100105
err := verifier.VerifyLastKHeight(lockManager, flagLastK, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled, flagScheduledCallbacksEnabled)
101106
if err != nil {
102107
lg.Fatal().Err(err).Msg("could not verify last k height")
103108
}
104109

105-
lg.Info().Msgf("successfully verified last %d sealed blocks", flagLastK)
110+
lg.Info().Msgf("finished verified last %d sealed blocks", flagLastK)
106111
}
107112
}
108113

0 commit comments

Comments
 (0)