Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"time"

"github.com/google/uuid"
"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
Expand Down Expand Up @@ -56,25 +57,27 @@ func (c *Committer) Start(ctx context.Context) {
return
default:
time.Sleep(interval)
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
commitID := uuid.New().String()
log.Debug().Msgf("Starting commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
blockDataToCommit, err := c.getSequentialBlockDataToCommit(commitID)
if err != nil {
log.Error().Err(err).Msg("Error getting block data to commit")
log.Error().Err(err).Msgf("Error getting block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
continue
}
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
log.Debug().Msg("No block data to commit")
log.Debug().Msgf("No block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
continue
}
if err := c.commit(blockDataToCommit); err != nil {
log.Error().Err(err).Msg("Error committing blocks")
if err := c.commit(commitID, blockDataToCommit); err != nil {
log.Error().Err(err).Msgf("Error committing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
}
}
}
}

func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
func (c *Committer) getBlockNumbersToCommit(commitID string) ([]*big.Int, error) {
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
log.Info().Msgf("Committer found this max block number in main storage: %s. CommitID: %s. Timestamp %d", latestCommittedBlockNumber.String(), commitID, time.Now().UnixMilli())
if err != nil {
return nil, err
}
Expand All @@ -96,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
return blockNumbers, nil
}

func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
func (c *Committer) getSequentialBlockDataToCommit(commitID string) (*[]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit(commitID)
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
}
Expand All @@ -110,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
if blocksData == nil || len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
c.handleMissingStagingData(blocksToCommit)
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v. CommitID: %s. Timestamp %d", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64(), commitID, time.Now().UnixMilli())
c.handleMissingStagingData(commitID, blocksToCommit)
return nil, nil
}

Expand All @@ -121,7 +124,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
})

if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
return nil, c.handleGap(commitID, blocksToCommit[0], (*blocksData)[0].Block)
}

var sequentialBlockData []common.BlockData
Expand All @@ -135,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
}
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
// Note: Gap detected, stop here
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
log.Warn().Msgf("Gap detected at block %s, committing until %s. CommitID: %s. Timestamp %d", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String(), commitID, time.Now().UnixMilli())
// increment the a gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
Expand All @@ -149,22 +152,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
return &sequentialBlockData, nil
}

func (c *Committer) commit(blockData *[]common.BlockData) error {
func (c *Committer) commit(commitID string, blockData *[]common.BlockData) error {
blockNumbers := make([]*big.Int, len(*blockData))
for i, block := range *blockData {
blockNumbers[i] = block.Block.Number
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
log.Debug().Msgf("Committing %d blocks: %v. CommitID: %s. Timestamp %d", len(blockNumbers), blockNumbers, commitID, time.Now().UnixMilli())

// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
log.Error().Err(err).Msgf("Failed to commit blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
return fmt.Errorf("error saving data to main storage: %v", err)
}
log.Debug().Msgf("Committer inserted blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())

if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
return fmt.Errorf("error deleting data from staging storage: %v", err)
}
log.Debug().Msgf("Committer deleted staging data for blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())

// Update metrics for successful commits
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
Expand All @@ -173,7 +178,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
return nil
}

func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
func (c *Committer) handleGap(commitID string, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
// increment the a gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
Expand All @@ -182,9 +187,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
poller := NewBoundlessPoller(c.rpc, c.storage)

missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s. CommitID: %s. Timestamp %d", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String(), commitID, time.Now().UnixMilli())
if missingBlockCount > poller.blocksPerPoll {
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config", poller.blocksPerPoll)
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config. CommitID: %s. Timestamp %d", poller.blocksPerPoll, commitID, time.Now().UnixMilli())
missingBlockCount = poller.blocksPerPoll
}
missingBlockNumbers := make([]*big.Int, missingBlockCount)
Expand All @@ -193,29 +198,29 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
missingBlockNumbers[i] = missingBlockNumber
}

log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
log.Debug().Msgf("Polling %d blocks while handling gap: %v. CommitID: %s. Timestamp %d", len(missingBlockNumbers), missingBlockNumbers, commitID, time.Now().UnixMilli())
poller.Poll(missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}

func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
func (c *Committer) handleMissingStagingData(commitID string, blocksToCommit []*big.Int) {
// Checks if there are any blocks in staging after the current range end
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
if err != nil {
log.Error().Err(err).Msg("Error checking staged data for missing range")
return
}
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
return
}
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
log.Debug().Msgf("Detected missing blocks in staging data starting from %s. CommitID: %s. Timestamp %d", blocksToCommit[0].String(), commitID, time.Now().UnixMilli())

poller := NewBoundlessPoller(c.rpc, c.storage)
blocksToPoll := blocksToCommit
if len(blocksToCommit) > int(poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
}
poller.Poll(blocksToPoll)
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s. CommitID: %s. Timestamp %d", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String(), commitID, time.Now().UnixMilli())
}
Loading