Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 144 additions & 11 deletions cmd/util/cmd/compare-cadence-vm/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"os"
"strings"
"sync/atomic"
"time"

"github.com/kr/pretty"
sdk "github.com/onflow/flow-go-sdk"
Expand All @@ -16,12 +18,15 @@ import (
"github.com/spf13/cobra"
otelTrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

debug_tx "github.com/onflow/flow-go/cmd/util/cmd/debug-tx"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/grpcclient"
"github.com/onflow/flow-go/module/util"
"github.com/onflow/flow-go/utils/debug"
)

Expand All @@ -36,6 +41,8 @@ var (
flagLogTraces bool
flagWriteTraces bool
flagParallel int
flagSubscribe bool
flagSubscriptionDelay time.Duration
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -64,7 +71,6 @@ func init() {
Cmd.Flags().BoolVar(&flagUseExecutionDataAPI, "use-execution-data-api", true, "use the execution data API (default: true)")

Cmd.Flags().StringVar(&flagBlockIDs, "block-ids", "", "block IDs, comma-separated. if --block-count > 1 is used, provide a single block ID")
_ = Cmd.MarkFlagRequired("block-id")

Cmd.Flags().IntVar(&flagBlockCount, "block-count", 1, "number of blocks to process (default: 1). if > 1, provide a single block ID with --block-ids")

Expand All @@ -73,22 +79,28 @@ func init() {
Cmd.Flags().BoolVar(&flagWriteTraces, "write-traces", false, "write traces for mismatched transactions")

Cmd.Flags().IntVar(&flagParallel, "parallel", 1, "number of blocks to process in parallel (default: 1)")

Cmd.Flags().BoolVar(&flagSubscribe, "subscribe", false, "subscribe to new sealed blocks and compare them as they arrive")

Cmd.Flags().DurationVar(&flagSubscriptionDelay, "subscription-delay", 1*time.Minute, "delay after receiving a new sealed block before comparing it")
}

func run(_ *cobra.Command, args []string) {

chainID := flow.ChainID(flagChain)
chain := chainID.Chain()

config, err := grpcclient.NewFlowClientConfig(flagAccessAddress, "", flow.ZeroID, true)
if err != nil {
log.Fatal().Err(err).Msg("failed to create flow client config")
}

flowClient, err := grpcclient.FlowClient(config)
if err != nil {
log.Fatal().Err(err).Msg("failed to create client")
}
flowClient, err := client.NewClient(
flagAccessAddress,
client.WithGRPCDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(commonrpc.DefaultAccessMaxResponseSize)),
//grpc.WithKeepaliveParams(keepalive.ClientParameters{
// Time: 10 * time.Second,
// Timeout: 1 * time.Hour,
//}),
),
)

var remoteClient debug.RemoteClient
if flagUseExecutionDataAPI {
Expand All @@ -105,6 +117,10 @@ func run(_ *cobra.Command, args []string) {

var blockIDs []flow.Identifier
for _, rawBlockID := range strings.Split(flagBlockIDs, ",") {
if rawBlockID == "" {
continue
}

blockID, err := flow.HexStringToIdentifier(rawBlockID)
if err != nil {
log.Fatal().Err(err).Str("ID", rawBlockID).Msg("failed to parse block ID")
Expand All @@ -113,6 +129,98 @@ func run(_ *cobra.Command, args []string) {
blockIDs = append(blockIDs, blockID)
}

if flagSubscribe {
if len(blockIDs) > 1 {
log.Fatal().Msg("when using --subscribe, provide a single block ID to start from, or none to start from latest")
}

compareNewBlocks(blockIDs, flowClient, remoteClient, chain)
} else {
if len(blockIDs) == 0 {
log.Fatal().Msg("at least one block ID must be provided")
}
compareBlocks(blockIDs, flowClient, remoteClient, chain)
}
}

func compareNewBlocks(blockIDs []flow.Identifier, flowClient *client.Client, remoteClient debug.RemoteClient, chain flow.Chain) {

var (
blocksMismatched int64
blocksMatched int64
txMismatched int64
txMatched int64
)

g, _ := errgroup.WithContext(context.Background())
g.SetLimit(flagParallel)

var lastBlockID flow.Identifier

if len(blockIDs) > 0 {
lastBlockID = blockIDs[0]
}

reconnect:
for {
const blockStatus = flow.BlockStatusSealed
var getBlockHeader func() (*flow.Header, error)
if lastBlockID != flow.ZeroID {
getBlockHeader = debug_tx.SubscribeBlockHeadersFromStartBlockID(flowClient, lastBlockID, blockStatus)
} else {
getBlockHeader = debug_tx.SubscribeBlockHeadersFromLatest(flowClient, blockStatus)
}

for {
log.Info().Msg("Waiting for new sealed block ...")
header, err := getBlockHeader()
if err == io.EOF {
return
}
if err != nil {
log.Warn().Err(err).Msg("failed to receive new block header")
continue reconnect
}

log.Info().Msgf("New sealed block received: %s (height %d)", header.ID(), header.Height)

lastBlockID = header.ID()

g.Go(func() error {

time.Sleep(flagSubscriptionDelay)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this delay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the when the subscription notifies about a new sealed block, the registers for it are not immediately available, and execution will fail. Once I waited for a few seconds, the registers became available, so I added a conservative default of about half a minute


result := compareBlock(
header.ID(),
header,
remoteClient,
flowClient,
chain,
)

atomic.AddInt64(&txMismatched, int64(result.mismatches))
atomic.AddInt64(&txMatched, int64(result.matches))
if result.mismatches > 0 {
atomic.AddInt64(&blocksMismatched, 1)
} else {
atomic.AddInt64(&blocksMatched, 1)
}

log.Info().Msgf("Compared %d blocks: %d matched, %d mismatched", blocksMatched+blocksMismatched, blocksMatched, blocksMismatched)
log.Info().Msgf("Compared %d transactions: %d matched, %d mismatched", txMatched+txMismatched, txMatched, txMismatched)

return nil
})
}
}
}

func compareBlocks(
blockIDs []flow.Identifier,
flowClient *client.Client,
remoteClient debug.RemoteClient,
chain flow.Chain,
) {
type block struct {
id flow.Identifier
header *flow.Header
Expand All @@ -125,6 +233,16 @@ func run(_ *cobra.Command, args []string) {
log.Fatal().Msg("either provide a single block ID and use --block-count, or provide multiple block IDs and do not use --block-count")
}

blockHeaderProgress := util.LogProgress(
log.Logger,
util.NewLogProgressConfig(
"fetching block headers",
flagBlockCount,
1*time.Second,
100/5, // log every 5%
),
)

blockID := blockIDs[0]
for i := 0; i < flagBlockCount; i++ {
header := debug_tx.FetchBlockHeader(blockID, flowClient)
Expand All @@ -134,8 +252,11 @@ func run(_ *cobra.Command, args []string) {
header: header,
})

blockHeaderProgress(1)

blockID = header.ParentID
}

} else {
for _, blockID := range blockIDs {
header := debug_tx.FetchBlockHeader(blockID, flowClient)
Expand All @@ -147,6 +268,16 @@ func run(_ *cobra.Command, args []string) {
}
}

blockProgress := util.LogProgress(
log.Logger,
util.NewLogProgressConfig(
"executing blocks",
flagBlockCount,
1*time.Second,
100/5, // log every 5%
),
)

var (
blocksMismatched int64
blocksMatched int64
Expand Down Expand Up @@ -176,6 +307,8 @@ func run(_ *cobra.Command, args []string) {
atomic.AddInt64(&blocksMatched, 1)
}

blockProgress(1)

return nil
})
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/util/cmd/debug-tx/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,50 @@ func FetchBlockHeader(
return
}

func SubscribeBlockHeadersFromStartBlockID(
flowClient *client.Client,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (get func() (*flow.Header, error)) {
log.Info().Msg("Subscribing to block headers ...")

var err error
get, err = debug.SubscribeAccessAPIBlockHeadersFromStartBlockID(
context.Background(),
flowClient.RPCClient(),
startBlockID,
blockStatus,
)
if err != nil {
log.Fatal().Err(err).Msg("failed to subscribe to block headers")
}

log.Info().Msg("Subscribed to block headers")

return
}

func SubscribeBlockHeadersFromLatest(
flowClient *client.Client,
blockStatus flow.BlockStatus,
) (get func() (*flow.Header, error)) {
log.Info().Msg("Subscribing to block headers ...")

var err error
get, err = debug.SubscribeAccessAPIBlockHeadersFromLatest(
context.Background(),
flowClient.RPCClient(),
blockStatus,
)
if err != nil {
log.Fatal().Err(err).Msg("failed to subscribe to block headers")
}

log.Info().Msg("Subscribed to block headers")

return
}

func FetchBlockTransactions(
blockID flow.Identifier,
flowClient *client.Client,
Expand Down
69 changes: 69 additions & 0 deletions utils/debug/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"

rpcConvert "github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -25,3 +26,71 @@ func GetAccessAPIBlockHeader(

return rpcConvert.MessageToBlockHeader(resp.Block)
}

func convertBlockStatus(status flow.BlockStatus) entities.BlockStatus {
switch status {
case flow.BlockStatusUnknown:
return entities.BlockStatus_BLOCK_UNKNOWN
case flow.BlockStatusFinalized:
return entities.BlockStatus_BLOCK_FINALIZED
case flow.BlockStatusSealed:
return entities.BlockStatus_BLOCK_SEALED
}
return entities.BlockStatus_BLOCK_UNKNOWN
}

func SubscribeAccessAPIBlockHeadersFromStartBlockID(
ctx context.Context,
client access.AccessAPIClient,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (
func() (*flow.Header, error),
error,
) {
req := &access.SubscribeBlockHeadersFromStartBlockIDRequest{
StartBlockId: startBlockID[:],
BlockStatus: convertBlockStatus(blockStatus),
}

cl, err := client.SubscribeBlockHeadersFromStartBlockID(ctx, req)
if err != nil {
return nil, err
}

return func() (*flow.Header, error) {
resp, err := cl.Recv()
if err != nil {
return nil, err
}

return rpcConvert.MessageToBlockHeader(resp.Header)
}, nil
}

func SubscribeAccessAPIBlockHeadersFromLatest(
ctx context.Context,
client access.AccessAPIClient,
blockStatus flow.BlockStatus,
) (
func() (*flow.Header, error),
error,
) {
req := &access.SubscribeBlockHeadersFromLatestRequest{
BlockStatus: convertBlockStatus(blockStatus),
}

cl, err := client.SubscribeBlockHeadersFromLatest(ctx, req)
if err != nil {
return nil, err
}

return func() (*flow.Header, error) {
resp, err := cl.Recv()
if err != nil {
return nil, err
}

return rpcConvert.MessageToBlockHeader(resp.Header)
}, nil
}
Loading