Skip to content

Commit 1d1f463

Browse files
committed
chain validation and fix command
1 parent 627b36c commit 1d1f463

File tree

16 files changed

+1408
-121
lines changed

16 files changed

+1408
-121
lines changed

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ func init() {
220220
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
221221
rootCmd.AddCommand(orchestratorCmd)
222222
rootCmd.AddCommand(apiCmd)
223+
rootCmd.AddCommand(validateAndFixCmd)
224+
rootCmd.AddCommand(validateCmd)
223225
}
224226

225227
func initConfig() {

cmd/validate.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package cmd
2+
3+
import (
4+
"math/big"
5+
6+
"github.com/rs/zerolog/log"
7+
"github.com/spf13/cobra"
8+
config "github.com/thirdweb-dev/indexer/configs"
9+
"github.com/thirdweb-dev/indexer/internal/orchestrator"
10+
"github.com/thirdweb-dev/indexer/internal/rpc"
11+
"github.com/thirdweb-dev/indexer/internal/storage"
12+
)
13+
14+
var (
15+
validateCmd = &cobra.Command{
16+
Use: "validate",
17+
Short: "Validate blockchain data integrity",
18+
Long: "Validate a range of blocks for data integrity issues including transaction roots and logs bloom verification",
19+
Run: func(cmd *cobra.Command, args []string) {
20+
RunValidate(cmd, args)
21+
},
22+
}
23+
)
24+
25+
/**
26+
* Validates a range of blocks (end and start are inclusive) for a given chain
27+
* First argument is the start block number
28+
* Second argument (optional) is the end block number
29+
*/
30+
func RunValidate(cmd *cobra.Command, args []string) {
31+
if len(args) < 1 {
32+
log.Fatal().Msg("Start block number is required")
33+
}
34+
startBlock, success := new(big.Int).SetString(args[0], 10)
35+
if !success {
36+
log.Fatal().Msg("Failed to parse start block number")
37+
}
38+
39+
var endBlock *big.Int
40+
if len(args) > 1 {
41+
endBlock, success = new(big.Int).SetString(args[1], 10)
42+
if !success {
43+
log.Fatal().Msg("Failed to parse end block number")
44+
}
45+
}
46+
if endBlock == nil {
47+
endBlock = startBlock
48+
}
49+
50+
rpcClient, err := rpc.Initialize()
51+
if err != nil {
52+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
53+
}
54+
log.Info().Msgf("Running validation for chain %d", rpcClient.GetChainID())
55+
56+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
57+
if err != nil {
58+
log.Fatal().Err(err).Msg("Failed to initialize storage")
59+
}
60+
61+
validator := orchestrator.NewValidator(rpcClient, s)
62+
63+
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
64+
if err != nil {
65+
log.Fatal().Err(err).Msg("Failed to validate blocks")
66+
}
67+
68+
if len(invalidBlocks) > 0 {
69+
log.Info().Msgf("Found %d invalid blocks", len(invalidBlocks))
70+
for _, block := range invalidBlocks {
71+
log.Info().Msgf("Invalid block: %s", block.Block.Number)
72+
}
73+
} else {
74+
log.Info().Msg("No invalid blocks found")
75+
}
76+
}

cmd/validate_and_fix.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package cmd
2+
3+
import (
4+
"crypto/tls"
5+
"fmt"
6+
"math/big"
7+
"strconv"
8+
9+
"github.com/ClickHouse/clickhouse-go/v2"
10+
"github.com/rs/zerolog/log"
11+
"github.com/spf13/cobra"
12+
config "github.com/thirdweb-dev/indexer/configs"
13+
"github.com/thirdweb-dev/indexer/internal/orchestrator"
14+
"github.com/thirdweb-dev/indexer/internal/rpc"
15+
"github.com/thirdweb-dev/indexer/internal/storage"
16+
"github.com/thirdweb-dev/indexer/internal/validation"
17+
)
18+
19+
var (
20+
validateAndFixCmd = &cobra.Command{
21+
Use: "validateAndFix",
22+
Short: "Validate and fix blockchain data",
23+
Long: "Validate blockchain data in batches and automatically fix any issues found including duplicates, gaps, and invalid blocks",
24+
Run: func(cmd *cobra.Command, args []string) {
25+
RunValidateAndFix(cmd, args)
26+
},
27+
}
28+
)
29+
30+
func RunValidateAndFix(cmd *cobra.Command, args []string) {
31+
batchSize := big.NewInt(1000)
32+
fixBatchSize := 0 // default is no batch size
33+
if len(args) > 0 {
34+
batchSizeFromArgs, err := strconv.Atoi(args[0])
35+
if err != nil {
36+
log.Fatal().Err(err).Msg("Failed to parse batch size")
37+
}
38+
if batchSizeFromArgs < 1 {
39+
batchSizeFromArgs = 1
40+
}
41+
batchSize = big.NewInt(int64(batchSizeFromArgs))
42+
log.Info().Msgf("Using batch size %d from args", batchSize)
43+
}
44+
if len(args) > 1 {
45+
fixBatchSizeFromArgs, err := strconv.Atoi(args[1])
46+
if err != nil {
47+
log.Fatal().Err(err).Msg("Failed to parse fix batch size")
48+
}
49+
fixBatchSize = fixBatchSizeFromArgs
50+
}
51+
log.Debug().Msgf("Batch size: %d, fix batch size: %d", batchSize, fixBatchSize)
52+
batchSize = new(big.Int).Sub(batchSize, big.NewInt(1)) // -1 because range ends are inclusive
53+
54+
rpcClient, err := rpc.Initialize()
55+
if err != nil {
56+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
57+
}
58+
log.Info().Msgf("Running validationAndFix for chain %d", rpcClient.GetChainID())
59+
60+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
61+
if err != nil {
62+
log.Fatal().Err(err).Msg("Failed to initialize storage")
63+
}
64+
cursor, err := validation.InitCursor(rpcClient.GetChainID(), s)
65+
if err != nil {
66+
log.Fatal().Err(err).Msg("Failed to initialize cursor")
67+
}
68+
log.Debug().Msgf("Cursor initialized for chain %d, starting from block %d", rpcClient.GetChainID(), cursor.LastScannedBlockNumber)
69+
70+
conn, err := clickhouse.Open(&clickhouse.Options{
71+
Addr: []string{fmt.Sprintf("%s:%d", config.Cfg.Storage.Main.Clickhouse.Host, config.Cfg.Storage.Main.Clickhouse.Port)},
72+
Protocol: clickhouse.Native,
73+
TLS: &tls.Config{
74+
MinVersion: tls.VersionTLS12,
75+
},
76+
Auth: clickhouse.Auth{
77+
Username: config.Cfg.Storage.Main.Clickhouse.Username,
78+
Password: config.Cfg.Storage.Main.Clickhouse.Password,
79+
},
80+
Settings: func() clickhouse.Settings {
81+
settings := clickhouse.Settings{
82+
"do_not_merge_across_partitions_select_final": "1",
83+
"use_skip_indexes_if_final": "1",
84+
"optimize_move_to_prewhere_if_final": "1",
85+
"async_insert": "1",
86+
"wait_for_async_insert": "1",
87+
}
88+
return settings
89+
}(),
90+
})
91+
if err != nil {
92+
log.Fatal().Err(err).Msg("Failed to connect to ClickHouse")
93+
}
94+
defer conn.Close()
95+
96+
startBlock := new(big.Int).Add(cursor.LastScannedBlockNumber, big.NewInt(1))
97+
98+
for startBlock.Cmp(cursor.MaxBlockNumber) <= 0 {
99+
batchEndBlock := new(big.Int).Add(startBlock, batchSize)
100+
if batchEndBlock.Cmp(cursor.MaxBlockNumber) > 0 {
101+
batchEndBlock = new(big.Int).Set(cursor.MaxBlockNumber)
102+
}
103+
104+
log.Info().Msgf("Validating batch of blocks from %s to %s", startBlock.String(), batchEndBlock.String())
105+
err := validateAndFixRange(rpcClient, s, conn, startBlock, batchEndBlock, fixBatchSize)
106+
if err != nil {
107+
log.Fatal().Err(err).Msgf("failed to validate and fix range %v-%v", startBlock, batchEndBlock)
108+
}
109+
110+
startBlock = new(big.Int).Add(batchEndBlock, big.NewInt(1))
111+
cursor.Update(batchEndBlock)
112+
}
113+
}
114+
115+
/**
116+
* Validates a range of blocks (end and start are inclusive) for a given chain and fixes any problems it finds
117+
*/
118+
func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error {
119+
validator := orchestrator.NewValidator(rpcClient, s)
120+
121+
chainId := rpcClient.GetChainID()
122+
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock)
123+
if err != nil {
124+
return fmt.Errorf("failed to find and fix duplicates: %w", err)
125+
}
126+
127+
err = validator.FindAndFixGaps(startBlock, endBlock)
128+
if err != nil {
129+
return fmt.Errorf("failed to find and fix gaps: %w", err)
130+
}
131+
132+
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
133+
if err != nil {
134+
return fmt.Errorf("failed to validate and fix blocks: %w", err)
135+
}
136+
137+
invalidBlockNumbers := make([]*big.Int, 0)
138+
for _, blockData := range invalidBlocks {
139+
invalidBlockNumbers = append(invalidBlockNumbers, blockData.Block.Number)
140+
}
141+
142+
if len(invalidBlocks) > 0 {
143+
err = validator.FixBlocks(invalidBlockNumbers, fixBatchSize)
144+
if err != nil {
145+
return fmt.Errorf("failed to fix blocks: %w", err)
146+
}
147+
}
148+
149+
log.Debug().Msgf("ValidationAndFix complete for range %v-%v", startBlock, endBlock)
150+
return nil
151+
}

