55 changes: 55 additions & 0 deletions README.md
@@ -294,6 +294,61 @@ committer:
blocksPerCommit: 1000
```

#### Reorg Handler
Whether to enable the reorg handler. Default is `true`.

cmd: `--reorgHandler-enabled`
env: `REORGHANDLER_ENABLED`
yaml:
```yaml
reorgHandler:
enabled: true
```

#### Reorg Handler Interval
Reorg handler trigger interval in milliseconds. Default is `1000`.

cmd: `--reorgHandler-interval`
env: `REORGHANDLER_INTERVAL`
yaml:
```yaml
reorgHandler:
interval: 3000
```

#### Reorg Handler Blocks Per Scan
How many blocks to scan for reorgs. Default is `100`.

cmd: `--reorgHandler-blocks-per-scan`
env: `REORGHANDLER_BLOCKSPERSCAN`
yaml:
```yaml
reorgHandler:
blocksPerScan: 1000
```

#### Reorg Handler From Block
The block number from which to start scanning for reorgs. Default is `0`.

cmd: `--reorgHandler-from-block`
env: `REORGHANDLER_FROMBLOCK`
yaml:
```yaml
reorgHandler:
fromBlock: 20000000
```

#### Reorg Handler Force From Block
Whether to force the reorg handler to start from the block specified in `reorgHandler-from-block`, instead of resuming from the last reorg-checked block persisted in storage. Default is `false`.

cmd: `--reorgHandler-force-from-block`
env: `REORGHANDLER_FORCEFROMBLOCK`
yaml:
```yaml
reorgHandler:
forceFromBlock: true
```
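
For reference, a `reorgHandler` configuration combining all of the above options might look like this (the values are illustrative):

```yaml
reorgHandler:
  enabled: true
  interval: 1000
  blocksPerScan: 100
  fromBlock: 20000000
  forceFromBlock: false
```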

#### Failure Recoverer
Whether to enable the failure recoverer. Default is `true`.

10 changes: 10 additions & 0 deletions cmd/root.go
@@ -55,6 +55,11 @@ func init() {
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
@@ -98,6 +103,11 @@ func init() {
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))
9 changes: 9 additions & 0 deletions configs/config.go
@@ -28,6 +28,14 @@ type CommitterConfig struct {
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
}

type ReorgHandlerConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerScan int `mapstructure:"blocksPerScan"`
FromBlock int `mapstructure:"fromBlock"`
ForceFromBlock bool `mapstructure:"forceFromBlock"`
}

type FailureRecovererConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
@@ -101,6 +109,7 @@ type Config struct {
Poller PollerConfig `mapstructure:"poller"`
Committer CommitterConfig `mapstructure:"committer"`
FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
}
13 changes: 13 additions & 0 deletions internal/metrics/metrics.go
@@ -69,3 +69,16 @@ var (
Help: "The first block number in the failure recoverer batch",
})
)

// Reorg Handler Metrics
var (
ReorgHandlerLastCheckedBlock = promauto.NewGauge(prometheus.GaugeOpts{
Name: "reorg_handler_last_checked_block",
Help: "The last block number that the reorg handler checked",
})

ReorgCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "reorg_handler_reorg_counter",
Help: "The number of reorgs detected",
})
)
11 changes: 11 additions & 0 deletions internal/orchestrator/orchestrator.go
@@ -14,6 +14,7 @@ type Orchestrator struct {
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
reorgHandlerEnabled bool
}

func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
@@ -28,6 +29,7 @@ func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
pollerEnabled: config.Cfg.Poller.Enabled,
failureRecovererEnabled: config.Cfg.FailureRecoverer.Enabled,
committerEnabled: config.Cfg.Committer.Enabled,
reorgHandlerEnabled: config.Cfg.ReorgHandler.Enabled,
}, nil
}

@@ -61,6 +63,15 @@ func (o *Orchestrator) Start() {
}()
}

if o.reorgHandlerEnabled {
wg.Add(1)
go func() {
defer wg.Done()
reorgHandler := NewReorgHandler(o.rpc, o.storage)
reorgHandler.Start()
}()
}

// The chain tracker is always running
wg.Add(1)
go func() {
200 changes: 200 additions & 0 deletions internal/orchestrator/reorg_handler.go
@@ -0,0 +1,200 @@
package orchestrator

import (
"fmt"
"math/big"
"time"

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)

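// ReorgHandler periodically compares recently stored block headers against each other
// and against the chain returned by the RPC to detect reorganizations, then re-fetches
// and replaces block data for any reorged range.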
type ReorgHandler struct {
rpc rpc.Client
storage storage.IStorage
triggerInterval int
blocksPerScan int
lastCheckedBlock *big.Int
worker *worker.Worker
}

const DEFAULT_REORG_HANDLER_INTERVAL = 1000
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100

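// NewReorgHandler builds a handler from the reorgHandler config, falling back to the
// defaults above when the configured interval or blocksPerScan is zero.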
func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
triggerInterval := config.Cfg.ReorgHandler.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
}
blocksPerScan := config.Cfg.ReorgHandler.BlocksPerScan
if blocksPerScan == 0 {
blocksPerScan = DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN
}
return &ReorgHandler{
rpc: rpc,
storage: storage,
worker: worker.NewWorker(rpc),
triggerInterval: triggerInterval,
blocksPerScan: blocksPerScan,
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
}
}

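// getInitialCheckedBlockNumber returns the block number from which reorg checks should
// resume: the last reorg-checked block persisted in orchestrator storage, falling back
// to the configured fromBlock when nothing is stored or when forceFromBlock is set.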
func getInitialCheckedBlockNumber(storage storage.IStorage, chainId *big.Int) *big.Int {
bn := big.NewInt(int64(config.Cfg.ReorgHandler.FromBlock))
if !config.Cfg.ReorgHandler.ForceFromBlock {
storedFromBlock, err := storage.OrchestratorStorage.GetLastReorgCheckedBlockNumber(chainId)
if err != nil {
log.Debug().Err(err).Msgf("Error getting last reorg checked block number, using configured: %s", bn)
return bn
}
if storedFromBlock.Sign() <= 0 {
log.Debug().Msgf("Last reorg checked block number not found, using configured: %s", bn)
return bn
}
log.Debug().Msgf("Last reorg checked block number found, using: %s", storedFromBlock)
return storedFromBlock
}
log.Debug().Msgf("Force from block reorg check flag set, using configured: %s", bn)
return bn
}

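// Start runs the reorg check loop. On every tick it loads the next window of
// blocksPerScan block headers after the last checked block, scans them (newest first)
// for a parent-hash mismatch, repairs the affected range when a reorg is detected, and
// then advances the persisted last-checked block. This method blocks forever.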
func (rh *ReorgHandler) Start() {
interval := time.Duration(rh.triggerInterval) * time.Millisecond
ticker := time.NewTicker(interval)

	log.Debug().Msg("Reorg handler running")
go func() {
for range ticker.C {
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error getting recent block headers")
continue
}
if len(blockHeaders) == 0 {
log.Warn().Msg("No block headers found")
continue
}
mostRecentBlockHeader := blockHeaders[0]
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
continue
}
metrics.ReorgCounter.Inc()
forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
if err != nil {
log.Error().Err(err).Msg("Error while finding fork point")
continue
}
err = rh.handleReorg(forkPoint, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error while handling reorg")
continue
}
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

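// findReorgEndIndex expects headers ordered from newest to oldest and returns the index
// of the newest header whose parent hash does not match the hash of the next (older)
// header, or -1 if the chain of headers is continuous.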
func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
currentBlock := reversedBlockHeaders[i]
previousBlock := reversedBlockHeaders[i+1]

if currentBlock.ParentHash != previousBlock.Hash {
log.Debug().
Str("currentBlockNumber", currentBlock.Number.String()).
Str("currentBlockHash", currentBlock.Hash).
Str("currentBlockParentHash", currentBlock.ParentHash).
Str("previousBlockNumber", previousBlock.Number.String()).
Str("previousBlockHash", previousBlock.Hash).
Msg("Reorg detected: parent hash mismatch")
return i
}
}
return -1
}

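// findForkPoint re-fetches the given headers' blocks from the RPC and walks them from
// newest to oldest until a stored header matches the chain again, returning the number
// of the header one position older than that match. If no header in the batch matches,
// it recurses on the next, older batch of stored headers.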
func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
if err != nil {
return nil, err
}

for i := 0; i < len(reversedBlockHeaders)-1; i++ {
blockHeader := reversedBlockHeaders[i]
		block, ok := newBlocksByNumber[blockHeader.Number.String()]
if !ok {
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
}
if block.Hash == blockHeader.Hash {
previousBlock := reversedBlockHeaders[i+1]
return previousBlock.Number, nil
}
}
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
if err != nil {
return nil, fmt.Errorf("error getting next headers batch: %w", err)
}
	if len(nextHeadersBatch) == 0 {
		return nil, fmt.Errorf("no fork point found: no more stored block headers to check")
	}
	return rh.findForkPoint(nextHeadersBatch)
}

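// getNewBlocksByNumber fetches the given headers' blocks from the RPC and returns them
// keyed by block number.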
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (map[string]common.Block, error) {
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
for _, header := range reversedBlockHeaders {
blockNumbers = append(blockNumbers, header.Number)
}
blockResults := rh.rpc.GetBlocks(blockNumbers)
fetchedBlocksByNumber := make(map[string]common.Block)
for _, blockResult := range blockResults {
if blockResult.Error != nil {
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
}
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
}
	return fetchedBlocksByNumber, nil
}

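// handleReorg re-fetches full block data for every block from reorgStart to reorgEnd
// (inclusive), deletes the stale data from main storage, and inserts the fresh data.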
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
blockRange = append(blockRange, new(big.Int).Set(i))
}

results := rh.worker.Run(blockRange)
data := make([]common.BlockData, 0, len(results))
for _, result := range results {
if result.Error != nil {
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
}
data = append(data, common.BlockData{
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
}
// TODO make delete and insert atomic
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
return fmt.Errorf("error saving data to main storage: %w", err)
}
return nil
}