diff --git a/cmd/util/cmd/compare-cadence-vm/cmd.go b/cmd/util/cmd/compare-cadence-vm/cmd.go index d74254f9792..795c75f2ed8 100644 --- a/cmd/util/cmd/compare-cadence-vm/cmd.go +++ b/cmd/util/cmd/compare-cadence-vm/cmd.go @@ -5,9 +5,11 @@ import ( "context" "encoding/hex" "fmt" + "io" "os" "strings" "sync/atomic" + "time" "github.com/kr/pretty" sdk "github.com/onflow/flow-go-sdk" @@ -16,12 +18,15 @@ import ( "github.com/spf13/cobra" otelTrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" debug_tx "github.com/onflow/flow-go/cmd/util/cmd/debug-tx" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/grpcclient" + "github.com/onflow/flow-go/module/util" "github.com/onflow/flow-go/utils/debug" ) @@ -36,6 +41,8 @@ var ( flagLogTraces bool flagWriteTraces bool flagParallel int + flagSubscribe bool + flagSubscriptionDelay time.Duration ) var Cmd = &cobra.Command{ @@ -64,7 +71,6 @@ func init() { Cmd.Flags().BoolVar(&flagUseExecutionDataAPI, "use-execution-data-api", true, "use the execution data API (default: true)") Cmd.Flags().StringVar(&flagBlockIDs, "block-ids", "", "block IDs, comma-separated. if --block-count > 1 is used, provide a single block ID") - _ = Cmd.MarkFlagRequired("block-id") Cmd.Flags().IntVar(&flagBlockCount, "block-count", 1, "number of blocks to process (default: 1). if > 1, provide a single block ID with --block-ids") @@ -73,6 +79,10 @@ func init() { Cmd.Flags().BoolVar(&flagWriteTraces, "write-traces", false, "write traces for mismatched transactions") Cmd.Flags().IntVar(&flagParallel, "parallel", 1, "number of blocks to process in parallel (default: 1)") + + Cmd.Flags().BoolVar(&flagSubscribe, "subscribe", false, "subscribe to new sealed blocks and compare them as they arrive") + + Cmd.Flags().DurationVar(&flagSubscriptionDelay, "subscription-delay", 1*time.Minute, "delay after receiving a new sealed block before comparing it") } func run(_ *cobra.Command, args []string) { @@ -80,15 +90,17 @@ func run(_ *cobra.Command, args []string) { chainID := flow.ChainID(flagChain) chain := chainID.Chain() - config, err := grpcclient.NewFlowClientConfig(flagAccessAddress, "", flow.ZeroID, true) - if err != nil { - log.Fatal().Err(err).Msg("failed to create flow client config") - } - - flowClient, err := grpcclient.FlowClient(config) - if err != nil { - log.Fatal().Err(err).Msg("failed to create client") - } + flowClient, err := client.NewClient( + flagAccessAddress, + client.WithGRPCDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(commonrpc.DefaultAccessMaxResponseSize)), + //grpc.WithKeepaliveParams(keepalive.ClientParameters{ + // Time: 10 * time.Second, + // Timeout: 1 * time.Hour, + //}), + ), + ) var remoteClient debug.RemoteClient if flagUseExecutionDataAPI { @@ -105,6 +117,10 @@ func run(_ *cobra.Command, args []string) { var blockIDs []flow.Identifier for _, rawBlockID := range strings.Split(flagBlockIDs, ",") { + if rawBlockID == "" { + continue + } + blockID, err := flow.HexStringToIdentifier(rawBlockID) if err != nil { log.Fatal().Err(err).Str("ID", rawBlockID).Msg("failed to parse block ID") @@ -113,6 +129,98 @@ func run(_ *cobra.Command, args []string) { blockIDs = append(blockIDs, blockID) } + if flagSubscribe { + if len(blockIDs) > 1 { + log.Fatal().Msg("when using --subscribe, provide a single block ID to start from, or none to start from latest") + } + + compareNewBlocks(blockIDs, flowClient, remoteClient, chain) + } else { + if len(blockIDs) == 0 { + log.Fatal().Msg("at least one block ID must be provided") + } + compareBlocks(blockIDs, flowClient, remoteClient, chain) + } +} + +func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, remoteClient debug.RemoteClient, chain flow.Chain) { + + var ( + blocksMismatched int64 + blocksMatched int64 + txMismatched int64 + txMatched int64 + ) + + g, _ := errgroup.WithContext(context.Background()) + g.SetLimit(flagParallel) + + var lastBlockID flow.Identifier + + if len(blockIDs) > 0 { + lastBlockID = blockIDs[0] + } + +reconnect: + for { + const blockStatus = flow.BlockStatusSealed + var getBlockHeader func() (*flow.Header, error) + if lastBlockID != flow.ZeroID { + getBlockHeader = debug_tx.SubscribeBlockHeadersFromStartBlockID(flowClient, lastBlockID, blockStatus) + } else { + getBlockHeader = debug_tx.SubscribeBlockHeadersFromLatest(flowClient, blockStatus) + } + + for { + log.Info().Msg("Waiting for new sealed block ...") + header, err := getBlockHeader() + if err == io.EOF { + return + } + if err != nil { + log.Warn().Err(err).Msg("failed to receive new block header") + continue reconnect + } + + log.Info().Msgf("New sealed block received: %s (height %d)", header.ID(), header.Height) + + lastBlockID = header.ID() + + g.Go(func() error { + + time.Sleep(flagSubscriptionDelay) + + result := compareBlock( + header.ID(), + header, + remoteClient, + flowClient, + chain, + ) + + atomic.AddInt64(&txMismatched, int64(result.mismatches)) + atomic.AddInt64(&txMatched, int64(result.matches)) + if result.mismatches > 0 { + atomic.AddInt64(&blocksMismatched, 1) + } else { + atomic.AddInt64(&blocksMatched, 1) + } + + log.Info().Msgf("Compared %d blocks: %d matched, %d mismatched", blocksMatched+blocksMismatched, blocksMatched, blocksMismatched) + log.Info().Msgf("Compared %d transactions: %d matched, %d mismatched", txMatched+txMismatched, txMatched, txMismatched) + + return nil + }) + } + } +} + +func compareBlocks( + blockIDs []flow.Identifier, + flowClient *client.Client, + remoteClient debug.RemoteClient, + chain flow.Chain, +) { type block struct { id flow.Identifier header *flow.Header @@ -125,6 +233,16 @@ func run(_ *cobra.Command, args []string) { log.Fatal().Msg("either provide a single block ID and use --block-count, or provide multiple block IDs and do not use --block-count") } + blockHeaderProgress := util.LogProgress( + log.Logger, + util.NewLogProgressConfig( + "fetching block headers", + flagBlockCount, + 1*time.Second, + 100/5, // log every 5% + ), + ) + blockID := blockIDs[0] for i := 0; i < flagBlockCount; i++ { header := debug_tx.FetchBlockHeader(blockID, flowClient) @@ -134,8 +252,11 @@ func run(_ *cobra.Command, args []string) { header: header, }) + blockHeaderProgress(1) + blockID = header.ParentID } + } else { for _, blockID := range blockIDs { header := debug_tx.FetchBlockHeader(blockID, flowClient) @@ -147,6 +268,16 @@ func run(_ *cobra.Command, args []string) { } } + blockProgress := util.LogProgress( + log.Logger, + util.NewLogProgressConfig( + "executing blocks", + flagBlockCount, + 1*time.Second, + 100/5, // log every 5% + ), + ) + var ( blocksMismatched int64 blocksMatched int64 @@ -176,6 +307,8 @@ func run(_ *cobra.Command, args []string) { atomic.AddInt64(&blocksMatched, 1) } + blockProgress(1) + return nil }) } diff --git a/cmd/util/cmd/debug-tx/cmd.go b/cmd/util/cmd/debug-tx/cmd.go index b5c23b12662..0725dd01380 100644 --- a/cmd/util/cmd/debug-tx/cmd.go +++ b/cmd/util/cmd/debug-tx/cmd.go @@ -341,6 +341,50 @@ func FetchBlockHeader( return } +func SubscribeBlockHeadersFromStartBlockID( + flowClient *client.Client, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) (get func() (*flow.Header, error)) { + log.Info().Msg("Subscribing to block headers ...") + + var err error + get, err = debug.SubscribeAccessAPIBlockHeadersFromStartBlockID( + context.Background(), + flowClient.RPCClient(), + startBlockID, + blockStatus, + ) + if err != nil { + log.Fatal().Err(err).Msg("failed to subscribe to block headers") + } + + log.Info().Msg("Subscribed to block headers") + + return +} + +func SubscribeBlockHeadersFromLatest( + flowClient *client.Client, + blockStatus flow.BlockStatus, +) (get func() (*flow.Header, error)) { + log.Info().Msg("Subscribing to block headers ...") + + var err error + get, err = debug.SubscribeAccessAPIBlockHeadersFromLatest( + context.Background(), + flowClient.RPCClient(), + blockStatus, + ) + if err != nil { + log.Fatal().Err(err).Msg("failed to subscribe to block headers") + } + + log.Info().Msg("Subscribed to block headers") + + return +} + func FetchBlockTransactions( blockID flow.Identifier, flowClient *client.Client, diff --git a/utils/debug/api.go b/utils/debug/api.go index bfe339ad9ec..bc394fcae49 100644 --- a/utils/debug/api.go +++ b/utils/debug/api.go @@ -4,6 +4,7 @@ import ( "context" "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" rpcConvert "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" @@ -25,3 +26,71 @@ func GetAccessAPIBlockHeader( return rpcConvert.MessageToBlockHeader(resp.Block) } + +func convertBlockStatus(status flow.BlockStatus) entities.BlockStatus { + switch status { + case flow.BlockStatusUnknown: + return entities.BlockStatus_BLOCK_UNKNOWN + case flow.BlockStatusFinalized: + return entities.BlockStatus_BLOCK_FINALIZED + case flow.BlockStatusSealed: + return entities.BlockStatus_BLOCK_SEALED + } + return entities.BlockStatus_BLOCK_UNKNOWN +} + +func SubscribeAccessAPIBlockHeadersFromStartBlockID( + ctx context.Context, + client access.AccessAPIClient, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) ( + func() (*flow.Header, error), + error, +) { + req := &access.SubscribeBlockHeadersFromStartBlockIDRequest{ + StartBlockId: startBlockID[:], + BlockStatus: convertBlockStatus(blockStatus), + } + + cl, err := client.SubscribeBlockHeadersFromStartBlockID(ctx, req) + if err != nil { + return nil, err + } + + return func() (*flow.Header, error) { + resp, err := cl.Recv() + if err != nil { + return nil, err + } + + return rpcConvert.MessageToBlockHeader(resp.Header) + }, nil +} + +func SubscribeAccessAPIBlockHeadersFromLatest( + ctx context.Context, + client access.AccessAPIClient, + blockStatus flow.BlockStatus, +) ( + func() (*flow.Header, error), + error, +) { + req := &access.SubscribeBlockHeadersFromLatestRequest{ + BlockStatus: convertBlockStatus(blockStatus), + } + + cl, err := client.SubscribeBlockHeadersFromLatest(ctx, req) + if err != nil { + return nil, err + } + + return func() (*flow.Header, error) { + resp, err := cl.Recv() + if err != nil { + return nil, err + } + + return rpcConvert.MessageToBlockHeader(resp.Header) + }, nil +}