From 01d0ceadef62d5ba83f8697b7e7b12f19b509a75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 3 Dec 2025 11:35:55 -0800 Subject: [PATCH 1/5] add progress logging --- cmd/util/cmd/compare-cadence-vm/cmd.go | 27 ++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/cmd/util/cmd/compare-cadence-vm/cmd.go b/cmd/util/cmd/compare-cadence-vm/cmd.go index d018ca40ead..2c8623ac5c4 100644 --- a/cmd/util/cmd/compare-cadence-vm/cmd.go +++ b/cmd/util/cmd/compare-cadence-vm/cmd.go @@ -8,6 +8,7 @@ import ( "os" "strings" "sync/atomic" + "time" "github.com/kr/pretty" sdk "github.com/onflow/flow-go-sdk" @@ -21,6 +22,7 @@ import ( "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/grpcclient" + "github.com/onflow/flow-go/module/util" "github.com/onflow/flow-go/utils/debug" ) @@ -124,6 +126,16 @@ func run(_ *cobra.Command, args []string) { log.Fatal().Msg("either provide a single block ID and use --block-count, or provide multiple block IDs and do not use --block-count") } + blockHeaderProgress := util.LogProgress( + log.Logger, + util.NewLogProgressConfig( + "fetching block headers", + flagBlockCount, + 1*time.Second, + 100/5, // log every 5% + ), + ) + blockID := blockIDs[0] for i := 0; i < flagBlockCount; i++ { header := debug_tx.FetchBlockHeader(blockID, flowClient) @@ -133,8 +145,11 @@ func run(_ *cobra.Command, args []string) { header: header, }) + blockHeaderProgress(1) + blockID = header.ParentID } + } else { for _, blockID := range blockIDs { header := debug_tx.FetchBlockHeader(blockID, flowClient) @@ -146,6 +161,16 @@ func run(_ *cobra.Command, args []string) { } } + blockProgress := util.LogProgress( + log.Logger, + util.NewLogProgressConfig( + "executing blocks", + flagBlockCount, + 1*time.Second, + 100/5, // log every 5% + ), + ) + var ( blocksMismatched int64 blocksMatched int64 @@ -175,6 +200,8 @@ func run(_ *cobra.Command, args []string) { atomic.AddInt64(&blocksMatched, 1) } + blockProgress(1) + return nil }) } From c497a16aaffa7aee156f0d0e166bb0789e61d3b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 3 Dec 2025 16:02:21 -0800 Subject: [PATCH 2/5] add support for subscribing to blocks --- cmd/util/cmd/compare-cadence-vm/cmd.go | 84 +++++++++++++++++++++++++- cmd/util/cmd/debug-tx/cmd.go | 44 ++++++++++++++ utils/debug/api.go | 69 +++++++++++++++++++++ 3 files changed, 196 insertions(+), 1 deletion(-) diff --git a/cmd/util/cmd/compare-cadence-vm/cmd.go b/cmd/util/cmd/compare-cadence-vm/cmd.go index 2c8623ac5c4..f0f4acf4435 100644 --- a/cmd/util/cmd/compare-cadence-vm/cmd.go +++ b/cmd/util/cmd/compare-cadence-vm/cmd.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "fmt" + "io" "os" "strings" "sync/atomic" @@ -37,6 +38,7 @@ var ( flagLogTraces bool flagWriteTraces bool flagParallel int + flagSubscribe bool ) var Cmd = &cobra.Command{ @@ -65,7 +67,6 @@ func init() { Cmd.Flags().BoolVar(&flagUseExecutionDataAPI, "use-execution-data-api", true, "use the execution data API (default: true)") Cmd.Flags().StringVar(&flagBlockIDs, "block-ids", "", "block IDs, comma-separated. if --block-count > 1 is used, provide a single block ID") - _ = Cmd.MarkFlagRequired("block-id") Cmd.Flags().IntVar(&flagBlockCount, "block-count", 1, "number of blocks to process (default: 1). if > 1, provide a single block ID with --block-ids") @@ -74,6 +75,8 @@ func init() { Cmd.Flags().BoolVar(&flagWriteTraces, "write-traces", false, "write traces for mismatched transactions") Cmd.Flags().IntVar(&flagParallel, "parallel", 1, "number of blocks to process in parallel (default: 1)") + + Cmd.Flags().BoolVar(&flagSubscribe, "subscribe", false, "subscribe to new sealed blocks and compare them as they arrive") } func run(_ *cobra.Command, args []string) { @@ -106,6 +109,10 @@ func run(_ *cobra.Command, args []string) { var blockIDs []flow.Identifier for _, rawBlockID := range strings.Split(flagBlockIDs, ",") { + if rawBlockID == "" { + continue + } + blockID, err := flow.HexStringToIdentifier(rawBlockID) if err != nil { log.Fatal().Err(err).Str("ID", rawBlockID).Msg("failed to parse block ID") @@ -114,6 +121,81 @@ func run(_ *cobra.Command, args []string) { blockIDs = append(blockIDs, blockID) } + if flagSubscribe { + if len(blockIDs) > 1 { + log.Fatal().Msg("when using --subscribe, provide a single block ID to start from, or none to start from latest") + } + + compareNewBlocks(blockIDs, flowClient, remoteClient, chain) + } else { + if len(blockIDs) == 0 { + log.Fatal().Msg("at least one block ID must be provided") + } + compareBlocks(blockIDs, flowClient, remoteClient, chain) + } +} + +func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, remoteClient debug.RemoteClient, chain flow.Chain) { + + const blockStatus = flow.BlockStatusSealed + var getBlockHeader func() (*flow.Header, error) + if len(blockIDs) > 1 { + getBlockHeader = debug_tx.SubscribeBlockHeadersFromStartBlockID(flowClient, blockIDs[0], blockStatus) + } else { + getBlockHeader = debug_tx.SubscribeBlockHeadersFromLatest(flowClient, blockStatus) + } + + var ( + blocksMismatched int64 + blocksMatched int64 + txMismatched int64 + txMatched int64 + ) + + g, _ := errgroup.WithContext(context.Background()) + g.SetLimit(flagParallel) + + for { + log.Info().Msg("Waiting for new sealed block ...") + header, err := getBlockHeader() + if err == io.EOF { + return + } + + log.Info().Msgf("New sealed block received: %s (height %d)", header.ID(), header.Height) + + g.Go(func() error { + + result := compareBlock( + header.ID(), + header, + remoteClient, + flowClient, + chain, + ) + + atomic.AddInt64(&txMismatched, int64(result.mismatches)) + atomic.AddInt64(&txMatched, int64(result.matches)) + if result.mismatches > 0 { + atomic.AddInt64(&blocksMismatched, 1) + } else { + atomic.AddInt64(&blocksMatched, 1) + } + + log.Info().Msgf("Compared %d blocks: %d matched, %d mismatched", blocksMatched+blocksMismatched, blocksMatched, blocksMismatched) + log.Info().Msgf("Compared %d transactions: %d matched, %d mismatched", txMatched+txMismatched, txMatched, txMismatched) + + return nil + }) + } +} + +func compareBlocks( + blockIDs []flow.Identifier, + flowClient *client.Client, + remoteClient debug.RemoteClient, + chain flow.Chain, +) { type block struct { id flow.Identifier header *flow.Header diff --git a/cmd/util/cmd/debug-tx/cmd.go b/cmd/util/cmd/debug-tx/cmd.go index ddcac21467b..c8752ba29fb 100644 --- a/cmd/util/cmd/debug-tx/cmd.go +++ b/cmd/util/cmd/debug-tx/cmd.go @@ -341,6 +341,50 @@ func FetchBlockHeader( return } +func SubscribeBlockHeadersFromStartBlockID( + flowClient *client.Client, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) (get func() (*flow.Header, error)) { + log.Info().Msg("Subscribing to block headers ...") + + var err error + get, err = debug.SubscribeAccessAPIBlockHeadersFromStartBlockID( + context.Background(), + flowClient.RPCClient(), + startBlockID, + blockStatus, + ) + if err != nil { + log.Fatal().Err(err).Msg("failed to subscribe to block headers") + } + + log.Info().Msg("Subscribed to block headers") + + return +} + +func SubscribeBlockHeadersFromLatest( + flowClient *client.Client, + blockStatus flow.BlockStatus, +) (get func() (*flow.Header, error)) { + log.Info().Msg("Subscribing to block headers ...") + + var err error + get, err = debug.SubscribeAccessAPIBlockHeadersFromLatest( + context.Background(), + flowClient.RPCClient(), + blockStatus, + ) + if err != nil { + log.Fatal().Err(err).Msg("failed to subscribe to block headers") + } + + log.Info().Msg("Subscribed to block headers") + + return +} + func FetchBlockTransactions( blockID flow.Identifier, flowClient *client.Client, diff --git a/utils/debug/api.go b/utils/debug/api.go index bfe339ad9ec..bc394fcae49 100644 --- a/utils/debug/api.go +++ b/utils/debug/api.go @@ -4,6 +4,7 @@ import ( "context" "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" rpcConvert "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" @@ -25,3 +26,71 @@ func GetAccessAPIBlockHeader( return rpcConvert.MessageToBlockHeader(resp.Block) } + +func convertBlockStatus(status flow.BlockStatus) entities.BlockStatus { + switch status { + case flow.BlockStatusUnknown: + return entities.BlockStatus_BLOCK_UNKNOWN + case flow.BlockStatusFinalized: + return entities.BlockStatus_BLOCK_FINALIZED + case flow.BlockStatusSealed: + return entities.BlockStatus_BLOCK_SEALED + } + return entities.BlockStatus_BLOCK_UNKNOWN +} + +func SubscribeAccessAPIBlockHeadersFromStartBlockID( + ctx context.Context, + client access.AccessAPIClient, + startBlockID flow.Identifier, + blockStatus flow.BlockStatus, +) ( + func() (*flow.Header, error), + error, +) { + req := &access.SubscribeBlockHeadersFromStartBlockIDRequest{ + StartBlockId: startBlockID[:], + BlockStatus: convertBlockStatus(blockStatus), + } + + cl, err := client.SubscribeBlockHeadersFromStartBlockID(ctx, req) + if err != nil { + return nil, err + } + + return func() (*flow.Header, error) { + resp, err := cl.Recv() + if err != nil { + return nil, err + } + + return rpcConvert.MessageToBlockHeader(resp.Header) + }, nil +} + +func SubscribeAccessAPIBlockHeadersFromLatest( + ctx context.Context, + client access.AccessAPIClient, + blockStatus flow.BlockStatus, +) ( + func() (*flow.Header, error), + error, +) { + req := &access.SubscribeBlockHeadersFromLatestRequest{ + BlockStatus: convertBlockStatus(blockStatus), + } + + cl, err := client.SubscribeBlockHeadersFromLatest(ctx, req) + if err != nil { + return nil, err + } + + return func() (*flow.Header, error) { + resp, err := cl.Recv() + if err != nil { + return nil, err + } + + return rpcConvert.MessageToBlockHeader(resp.Header) + }, nil +} From ed9e482d97802b565bb858e600652b3a1a6b029c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 3 Dec 2025 16:27:22 -0800 Subject: [PATCH 3/5] handle error --- cmd/util/cmd/compare-cadence-vm/cmd.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/util/cmd/compare-cadence-vm/cmd.go b/cmd/util/cmd/compare-cadence-vm/cmd.go index f0f4acf4435..0817d710b66 100644 --- a/cmd/util/cmd/compare-cadence-vm/cmd.go +++ b/cmd/util/cmd/compare-cadence-vm/cmd.go @@ -161,6 +161,9 @@ func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, rem if err == io.EOF { return } + if err != nil { + log.Fatal().Err(err).Msg("failed to receive new block header") + } log.Info().Msgf("New sealed block received: %s (height %d)", header.ID(), header.Height) From 9ea57f0deaa842f7c6ab4e744c1ada70d597d8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 3 Dec 2025 16:27:52 -0800 Subject: [PATCH 4/5] wait until comparing new blocks, so hopefully registers are available --- cmd/util/cmd/compare-cadence-vm/cmd.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/util/cmd/compare-cadence-vm/cmd.go b/cmd/util/cmd/compare-cadence-vm/cmd.go index 0817d710b66..3ef8d6acc8c 100644 --- a/cmd/util/cmd/compare-cadence-vm/cmd.go +++ b/cmd/util/cmd/compare-cadence-vm/cmd.go @@ -39,6 +39,7 @@ var ( flagWriteTraces bool flagParallel int flagSubscribe bool + flagSubscriptionDelay time.Duration ) var Cmd = &cobra.Command{ @@ -77,6 +78,8 @@ func init() { Cmd.Flags().IntVar(&flagParallel, "parallel", 1, "number of blocks to process in parallel (default: 1)") Cmd.Flags().BoolVar(&flagSubscribe, "subscribe", false, "subscribe to new sealed blocks and compare them as they arrive") + + Cmd.Flags().DurationVar(&flagSubscriptionDelay, "subscription-delay", 1*time.Minute, "delay after receiving a new sealed block before comparing it") } func run(_ *cobra.Command, args []string) { @@ -169,6 +172,8 @@ func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, rem g.Go(func() error { + time.Sleep(flagSubscriptionDelay) + result := compareBlock( header.ID(), header, From 66e3bf8fb11565fe04e9a478cee231f8b3e508ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Thu, 4 Dec 2025 16:42:01 -0800 Subject: [PATCH 5/5] re-connect on failures --- cmd/util/cmd/compare-cadence-vm/cmd.go | 108 ++++++++++++++----------- 1 file changed, 62 insertions(+), 46 deletions(-) diff --git a/cmd/util/cmd/compare-cadence-vm/cmd.go b/cmd/util/cmd/compare-cadence-vm/cmd.go index 3ef8d6acc8c..0e9a86e700d 100644 --- a/cmd/util/cmd/compare-cadence-vm/cmd.go +++ b/cmd/util/cmd/compare-cadence-vm/cmd.go @@ -18,11 +18,13 @@ import ( "github.com/spf13/cobra" otelTrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" debug_tx "github.com/onflow/flow-go/cmd/util/cmd/debug-tx" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/grpcclient" "github.com/onflow/flow-go/module/util" "github.com/onflow/flow-go/utils/debug" ) @@ -87,15 +89,17 @@ func run(_ *cobra.Command, args []string) { chainID := flow.ChainID(flagChain) chain := chainID.Chain() - config, err := grpcclient.NewFlowClientConfig(flagAccessAddress, "", flow.ZeroID, true) - if err != nil { - log.Fatal().Err(err).Msg("failed to create flow client config") - } - - flowClient, err := grpcclient.FlowClient(config) - if err != nil { - log.Fatal().Err(err).Msg("failed to create client") - } + flowClient, err := client.NewClient( + flagAccessAddress, + client.WithGRPCDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(commonrpc.DefaultAccessMaxResponseSize)), + //grpc.WithKeepaliveParams(keepalive.ClientParameters{ + // Time: 10 * time.Second, + // Timeout: 1 * time.Hour, + //}), + ), + ) var remoteClient debug.RemoteClient if flagUseExecutionDataAPI { @@ -140,14 +144,6 @@ func run(_ *cobra.Command, args []string) { func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, remoteClient debug.RemoteClient, chain flow.Chain) { - const blockStatus = flow.BlockStatusSealed - var getBlockHeader func() (*flow.Header, error) - if len(blockIDs) > 1 { - getBlockHeader = debug_tx.SubscribeBlockHeadersFromStartBlockID(flowClient, blockIDs[0], blockStatus) - } else { - getBlockHeader = debug_tx.SubscribeBlockHeadersFromLatest(flowClient, blockStatus) - } - var ( blocksMismatched int64 blocksMatched int64 @@ -158,43 +154,63 @@ func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, rem g, _ := errgroup.WithContext(context.Background()) g.SetLimit(flagParallel) + var lastBlockID flow.Identifier + + if len(blockIDs) > 0 { + lastBlockID = blockIDs[0] + } + +reconnect: for { - log.Info().Msg("Waiting for new sealed block ...") - header, err := getBlockHeader() - if err == io.EOF { - return - } - if err != nil { - log.Fatal().Err(err).Msg("failed to receive new block header") + const blockStatus = flow.BlockStatusSealed + var getBlockHeader func() (*flow.Header, error) + if lastBlockID != flow.ZeroID { + getBlockHeader = debug_tx.SubscribeBlockHeadersFromStartBlockID(flowClient, lastBlockID, blockStatus) + } else { + getBlockHeader = debug_tx.SubscribeBlockHeadersFromLatest(flowClient, blockStatus) } - log.Info().Msgf("New sealed block received: %s (height %d)", header.ID(), header.Height) + for { + log.Info().Msg("Waiting for new sealed block ...") + header, err := getBlockHeader() + if err == io.EOF { + return + } + if err != nil { + log.Warn().Err(err).Msg("failed to receive new block header") + continue reconnect + } - g.Go(func() error { + log.Info().Msgf("New sealed block received: %s (height %d)", header.ID(), header.Height) - time.Sleep(flagSubscriptionDelay) + lastBlockID = header.ID() - result := compareBlock( - header.ID(), - header, - remoteClient, - flowClient, - chain, - ) + g.Go(func() error { - atomic.AddInt64(&txMismatched, int64(result.mismatches)) - atomic.AddInt64(&txMatched, int64(result.matches)) - if result.mismatches > 0 { - atomic.AddInt64(&blocksMismatched, 1) - } else { - atomic.AddInt64(&blocksMatched, 1) - } + time.Sleep(flagSubscriptionDelay) - log.Info().Msgf("Compared %d blocks: %d matched, %d mismatched", blocksMatched+blocksMismatched, blocksMatched, blocksMismatched) - log.Info().Msgf("Compared %d transactions: %d matched, %d mismatched", txMatched+txMismatched, txMatched, txMismatched) + result := compareBlock( + header.ID(), + header, + remoteClient, + flowClient, + chain, + ) - return nil - }) + atomic.AddInt64(&txMismatched, int64(result.mismatches)) + atomic.AddInt64(&txMatched, int64(result.matches)) + if result.mismatches > 0 { + atomic.AddInt64(&blocksMismatched, 1) + } else { + atomic.AddInt64(&blocksMatched, 1) + } + + log.Info().Msgf("Compared %d blocks: %d matched, %d mismatched", blocksMatched+blocksMismatched, blocksMatched, blocksMismatched) + log.Info().Msgf("Compared %d transactions: %d matched, %d mismatched", txMatched+txMismatched, txMatched, txMismatched) + + return nil + }) + } } }