go.mod

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ go 1.23.0
44

55
require (
66
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
7-
github.com/ethereum/go-ethereum v1.14.8
7+
github.com/ethereum/go-ethereum v1.15.11
88
github.com/gin-gonic/gin v1.10.0
99
github.com/gorilla/schema v1.4.1
10+
github.com/holiman/uint256 v1.3.2
1011
github.com/prometheus/client_golang v1.20.4
1112
github.com/rs/zerolog v1.33.0
1213
github.com/spf13/cobra v1.8.1
@@ -24,21 +25,21 @@ require (
2425
github.com/Microsoft/go-winio v0.6.2 // indirect
2526
github.com/andybalholm/brotli v1.1.1 // indirect
2627
github.com/beorn7/perks v1.0.1 // indirect
27-
github.com/bits-and-blooms/bitset v1.10.0 // indirect
28-
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
28+
github.com/bits-and-blooms/bitset v1.20.0 // indirect
2929
github.com/bytedance/sonic v1.12.6 // indirect
3030
github.com/bytedance/sonic/loader v0.2.1 // indirect
3131
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3232
github.com/cloudwego/base64x v0.1.4 // indirect
3333
github.com/cloudwego/iasm v0.2.0 // indirect
34-
github.com/consensys/bavard v0.1.13 // indirect
35-
github.com/consensys/gnark-crypto v0.12.1 // indirect
36-
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
37-
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
34+
github.com/consensys/bavard v0.1.27 // indirect
35+
github.com/consensys/gnark-crypto v0.16.0 // indirect
36+
github.com/crate-crypto/go-eth-kzg v1.3.0 // indirect
37+
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
3838
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3939
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
4040
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
41-
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
41+
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
42+
github.com/ethereum/go-verkle v0.2.2 // indirect
4243
github.com/fsnotify/fsnotify v1.7.0 // indirect
4344
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
4445
github.com/gin-contrib/sse v0.1.0 // indirect
@@ -53,11 +54,11 @@ require (
5354
github.com/go-playground/universal-translator v0.18.1 // indirect
5455
github.com/go-playground/validator/v10 v10.23.0 // indirect
5556
github.com/goccy/go-json v0.10.4 // indirect
56-
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
57+
github.com/gofrs/flock v0.8.1 // indirect
58+
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
5759
github.com/google/uuid v1.6.0 // indirect
5860
github.com/gorilla/websocket v1.4.2 // indirect
5961
github.com/hashicorp/hcl v1.0.0 // indirect
60-
github.com/holiman/uint256 v1.3.1 // indirect
6162
github.com/inconshreveable/mousetrap v1.1.0 // indirect
6263
github.com/josharian/intern v1.0.0 // indirect
6364
github.com/json-iterator/go v1.1.12 // indirect
@@ -68,11 +69,13 @@ require (
6869
github.com/mailru/easyjson v0.7.7 // indirect
6970
github.com/mattn/go-colorable v0.1.13 // indirect
7071
github.com/mattn/go-isatty v0.0.20 // indirect
72+
github.com/mattn/go-runewidth v0.0.13 // indirect
7173
github.com/mitchellh/mapstructure v1.5.0 // indirect
7274
github.com/mmcloughlin/addchain v0.4.0 // indirect
7375
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7476
github.com/modern-go/reflect2 v1.0.2 // indirect
7577
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
78+
github.com/olekukonko/tablewriter v0.0.5 // indirect
7679
github.com/paulmach/orb v0.11.1 // indirect
7780
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
7881
github.com/pierrec/lz4/v4 v4.1.22 // indirect
@@ -81,6 +84,7 @@ require (
8184
github.com/prometheus/client_model v0.6.1 // indirect
8285
github.com/prometheus/common v0.55.0 // indirect
8386
github.com/prometheus/procfs v0.15.1 // indirect
87+
github.com/rivo/uniseg v0.2.0 // indirect
8488
github.com/sagikazarmark/locafero v0.4.0 // indirect
8589
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
8690
github.com/segmentio/asm v1.2.0 // indirect
@@ -92,13 +96,12 @@ require (
9296
github.com/spf13/pflag v1.0.5 // indirect
9397
github.com/stretchr/objx v0.5.2 // indirect
9498
github.com/subosito/gotenv v1.6.0 // indirect
95-
github.com/supranational/blst v0.3.11 // indirect
99+
github.com/supranational/blst v0.3.14 // indirect
96100
github.com/tklauser/go-sysconf v0.3.12 // indirect
97101
github.com/tklauser/numcpus v0.6.1 // indirect
98102
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
99103
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
100104
github.com/ugorji/go/codec v1.2.12 // indirect
101-
github.com/urfave/cli/v2 v2.27.4 // indirect
102105
github.com/yusufpapurcu/wmi v1.2.4 // indirect
103106
go.opentelemetry.io/otel v1.36.0 // indirect
104107
go.opentelemetry.io/otel/trace v1.36.0 // indirect

0 commit comments

Comments
 (0)