Skip to content

Commit 402f4bd

Browse files
committed
debug committer
1 parent 8322feb commit 402f4bd

File tree

1 file changed

+27
-23
lines changed

1 file changed

+27
-23
lines changed

internal/orchestrator/committer.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sort"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/rs/zerolog/log"
1112
config "github.com/thirdweb-dev/indexer/configs"
1213
"github.com/thirdweb-dev/indexer/internal/common"
@@ -57,25 +58,26 @@ func (c *Committer) Start(ctx context.Context) {
5758
log.Info().Msg("Committer shutting down")
5859
return
5960
case <-ticker.C:
60-
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
61+
commitID := uuid.New().String()
62+
blockDataToCommit, err := c.getSequentialBlockDataToCommit(commitID)
6163
if err != nil {
62-
log.Error().Err(err).Msg("Error getting block data to commit")
64+
log.Error().Err(err).Msgf("CommitID: %s. Error getting block data to commit", commitID)
6365
continue
6466
}
6567
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
66-
log.Debug().Msg("No block data to commit")
68+
log.Debug().Msgf("CommitID: %s. No block data to commit", commitID)
6769
continue
6870
}
69-
if err := c.commit(blockDataToCommit); err != nil {
70-
log.Error().Err(err).Msg("Error committing blocks")
71+
if err := c.commit(commitID, blockDataToCommit); err != nil {
72+
log.Error().Err(err).Msgf("CommitID: %s. Error committing blocks", commitID)
7173
}
7274
}
7375
}
7476
}
7577

76-
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
78+
func (c *Committer) getBlockNumbersToCommit(commitID string) ([]*big.Int, error) {
7779
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
78-
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
80+
log.Info().Msgf("CommitID: %s. Committer found this max block number in main storage: %s", commitID, latestCommittedBlockNumber.String())
7981
if err != nil {
8082
return nil, err
8183
}
@@ -97,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9799
return blockNumbers, nil
98100
}
99101

100-
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
101-
blocksToCommit, err := c.getBlockNumbersToCommit()
102+
func (c *Committer) getSequentialBlockDataToCommit(commitID string) (*[]common.BlockData, error) {
103+
blocksToCommit, err := c.getBlockNumbersToCommit(commitID)
102104
if err != nil {
103105
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
104106
}
@@ -111,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
111113
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
112114
}
113115
if blocksData == nil || len(*blocksData) == 0 {
114-
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
115-
c.handleMissingStagingData(blocksToCommit)
116+
log.Warn().Msgf("CommitID: %s. Committer didn't find the following range in staging: %v - %v", commitID, blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
117+
c.handleMissingStagingData(commitID, blocksToCommit)
116118
return nil, nil
117119
}
118120

@@ -122,7 +124,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
122124
})
123125

124126
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
125-
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
127+
return nil, c.handleGap(commitID, blocksToCommit[0], (*blocksData)[0].Block)
126128
}
127129

128130
var sequentialBlockData []common.BlockData
@@ -136,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
136138
}
137139
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
138140
// Note: Gap detected, stop here
139-
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
141+
log.Warn().Msgf("CommitID: %s. Gap detected at block %s, committing until %s", commitID, expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
140142
// increment the a gap counter in prometheus
141143
metrics.GapCounter.Inc()
142144
// record the first missed block number in prometheus
@@ -150,22 +152,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
150152
return &sequentialBlockData, nil
151153
}
152154

153-
func (c *Committer) commit(blockData *[]common.BlockData) error {
155+
func (c *Committer) commit(commitID string, blockData *[]common.BlockData) error {
154156
blockNumbers := make([]*big.Int, len(*blockData))
155157
for i, block := range *blockData {
156158
blockNumbers[i] = block.Block.Number
157159
}
158-
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
160+
log.Debug().Msgf("CommitID: %s. Committing %d blocks: %v", commitID, len(blockNumbers), blockNumbers)
159161

160162
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
161163
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
162164
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
163165
return fmt.Errorf("error saving data to main storage: %v", err)
164166
}
167+
log.Debug().Msgf("CommitID: %s. Committer inserted blocks: %v", commitID, blockNumbers)
165168

166169
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
167170
return fmt.Errorf("error deleting data from staging storage: %v", err)
168171
}
172+
log.Debug().Msgf("CommitID: %s. Committer deleted staging data for blocks: %v", commitID, blockNumbers)
169173

170174
// Update metrics for successful commits
171175
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
@@ -174,7 +178,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
174178
return nil
175179
}
176180

177-
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
181+
func (c *Committer) handleGap(commitID string, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
178182
// increment the a gap counter in prometheus
179183
metrics.GapCounter.Inc()
180184
// record the first missed block number in prometheus
@@ -183,9 +187,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
183187
poller := NewBoundlessPoller(c.rpc, c.storage)
184188

185189
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()
186-
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
190+
log.Debug().Msgf("CommitID: %s. Detected %d missing blocks between blocks %s and %s", commitID, missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
187191
if missingBlockCount > poller.blocksPerPoll {
188-
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config", poller.blocksPerPoll)
192+
log.Debug().Msgf("CommitID: %s. Limiting polling missing blocks to %d blocks due to config", commitID, poller.blocksPerPoll)
189193
missingBlockCount = poller.blocksPerPoll
190194
}
191195
missingBlockNumbers := make([]*big.Int, missingBlockCount)
@@ -194,29 +198,29 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
194198
missingBlockNumbers[i] = missingBlockNumber
195199
}
196200

197-
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
201+
log.Debug().Msgf("CommitID: %s. Polling %d blocks while handling gap: %v", commitID, len(missingBlockNumbers), missingBlockNumbers)
198202
poller.Poll(missingBlockNumbers)
199203
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
200204
}
201205

202-
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
206+
func (c *Committer) handleMissingStagingData(commitID string, blocksToCommit []*big.Int) {
203207
// Checks if there are any blocks in staging after the current range end
204208
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
205209
if err != nil {
206210
log.Error().Err(err).Msg("Error checking staged data for missing range")
207211
return
208212
}
209213
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
210-
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
214+
log.Debug().Msgf("CommitID: %s. Committer is caught up with staging. No need to poll for missing blocks.", commitID)
211215
return
212216
}
213-
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
217+
log.Debug().Msgf("CommitID: %s. Detected missing blocks in staging data starting from %s.", commitID, blocksToCommit[0].String())
214218

215219
poller := NewBoundlessPoller(c.rpc, c.storage)
216220
blocksToPoll := blocksToCommit
217221
if len(blocksToCommit) > int(poller.blocksPerPoll) {
218222
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
219223
}
220224
poller.Poll(blocksToPoll)
221-
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
225+
log.Debug().Msgf("CommitID: %s. Polled %d blocks due to committer detecting them as missing. Range: %s - %s", commitID, len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
222226
}

0 commit comments

Comments
 (0)