Skip to content

Commit 8d15fa9

Browse files
authored
Merge pull request #246 from thirdweb-dev/07-24-validate_and_migrate_script
validate and migrate script
2 parents 1d95e08 + 713a077 commit 8d15fa9

File tree

8 files changed

+613
-2
lines changed

8 files changed

+613
-2
lines changed

cmd/migrate_valid.go

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"os"
7+
"strconv"
8+
9+
"github.com/rs/zerolog/log"
10+
"github.com/spf13/cobra"
11+
config "github.com/thirdweb-dev/indexer/configs"
12+
"github.com/thirdweb-dev/indexer/internal/common"
13+
"github.com/thirdweb-dev/indexer/internal/orchestrator"
14+
"github.com/thirdweb-dev/indexer/internal/rpc"
15+
"github.com/thirdweb-dev/indexer/internal/storage"
16+
)
17+
18+
var (
19+
migrateValidationCmd = &cobra.Command{
20+
Use: "validationMigration",
21+
Short: "Migrate valid block data from main storage to target storage",
22+
Long: "Migrate valid blocks, logs, transactions, traces, etc. to target storage. It will query current data from main storage and validate it. Anything missing or not passing validation will be queried from the RPC.",
23+
Run: func(cmd *cobra.Command, args []string) {
24+
RunValidationMigration(cmd, args)
25+
},
26+
}
27+
)
28+
29+
const (
30+
TARGET_STORAGE_DATABASE = "temp"
31+
DEFAULT_RPC_BATCH_SIZE = 200
32+
DEFAULT_BATCH_SIZE = 1000
33+
)
34+
35+
func RunValidationMigration(cmd *cobra.Command, args []string) {
36+
migrator := NewMigrator()
37+
defer migrator.Close()
38+
39+
rangeStartBlock, rangeEndBlock := migrator.DetermineMigrationBoundaries()
40+
41+
log.Info().Msgf("Migrating blocks from %s to %s (both ends inclusive)", rangeStartBlock.String(), rangeEndBlock.String())
42+
43+
// 2. Start going in loops
44+
for currentBlock := rangeStartBlock; currentBlock.Cmp(rangeEndBlock) <= 0; {
45+
endBlock := new(big.Int).Add(currentBlock, big.NewInt(int64(migrator.migrationBatchSize-1)))
46+
if endBlock.Cmp(rangeEndBlock) > 0 {
47+
endBlock = rangeEndBlock
48+
}
49+
50+
blockNumbers := generateBlockNumbersForRange(currentBlock, endBlock)
51+
log.Info().Msgf("Processing blocks %s to %s", blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())
52+
53+
validBlocksForRange := migrator.GetValidBlocksForRange(blockNumbers)
54+
55+
blocksToInsertMap := make(map[string]common.BlockData)
56+
for _, blockData := range validBlocksForRange {
57+
blocksToInsertMap[blockData.Block.Number.String()] = blockData
58+
}
59+
60+
// Loop over block numbers to find missing blocks
61+
missingBlocks := make([]*big.Int, 0)
62+
for _, blockNum := range blockNumbers {
63+
if _, exists := blocksToInsertMap[blockNum.String()]; !exists {
64+
missingBlocks = append(missingBlocks, blockNum)
65+
}
66+
}
67+
68+
validMissingBlocks := migrator.GetValidBlocksFromRPC(missingBlocks)
69+
for _, blockData := range validMissingBlocks {
70+
blocksToInsertMap[blockData.Block.Number.String()] = blockData
71+
}
72+
73+
blocksToInsert := make([]common.BlockData, 0)
74+
for _, blockData := range blocksToInsertMap {
75+
blocksToInsert = append(blocksToInsert, blockData)
76+
}
77+
78+
err := migrator.targetConn.InsertBlockData(blocksToInsert)
79+
if err != nil {
80+
log.Fatal().Err(err).Msg("Failed to insert blocks to target storage")
81+
}
82+
83+
currentBlock = new(big.Int).Add(endBlock, big.NewInt(1))
84+
}
85+
86+
// 3. then finally copy partitions from target table to main tables
87+
log.Info().Msg("Done")
88+
}
89+
90+
type Migrator struct {
91+
rpcClient rpc.IRPCClient
92+
storage storage.IStorage
93+
validator *orchestrator.Validator
94+
targetConn *storage.ClickHouseConnector
95+
migrationBatchSize int
96+
rpcBatchSize int
97+
}
98+
99+
func NewMigrator() *Migrator {
100+
targetDBName := os.Getenv("TARGET_STORAGE_DATABASE")
101+
if targetDBName == "" {
102+
targetDBName = TARGET_STORAGE_DATABASE
103+
}
104+
batchSize := DEFAULT_BATCH_SIZE
105+
batchSizeEnvInt, err := strconv.Atoi(os.Getenv("MIGRATION_BATCH_SIZE"))
106+
if err == nil && batchSizeEnvInt > 0 {
107+
batchSize = batchSizeEnvInt
108+
}
109+
rpcBatchSize := DEFAULT_RPC_BATCH_SIZE
110+
rpcBatchSizeEnvInt, err := strconv.Atoi(os.Getenv("MIGRATION_RPC_BATCH_SIZE"))
111+
if err == nil && rpcBatchSizeEnvInt > 0 {
112+
rpcBatchSize = rpcBatchSizeEnvInt
113+
}
114+
rpcClient, err := rpc.Initialize()
115+
if err != nil {
116+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
117+
}
118+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
119+
if err != nil {
120+
log.Fatal().Err(err).Msg("Failed to initialize storage")
121+
}
122+
123+
// check if chain was indexed with block receipts. If it was, then the current RPC must support block receipts
124+
validRpc, err := validateRPC(rpcClient, s)
125+
if err != nil {
126+
log.Fatal().Err(err).Msg("Failed to validate RPC")
127+
}
128+
if !validRpc {
129+
log.Fatal().Msg("RPC does not support block receipts, but transactions were indexed with receipts")
130+
}
131+
132+
validator := orchestrator.NewValidator(rpcClient, s)
133+
134+
targetStorageConfig := *config.Cfg.Storage.Main.Clickhouse
135+
targetStorageConfig.Database = targetDBName
136+
targetConn, err := storage.NewClickHouseConnector(&targetStorageConfig)
137+
if err != nil {
138+
log.Fatal().Err(err).Msg("Failed to initialize target storage")
139+
}
140+
141+
return &Migrator{
142+
migrationBatchSize: batchSize,
143+
rpcBatchSize: rpcBatchSize,
144+
rpcClient: rpcClient,
145+
storage: s,
146+
validator: validator,
147+
targetConn: targetConn,
148+
}
149+
}
150+
151+
func (m *Migrator) Close() {
152+
m.rpcClient.Close()
153+
}
154+
155+
func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
156+
// get latest block from main storage
157+
latestBlockStored, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpcClient.GetChainID())
158+
if err != nil {
159+
log.Fatal().Err(err).Msg("Failed to get latest block from main storage")
160+
}
161+
log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
162+
163+
endBlock := latestBlockStored
164+
// set range end from env instead if configured
165+
endBlockEnv := os.Getenv("END_BLOCK")
166+
if endBlockEnv != "" {
167+
configuredEndBlock, ok := new(big.Int).SetString(endBlockEnv, 10)
168+
if !ok {
169+
log.Fatal().Msgf("Failed to parse end block %s", endBlockEnv)
170+
}
171+
log.Info().Msgf("Configured end block: %s", configuredEndBlock.String())
172+
// set configured end block only if it's greater than 0 and less than latest block in main storage
173+
if configuredEndBlock.Sign() > 0 && configuredEndBlock.Cmp(latestBlockStored) < 0 {
174+
endBlock = configuredEndBlock
175+
}
176+
}
177+
178+
startBlock := big.NewInt(0) // default start block is 0
179+
// if start block is configured, use it
180+
startBlockEnv := os.Getenv("START_BLOCK")
181+
if startBlockEnv != "" {
182+
configuredStartBlock, ok := new(big.Int).SetString(startBlockEnv, 10)
183+
if !ok {
184+
log.Fatal().Msgf("Failed to parse start block %s", startBlockEnv)
185+
}
186+
log.Info().Msgf("Configured start block: %s", configuredStartBlock.String())
187+
startBlock = configuredStartBlock
188+
}
189+
190+
latestMigratedBlock, err := m.targetConn.GetMaxBlockNumberInRange(m.rpcClient.GetChainID(), startBlock, endBlock)
191+
if err != nil {
192+
log.Fatal().Err(err).Msg("Failed to get latest block from target storage")
193+
}
194+
log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)
195+
196+
if latestMigratedBlock.Cmp(endBlock) >= 0 {
197+
log.Fatal().Msgf("Full range is already migrated")
198+
}
199+
200+
// if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
201+
if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
202+
startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
203+
}
204+
205+
return startBlock, endBlock
206+
}
207+
208+
func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {
209+
allBlockData := make([]common.BlockData, 0)
210+
for i := 0; i < len(blockNumbers); i += m.rpcBatchSize {
211+
end := i + m.rpcBatchSize
212+
if end > len(blockNumbers) {
213+
end = len(blockNumbers)
214+
}
215+
batch := blockNumbers[i:end]
216+
blockData := m.rpcClient.GetFullBlocks(context.Background(), batch)
217+
218+
for _, block := range blockData {
219+
if block.Error != nil {
220+
log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
221+
continue
222+
}
223+
allBlockData = append(allBlockData, block.Data)
224+
}
225+
}
226+
return allBlockData, nil
227+
}
228+
229+
func (m *Migrator) GetValidBlocksForRange(blockNumbers []*big.Int) []common.BlockData {
230+
blockData, err := m.storage.MainStorage.GetFullBlockData(m.rpcClient.GetChainID(), blockNumbers)
231+
if err != nil {
232+
log.Fatal().Err(err).Msg("Failed to get full block data")
233+
}
234+
235+
validBlocks, _, err := m.validator.ValidateBlocks(blockData)
236+
if err != nil {
237+
log.Fatal().Err(err).Msg("Failed to validate blocks")
238+
}
239+
return validBlocks
240+
}
241+
242+
func (m *Migrator) GetValidBlocksFromRPC(blockNumbers []*big.Int) []common.BlockData {
243+
missingBlocksData, err := m.FetchBlocksFromRPC(blockNumbers)
244+
if err != nil {
245+
log.Fatal().Err(err).Msg("Failed to query missing blocks")
246+
}
247+
248+
validBlocks, invalidBlocks, err := m.validator.ValidateBlocks(missingBlocksData)
249+
if err != nil {
250+
log.Fatal().Err(err).Msg("Failed to validate missing blocks")
251+
}
252+
if len(invalidBlocks) > 0 {
253+
log.Fatal().Msgf("Unable to validate %d newly queried missing blocks", len(invalidBlocks))
254+
}
255+
return validBlocks
256+
}
257+
258+
func validateRPC(rpcClient rpc.IRPCClient, s storage.IStorage) (bool, error) {
259+
if rpcClient.SupportsBlockReceipts() {
260+
return true, nil
261+
}
262+
263+
// If rpc does not support block receipts, we need to check if the transactions are indexed with block receipts
264+
transactionsQueryResult, err := s.MainStorage.GetTransactions(storage.QueryFilter{
265+
ChainId: rpcClient.GetChainID(),
266+
Limit: 1,
267+
})
268+
if err != nil {
269+
log.Fatal().Err(err).Msg("Failed to get transactions from main storage")
270+
}
271+
if len(transactionsQueryResult.Data) == 0 {
272+
log.Warn().Msg("No transactions found in main storage, assuming RPC is valid")
273+
return true, nil
274+
}
275+
tx := transactionsQueryResult.Data[0]
276+
if tx.GasUsed == nil {
277+
// was indexed with logs not receipts and current rpc does not support block receipts
278+
return true, nil
279+
}
280+
// was indexed with receipts and current rpc does not support block receipts
281+
return false, nil
282+
}
283+
284+
func generateBlockNumbersForRange(startBlock, endBlock *big.Int) []*big.Int {
285+
blockNumbers := make([]*big.Int, 0)
286+
for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) <= 0; i.Add(i, big.NewInt(1)) {
287+
blockNumbers = append(blockNumbers, new(big.Int).Set(i))
288+
}
289+
return blockNumbers
290+
}

cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ func init() {
230230
rootCmd.AddCommand(apiCmd)
231231
rootCmd.AddCommand(validateAndFixCmd)
232232
rootCmd.AddCommand(validateCmd)
233+
rootCmd.AddCommand(migrateValidationCmd)
233234
}
234235

235236
func initConfig() {

internal/rpc/rpc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type IRPCClient interface {
4949
GetBlocksPerRequest() BlocksPerRequestConfig
5050
IsWebsocket() bool
5151
SupportsTraceBlock() bool
52+
SupportsBlockReceipts() bool
5253
HasCode(ctx context.Context, address string) (bool, error)
5354
Close()
5455
}
@@ -135,6 +136,10 @@ func (rpc *Client) SupportsTraceBlock() bool {
135136
return rpc.supportsTraceBlock
136137
}
137138

139+
func (rpc *Client) SupportsBlockReceipts() bool {
140+
return rpc.supportsBlockReceipts
141+
}
142+
138143
func (rpc *Client) Close() {
139144
rpc.RPCClient.Close()
140145
rpc.EthClient.Close()

0 commit comments

Comments
 (0)