From fb2f4968f01a9000562aef0f3039b01065127a8f Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Mon, 17 Feb 2025 21:30:08 +0200 Subject: [PATCH] debug committer --- internal/orchestrator/committer.go | 53 ++++++++++++++++-------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 14183ae..98cea6e 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/google/uuid" "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" @@ -56,25 +57,27 @@ func (c *Committer) Start(ctx context.Context) { return default: time.Sleep(interval) - blockDataToCommit, err := c.getSequentialBlockDataToCommit() + commitID := uuid.New().String() + log.Debug().Msgf("Starting commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli()) + blockDataToCommit, err := c.getSequentialBlockDataToCommit(commitID) if err != nil { - log.Error().Err(err).Msg("Error getting block data to commit") + log.Error().Err(err).Msgf("Error getting block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli()) continue } if blockDataToCommit == nil || len(*blockDataToCommit) == 0 { - log.Debug().Msg("No block data to commit") + log.Debug().Msgf("No block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli()) continue } - if err := c.commit(blockDataToCommit); err != nil { - log.Error().Err(err).Msg("Error committing blocks") + if err := c.commit(commitID, blockDataToCommit); err != nil { + log.Error().Err(err).Msgf("Error committing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli()) } } } } -func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) { +func (c *Committer) getBlockNumbersToCommit(commitID string) ([]*big.Int, error) { latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) - log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String()) + log.Info().Msgf("Committer found this max block number in main storage: %s. CommitID: %s. Timestamp %d", latestCommittedBlockNumber.String(), commitID, time.Now().UnixMilli()) if err != nil { return nil, err } @@ -96,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) { return blockNumbers, nil } -func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) { - blocksToCommit, err := c.getBlockNumbersToCommit() +func (c *Committer) getSequentialBlockDataToCommit(commitID string) (*[]common.BlockData, error) { + blocksToCommit, err := c.getBlockNumbersToCommit(commitID) if err != nil { return nil, fmt.Errorf("error determining blocks to commit: %v", err) } @@ -110,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error return nil, fmt.Errorf("error fetching blocks to commit: %v", err) } if blocksData == nil || len(*blocksData) == 0 { - log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64()) - c.handleMissingStagingData(blocksToCommit) + log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v. CommitID: %s. Timestamp %d", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64(), commitID, time.Now().UnixMilli()) + c.handleMissingStagingData(commitID, blocksToCommit) return nil, nil } @@ -121,7 +124,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error }) if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 { - return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block) + return nil, c.handleGap(commitID, blocksToCommit[0], (*blocksData)[0].Block) } var sequentialBlockData []common.BlockData @@ -135,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error } if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 { // Note: Gap detected, stop here - log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String()) + log.Warn().Msgf("Gap detected at block %s, committing until %s. CommitID: %s. Timestamp %d", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String(), commitID, time.Now().UnixMilli()) // increment the a gap counter in prometheus metrics.GapCounter.Inc() // record the first missed block number in prometheus @@ -149,22 +152,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error return &sequentialBlockData, nil } -func (c *Committer) commit(blockData *[]common.BlockData) error { +func (c *Committer) commit(commitID string, blockData *[]common.BlockData) error { blockNumbers := make([]*big.Int, len(*blockData)) for i, block := range *blockData { blockNumbers[i] = block.Block.Number } - log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) + log.Debug().Msgf("Committing %d blocks: %v. CommitID: %s. Timestamp %d", len(blockNumbers), blockNumbers, commitID, time.Now().UnixMilli()) // TODO if next parts (saving or deleting) fail, we'll have to do a rollback if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { - log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) + log.Error().Err(err).Msgf("Failed to commit blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli()) return fmt.Errorf("error saving data to main storage: %v", err) } + log.Debug().Msgf("Committer inserted blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli()) if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil { return fmt.Errorf("error deleting data from staging storage: %v", err) } + log.Debug().Msgf("Committer deleted staging data for blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli()) // Update metrics for successful commits metrics.SuccessfulCommits.Add(float64(len(*blockData))) @@ -173,7 +178,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error { return nil } -func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error { +func (c *Committer) handleGap(commitID string, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error { // increment the a gap counter in prometheus metrics.GapCounter.Inc() // record the first missed block number in prometheus @@ -182,9 +187,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc poller := NewBoundlessPoller(c.rpc, c.storage) missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64() - log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String()) + log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s. CommitID: %s. Timestamp %d", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String(), commitID, time.Now().UnixMilli()) if missingBlockCount > poller.blocksPerPoll { - log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config", poller.blocksPerPoll) + log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config. CommitID: %s. Timestamp %d", poller.blocksPerPoll, commitID, time.Now().UnixMilli()) missingBlockCount = poller.blocksPerPoll } missingBlockNumbers := make([]*big.Int, missingBlockCount) @@ -193,12 +198,12 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc missingBlockNumbers[i] = missingBlockNumber } - log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers) + log.Debug().Msgf("Polling %d blocks while handling gap: %v. CommitID: %s. Timestamp %d", len(missingBlockNumbers), missingBlockNumbers, commitID, time.Now().UnixMilli()) poller.Poll(missingBlockNumbers) return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String()) } -func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) { +func (c *Committer) handleMissingStagingData(commitID string, blocksToCommit []*big.Int) { // Checks if there are any blocks in staging after the current range end lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0)) if err != nil { @@ -206,10 +211,10 @@ func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) { return } if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 { - log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.") + log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli()) return } - log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String()) + log.Debug().Msgf("Detected missing blocks in staging data starting from %s. CommitID: %s. Timestamp %d", blocksToCommit[0].String(), commitID, time.Now().UnixMilli()) poller := NewBoundlessPoller(c.rpc, c.storage) blocksToPoll := blocksToCommit @@ -217,5 +222,5 @@ func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) { blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)] } poller.Poll(blocksToPoll) - log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String()) + log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s. CommitID: %s. Timestamp %d", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String(), commitID, time.Now().UnixMilli()) }