Commit 15bdf2d

reorg (#294)
* handle reorg
1 parent faf59c3 commit 15bdf2d

File tree

9 files changed: +359 -24 lines changed

cmd/committer.go

Lines changed: 3 additions & 0 deletions

@@ -30,5 +30,8 @@ func RunCommitter(cmd *cobra.Command, args []string) {
 	}()
 
 	committer.Init()
+	committer.InitReorg()
+
+	go committer.RunReorgValidator()
 	committer.CommitStreaming()
 }

configs/config.go

Lines changed: 4 additions & 0 deletions

@@ -302,6 +302,10 @@ type Config struct {
 	ParquetMaxFileSizeMB int64  `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
 	InsightServiceUrl    string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
 	InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
+	RedisAddr            string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
+	RedisUsername        string `env:"REDIS_USERNAME"`
+	RedisPassword        string `env:"REDIS_PASSWORD"`
+	RedisDB              int    `env:"REDIS_DB" envDefault:"0"`
 }
 
 var Cfg Config
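
The four new fields feed libs.InitRedis, which committer.InitReorg calls but whose body is not part of this diff. A minimal sketch of how they would typically be consumed, assuming the github.com/redis/go-redis/v9 client (the RedisClient variable and the panic-on-ping behavior are illustrative assumptions, not code from this commit):

package libs

import (
	"context"

	"github.com/redis/go-redis/v9"
	config "github.com/thirdweb-dev/indexer/configs"
)

var RedisClient *redis.Client

// Sketch only: wire the new config fields into a client and fail fast
// if Redis is unreachable.
func InitRedis() {
	RedisClient = redis.NewClient(&redis.Options{
		Addr:     config.Cfg.RedisAddr,     // REDIS_ADDR, default localhost:6379
		Username: config.Cfg.RedisUsername, // REDIS_USERNAME
		Password: config.Cfg.RedisPassword, // REDIS_PASSWORD
		DB:       config.Cfg.RedisDB,       // REDIS_DB, default 0
	})
	if err := RedisClient.Ping(context.Background()).Err(); err != nil {
		panic(err)
	}
}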

internal/committer/poollatest.go

Lines changed: 1 addition & 8 deletions

@@ -6,7 +6,6 @@ import (
 
 	"github.com/rs/zerolog/log"
 	config "github.com/thirdweb-dev/indexer/configs"
-	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/libs"
 	"github.com/thirdweb-dev/indexer/internal/libs/libblockdata"
 	"github.com/thirdweb-dev/indexer/internal/metrics"
@@ -59,13 +58,7 @@ func pollLatest() error {
 		Uint64("end_block", expectedBlockNumber-1).
 		Msg("All blocks validated successfully. Publishing blocks to Kafka")
 
-	// Convert slice of BlockData to slice of *BlockData for Kafka publisher
-	blockDataPointers := make([]*common.BlockData, len(blockDataArray))
-	for i, block := range blockDataArray {
-		blockDataPointers[i] = &block
-	}
-
-	if err := libs.KafkaPublisherV2.PublishBlockData(blockDataPointers); err != nil {
+	if err := libs.KafkaPublisherV2.PublishBlockData(blockDataArray); err != nil {
 		log.Panic().
 			Err(err).
 			Int("blocks_count", len(blockDataArray)).

internal/committer/reorg.go

Lines changed: 182 additions & 0 deletions

@@ -0,0 +1,182 @@
+package committer
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/rs/zerolog/log"
+	config "github.com/thirdweb-dev/indexer/configs"
+	"github.com/thirdweb-dev/indexer/internal/libs"
+	"github.com/thirdweb-dev/indexer/internal/libs/libblockdata"
+	"github.com/thirdweb-dev/indexer/internal/metrics"
+)
+
+func InitReorg() {
+	libs.InitRedis()
+}
+
+func RunReorgValidator() {
+	lastBlockCheck := int64(0)
+	for {
+		startBlock, endBlock, err := getReorgRange()
+		if err != nil {
+			log.Error().Err(err).Msg("Failed to get reorg range")
+			time.Sleep(2 * time.Second)
+			continue
+		}
+
+		if endBlock == lastBlockCheck || endBlock-startBlock < 5 {
+			log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.")
+			time.Sleep(1 * time.Minute)
+			continue
+		}
+
+		// Detect reorgs and handle them
+		err = detectAndHandleReorgs(startBlock, endBlock)
+		if err != nil {
+			log.Error().Err(err).Msg("Failed to detect and handle reorgs")
+			time.Sleep(2 * time.Second)
+			continue
+		}
+		lastBlockCheck = endBlock
+	}
+}
+
+func getReorgRange() (int64, int64, error) {
+	lastValidBlock, err := getLastValidBlock()
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to get last valid block: %w", err)
+	}
+
+	startBlock := max(lastValidBlock-1, 1)
+	endBlock, err := libs.GetMaxBlockNumberFromClickHouseV2(libs.ChainId)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to get max block number: %w", err)
+	}
+
+	endBlock = min(endBlock-5, startBlock+100) // lag behind the head by a few blocks for safety
+
+	if startBlock >= endBlock {
+		return 0, 0, fmt.Errorf("start block %d is not below end block %d", startBlock, endBlock)
+	}
+
+	return startBlock, endBlock, nil
+}
+
+func getLastValidBlock() (int64, error) {
+	// Try the last reorg-checked block number first
+	lastReorgBlock, err := libs.GetReorgLastValidBlock(libs.ChainIdStr)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get last reorg checked block: %w", err)
+	}
+
+	if lastReorgBlock > 0 {
+		return lastReorgBlock, nil
+	}
+
+	// Fall back to the block number from 1 day ago
+	lastValidBlock, err := libs.GetBlockNumberFromClickHouseV2DaysAgo(libs.ChainId, 1)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get block number 1 day ago: %w", err)
+	}
+
+	return lastValidBlock, nil
+}
+
+func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
+	log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock)
+
+	// Fetch block headers for the range
+	blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
+	if err != nil {
+		return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
+	}
+
+	if len(blockHeaders) == 0 {
+		log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
+		return nil
+	}
+
+	// Find the reorg start and end blocks
+	reorgStartBlock := int64(-1)
+	reorgEndBlock := int64(-1)
+	for i := 1; i < len(blockHeaders); i++ {
+		if blockHeaders[i].Number.Int64() != blockHeaders[i-1].Number.Int64()+1 {
+			// non-sequential block numbers
+			reorgStartBlock = blockHeaders[i-1].Number.Int64()
+			reorgEndBlock = blockHeaders[i].Number.Int64()
+			break
+		}
+		if blockHeaders[i].ParentHash != blockHeaders[i-1].Hash {
+			// hash mismatch: the reorg window starts
+			if reorgStartBlock == -1 {
+				reorgStartBlock = blockHeaders[i-1].Number.Int64()
+			}
+			continue
+		} else {
+			// hash matches again: the reorg window ends
+			if reorgStartBlock != -1 {
+				reorgEndBlock = blockHeaders[i].Number.Int64()
+				break
+			}
+		}
+	}
+
+	// Default the end to the last checked block if the window was never closed
+	if reorgEndBlock == -1 {
+		reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
+	}
+
+	if reorgStartBlock > -1 {
+		if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
+			return err
+		}
+	}
+
+	// Update the last valid block; if there was no reorg, this advances it to the last checked block
+	libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)
+
+	return nil
+}
+
+func handleReorgForRange(startBlock uint64, endBlock uint64) error {
+	// nothing to do
+	if startBlock == 0 {
+		return nil
+	}
+
+	// will panic if any block is invalid
+	newBlockDataArray := libblockdata.GetValidBlockDataInBatch(endBlock, startBlock)
+	expectedBlockNumber := startBlock
+	for i, blockData := range newBlockDataArray {
+		if blockData.Block.Number.Uint64() != expectedBlockNumber {
+			log.Error().
+				Int("index", i).
+				Uint64("expected_block", expectedBlockNumber).
+				Uint64("actual_block", blockData.Block.Number.Uint64()).
+				Msg("Reorg: Block sequence mismatch - missing or out-of-order block")
+
+			return fmt.Errorf("reorg: block sequence mismatch - missing or out of order block")
+		}
+		expectedBlockNumber++
+	}
+
+	oldBlockDataArray, err := libs.GetBlockDataFromClickHouseV2(libs.ChainId.Uint64(), startBlock, endBlock)
+	if err != nil {
+		return fmt.Errorf("handleReorgForRange: failed to get old block data: %w", err)
+	}
+
+	if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newBlockDataArray, oldBlockDataArray); err != nil {
+		log.Error().
+			Err(err).
+			Int("blocks_count", len(newBlockDataArray)).
+			Msg("Reorg: Failed to publish blocks to Kafka")
+		return fmt.Errorf("reorg: failed to publish blocks to kafka")
+	}
+
+	for _, blockData := range newBlockDataArray {
+		metrics.CommitterLastPublishedReorgBlockNumber.WithLabelValues(config.Cfg.ZeetProjectName, libs.ChainIdStr).Set(float64(blockData.Block.Number.Uint64()))
+	}
+
+	return nil
+}
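
The checkpoint helpers libs.GetReorgLastValidBlock and libs.SetReorgLastValidBlock are called here but defined elsewhere; since InitReorg's only job is libs.InitRedis, they are presumably a Redis-backed get/set keyed by chain. As a worked example of the scan window: with a checkpoint at block 1000 and a ClickHouse head at 1200, getReorgRange yields [999, 1099], i.e. max(1000-1, 1) through min(1200-5, 999+100). A minimal sketch of the helpers under the Redis assumption (the key layout, and the RedisClient variable from the earlier sketch, are illustrative, not from this commit):

package libs

import (
	"context"
	"fmt"
	"strconv"

	"github.com/redis/go-redis/v9"
)

// Assumed layout: one checkpoint key per chain.
func reorgKey(chainId string) string {
	return fmt.Sprintf("reorg:last_valid_block:%s", chainId)
}

func GetReorgLastValidBlock(chainId string) (int64, error) {
	val, err := RedisClient.Get(context.Background(), reorgKey(chainId)).Result()
	if err == redis.Nil {
		return 0, nil // no checkpoint yet; getLastValidBlock falls back to ClickHouse
	}
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(val, 10, 64)
}

func SetReorgLastValidBlock(chainId string, blockNumber int64) error {
	return RedisClient.Set(context.Background(), reorgKey(chainId), blockNumber, 0).Err()
}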

internal/libs/clickhouse.go

Lines changed: 75 additions & 0 deletions

@@ -89,6 +89,32 @@ func initClickhouse(host string, port int, username string, password string, dat
 	return clickhouseConn
 }
 
+func GetBlockNumberFromClickHouseV2DaysAgo(chainId *big.Int, daysAgo int) (int64, error) {
+	query := fmt.Sprintf(`SELECT toString(max(block_number))
+		FROM default.blocks WHERE chain_id = %d AND block_timestamp <= now() - INTERVAL %d DAY;`, chainId.Uint64(), daysAgo)
+	rows, err := ClickhouseConnV2.Query(context.Background(), query)
+	if err != nil {
+		return -1, err
+	}
+	defer rows.Close()
+
+	if !rows.Next() {
+		return -1, nil
+	}
+
+	var blockNumberStr string
+	if err := rows.Scan(&blockNumberStr); err != nil {
+		return -1, err
+	}
+
+	blockNumber, err := strconv.ParseInt(blockNumberStr, 10, 64)
+	if err != nil {
+		return -1, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
+	}
+
+	return blockNumber, nil
+}
+
 func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
 	// Use toString() to convert UInt256 to string, then parse to int64
 	query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d HAVING count() > 0", chainId.Uint64())
@@ -115,6 +141,55 @@ func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
 	return maxBlockNumber, nil
 }
 
+func GetBlockReorgDataFromClickHouseV2(chainId *big.Int, startBlockNumber int64, endBlockNumber int64) ([]*common.Block, error) {
+	query := fmt.Sprintf(`SELECT block_number, hash, parent_hash
+		FROM default.blocks WHERE chain_id = %d AND block_number BETWEEN %d AND %d ORDER BY block_number`, chainId.Uint64(), startBlockNumber, endBlockNumber)
+	rows, err := ClickhouseConnV2.Query(context.Background(), query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	blocks := make([]*common.Block, 0)
+	for rows.Next() {
+		var block common.Block
+		err := rows.Scan(&block.Number, &block.Hash, &block.ParentHash)
+		if err != nil {
+			return nil, err
+		}
+		blocks = append(blocks, &block)
+	}
+	return blocks, nil
+}
+
+func GetBlockHeadersForReorgCheck(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.Block, error) {
+	sb := startBlockNumber
+	length := endBlockNumber - startBlockNumber + 1
+	blocksRaw := make([]*common.Block, length)
+
+	query := fmt.Sprintf("SELECT block_number, hash, parent_hash FROM %s.blocks FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d ORDER BY block_number",
+		config.Cfg.CommitterClickhouseDatabase,
+		chainId,
+		startBlockNumber,
+		endBlockNumber,
+	)
+	blocks, err := execQueryV2[common.Block](query)
+	if err != nil {
+		return blocksRaw, err
+	}
+
+	// just to make sure the blocks are in the correct order
+	for _, block := range blocks {
+		idx := block.Number.Uint64() - sb
+		if idx >= length {
+			log.Error().Msgf("Block number %s is out of range", block.Number.String())
+			continue
+		}
+		blocksRaw[idx] = &block
+	}
+	return blocksRaw, nil
+}
+
 func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) {
 	length := endBlockNumber - startBlockNumber + 1
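
Two things worth noting about GetBlockHeadersForReorgCheck. First, the result is positional: index 0 corresponds to startBlockNumber, and any block missing from ClickHouse leaves a nil hole rather than shrinking the slice, so callers that dereference header fields (as detectAndHandleReorgs does) implicitly assume the range is fully populated. Second, blocksRaw[idx] = &block relies on Go 1.22+ per-iteration loop variables; on an older toolchain every slot would alias the final row. A defensive caller-side guard might look like this (hypothetical helper, not part of the commit):

package libs

import "github.com/thirdweb-dev/indexer/internal/common"

// firstGap reports the first missing header in a positional slice returned by
// GetBlockHeadersForReorgCheck, letting a caller truncate the scan window
// instead of dereferencing a nil *common.Block.
func firstGap(headers []*common.Block) (int, bool) {
	for i, h := range headers {
		if h == nil {
			return i, true
		}
	}
	return -1, false
}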

internal/libs/libblockdata/getblockdata.go

Lines changed: 5 additions & 5 deletions

@@ -15,13 +15,13 @@ import (
 	"github.com/thirdweb-dev/indexer/internal/rpc"
 )
 
-func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []common.BlockData {
+func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []*common.BlockData {
 	rpcNumParallelCalls := config.Cfg.RPCNumParallelCalls
 	rpcBatchSize := config.Cfg.RPCBatchSize
 	maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls
 
 	// Calculate the range of blocks to fetch
-	blocksToFetch := latestBlock - nextCommitBlockNumber
+	blocksToFetch := latestBlock - nextCommitBlockNumber + 1
 	if blocksToFetch > maxBlocksPerFetch {
 		blocksToFetch = maxBlocksPerFetch
 	}
@@ -35,7 +35,7 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64)
 		Msg("Starting to fetch latest blocks")
 
 	// Precreate array of block data
-	blockDataArray := make([]common.BlockData, blocksToFetch)
+	blockDataArray := make([]*common.BlockData, blocksToFetch)
 
 	// Create batches and calculate number of parallel calls needed
 	numBatches := min((blocksToFetch+rpcBatchSize-1)/rpcBatchSize, rpcNumParallelCalls)
@@ -74,8 +74,8 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64)
 	for i, bd := range batchResults {
 		arrayIndex := batchIdx*rpcBatchSize + uint64(i)
 		if arrayIndex < uint64(len(blockDataArray)) {
-			blockDataArray[arrayIndex] = *bd // todo: update to use pointer, kafka is using normal block data
-			batchResults[i] = nil // free memory
+			blockDataArray[arrayIndex] = bd
+			batchResults[i] = nil // free memory
 		}
 	}
 	mu.Unlock()
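
The +1 fix makes the count match an inclusive range: fetching blocks nextCommitBlockNumber through latestBlock is latestBlock - nextCommitBlockNumber + 1 blocks, so the old expression left the latest block out of the fetch window (and computed zero when the two bounds were equal). The return-type change to []*common.BlockData is what lets pollLatest hand the result straight to KafkaPublisherV2.PublishBlockData without the conversion loop removed above. A quick sanity check of the arithmetic:

package main

import "fmt"

func main() {
	latest, next := uint64(109), uint64(100)
	fmt.Println(latest - next)     // 9: old expression, one block short
	fmt.Println(latest - next + 1) // 10: blocks 100..109 inclusive
}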
