Skip to content

Commit 5428af7

Browse files
committed
debug committer
1 parent 8322feb commit 5428af7

File tree

1 file changed

+29
-24
lines changed

1 file changed

+29
-24
lines changed

internal/orchestrator/committer.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sort"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/rs/zerolog/log"
1112
config "github.com/thirdweb-dev/indexer/configs"
1213
"github.com/thirdweb-dev/indexer/internal/common"
@@ -57,25 +58,27 @@ func (c *Committer) Start(ctx context.Context) {
5758
log.Info().Msg("Committer shutting down")
5859
return
5960
case <-ticker.C:
60-
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
61+
commitID := uuid.New().String()
62+
log.Debug().Msgf("Starting commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
63+
blockDataToCommit, err := c.getSequentialBlockDataToCommit(commitID)
6164
if err != nil {
62-
log.Error().Err(err).Msg("Error getting block data to commit")
65+
log.Error().Err(err).Msgf("Error getting block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
6366
continue
6467
}
6568
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
66-
log.Debug().Msg("No block data to commit")
69+
log.Debug().Msgf("No block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
6770
continue
6871
}
69-
if err := c.commit(blockDataToCommit); err != nil {
70-
log.Error().Err(err).Msg("Error committing blocks")
72+
if err := c.commit(commitID, blockDataToCommit); err != nil {
73+
log.Error().Err(err).Msgf("Error committing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
7174
}
7275
}
7376
}
7477
}
7578

76-
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
79+
func (c *Committer) getBlockNumbersToCommit(commitID string) ([]*big.Int, error) {
7780
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
78-
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
81+
log.Info().Msgf("Committer found this max block number in main storage: %s. CommitID: %s. Timestamp %d", latestCommittedBlockNumber.String(), commitID, time.Now().UnixMilli())
7982
if err != nil {
8083
return nil, err
8184
}
@@ -97,8 +100,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
97100
return blockNumbers, nil
98101
}
99102

100-
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
101-
blocksToCommit, err := c.getBlockNumbersToCommit()
103+
func (c *Committer) getSequentialBlockDataToCommit(commitID string) (*[]common.BlockData, error) {
104+
blocksToCommit, err := c.getBlockNumbersToCommit(commitID)
102105
if err != nil {
103106
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
104107
}
@@ -111,8 +114,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
111114
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
112115
}
113116
if blocksData == nil || len(*blocksData) == 0 {
114-
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
115-
c.handleMissingStagingData(blocksToCommit)
117+
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v. CommitID: %s. Timestamp %d", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64(), commitID, time.Now().UnixMilli())
118+
c.handleMissingStagingData(commitID, blocksToCommit)
116119
return nil, nil
117120
}
118121

@@ -122,7 +125,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
122125
})
123126

124127
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
125-
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
128+
return nil, c.handleGap(commitID, blocksToCommit[0], (*blocksData)[0].Block)
126129
}
127130

128131
var sequentialBlockData []common.BlockData
@@ -136,7 +139,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
136139
}
137140
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
138141
// Note: Gap detected, stop here
139-
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
142+
log.Warn().Msgf("Gap detected at block %s, committing until %s. CommitID: %s. Timestamp %d", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String(), commitID, time.Now().UnixMilli())
140143
// increment the a gap counter in prometheus
141144
metrics.GapCounter.Inc()
142145
// record the first missed block number in prometheus
@@ -150,22 +153,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
150153
return &sequentialBlockData, nil
151154
}
152155

153-
func (c *Committer) commit(blockData *[]common.BlockData) error {
156+
func (c *Committer) commit(commitID string, blockData *[]common.BlockData) error {
154157
blockNumbers := make([]*big.Int, len(*blockData))
155158
for i, block := range *blockData {
156159
blockNumbers[i] = block.Block.Number
157160
}
158-
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
161+
log.Debug().Msgf("Committing %d blocks: %v. CommitID: %s. Timestamp %d", len(blockNumbers), blockNumbers, commitID, time.Now().UnixMilli())
159162

160163
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
161164
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
162-
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
165+
log.Error().Err(err).Msgf("Failed to commit blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
163166
return fmt.Errorf("error saving data to main storage: %v", err)
164167
}
168+
log.Debug().Msgf("Committer inserted blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
165169

166170
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
167171
return fmt.Errorf("error deleting data from staging storage: %v", err)
168172
}
173+
log.Debug().Msgf("Committer deleted staging data for blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
169174

170175
// Update metrics for successful commits
171176
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
@@ -174,7 +179,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
174179
return nil
175180
}
176181

177-
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
182+
func (c *Committer) handleGap(commitID string, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
178183
// increment the a gap counter in prometheus
179184
metrics.GapCounter.Inc()
180185
// record the first missed block number in prometheus
@@ -183,9 +188,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
183188
poller := NewBoundlessPoller(c.rpc, c.storage)
184189

185190
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()
186-
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
191+
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s. CommitID: %s. Timestamp %d", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String(), commitID, time.Now().UnixMilli())
187192
if missingBlockCount > poller.blocksPerPoll {
188-
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config", poller.blocksPerPoll)
193+
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config. CommitID: %s. Timestamp %d", poller.blocksPerPoll, commitID, time.Now().UnixMilli())
189194
missingBlockCount = poller.blocksPerPoll
190195
}
191196
missingBlockNumbers := make([]*big.Int, missingBlockCount)
@@ -194,29 +199,29 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
194199
missingBlockNumbers[i] = missingBlockNumber
195200
}
196201

197-
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
202+
log.Debug().Msgf("Polling %d blocks while handling gap: %v. CommitID: %s. Timestamp %d", len(missingBlockNumbers), missingBlockNumbers, commitID, time.Now().UnixMilli())
198203
poller.Poll(missingBlockNumbers)
199204
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
200205
}
201206

202-
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
207+
func (c *Committer) handleMissingStagingData(commitID string, blocksToCommit []*big.Int) {
203208
// Checks if there are any blocks in staging after the current range end
204209
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
205210
if err != nil {
206211
log.Error().Err(err).Msg("Error checking staged data for missing range")
207212
return
208213
}
209214
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
210-
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
215+
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
211216
return
212217
}
213-
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
218+
log.Debug().Msgf("Detected missing blocks in staging data starting from %s. CommitID: %s. Timestamp %d", blocksToCommit[0].String(), commitID, time.Now().UnixMilli())
214219

215220
poller := NewBoundlessPoller(c.rpc, c.storage)
216221
blocksToPoll := blocksToCommit
217222
if len(blocksToCommit) > int(poller.blocksPerPoll) {
218223
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
219224
}
220225
poller.Poll(blocksToPoll)
221-
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
226+
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s. CommitID: %s. Timestamp %d", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String(), commitID, time.Now().UnixMilli())
222227
}

0 commit comments

Comments
 (0)