Skip to content

Commit a2c3981

Browse files
committed
debug committer
1 parent a42cd15 commit a2c3981

File tree

1 file changed

+29
-24
lines changed

1 file changed

+29
-24
lines changed

internal/orchestrator/committer.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sort"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/rs/zerolog/log"
1112
config "github.com/thirdweb-dev/indexer/configs"
1213
"github.com/thirdweb-dev/indexer/internal/common"
@@ -56,25 +57,27 @@ func (c *Committer) Start(ctx context.Context) {
5657
return
5758
default:
5859
time.Sleep(interval)
59-
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
60+
commitID := uuid.New().String()
61+
log.Debug().Msgf("Starting commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
62+
blockDataToCommit, err := c.getSequentialBlockDataToCommit(commitID)
6063
if err != nil {
61-
log.Error().Err(err).Msg("Error getting block data to commit")
64+
log.Error().Err(err).Msgf("Error getting block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
6265
continue
6366
}
6467
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
65-
log.Debug().Msg("No block data to commit")
68+
log.Debug().Msgf("No block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
6669
continue
6770
}
68-
if err := c.commit(blockDataToCommit); err != nil {
69-
log.Error().Err(err).Msg("Error committing blocks")
71+
if err := c.commit(commitID, blockDataToCommit); err != nil {
72+
log.Error().Err(err).Msgf("Error committing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
7073
}
7174
}
7275
}
7376
}
7477

75-
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
78+
func (c *Committer) getBlockNumbersToCommit(commitID string) ([]*big.Int, error) {
7679
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
77-
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
80+
log.Info().Msgf("Committer found this max block number in main storage: %s. CommitID: %s. Timestamp %d", latestCommittedBlockNumber.String(), commitID, time.Now().UnixMilli())
7881
if err != nil {
7982
return nil, err
8083
}
@@ -96,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9699
return blockNumbers, nil
97100
}
98101

99-
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
100-
blocksToCommit, err := c.getBlockNumbersToCommit()
102+
func (c *Committer) getSequentialBlockDataToCommit(commitID string) (*[]common.BlockData, error) {
103+
blocksToCommit, err := c.getBlockNumbersToCommit(commitID)
101104
if err != nil {
102105
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
103106
}
@@ -110,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
110113
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
111114
}
112115
if blocksData == nil || len(*blocksData) == 0 {
113-
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
114-
c.handleMissingStagingData(blocksToCommit)
116+
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v. CommitID: %s. Timestamp %d", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64(), commitID, time.Now().UnixMilli())
117+
c.handleMissingStagingData(commitID, blocksToCommit)
115118
return nil, nil
116119
}
117120

@@ -121,7 +124,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
121124
})
122125

123126
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
124-
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
127+
return nil, c.handleGap(commitID, blocksToCommit[0], (*blocksData)[0].Block)
125128
}
126129

127130
var sequentialBlockData []common.BlockData
@@ -135,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
135138
}
136139
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
137140
// Note: Gap detected, stop here
138-
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
141+
log.Warn().Msgf("Gap detected at block %s, committing until %s. CommitID: %s. Timestamp %d", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String(), commitID, time.Now().UnixMilli())
139142
// increment the a gap counter in prometheus
140143
metrics.GapCounter.Inc()
141144
// record the first missed block number in prometheus
@@ -149,22 +152,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
149152
return &sequentialBlockData, nil
150153
}
151154

152-
func (c *Committer) commit(blockData *[]common.BlockData) error {
155+
func (c *Committer) commit(commitID string, blockData *[]common.BlockData) error {
153156
blockNumbers := make([]*big.Int, len(*blockData))
154157
for i, block := range *blockData {
155158
blockNumbers[i] = block.Block.Number
156159
}
157-
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
160+
log.Debug().Msgf("Committing %d blocks: %v. CommitID: %s. Timestamp %d", len(blockNumbers), blockNumbers, commitID, time.Now().UnixMilli())
158161

159162
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
160163
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
161-
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
164+
log.Error().Err(err).Msgf("Failed to commit blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
162165
return fmt.Errorf("error saving data to main storage: %v", err)
163166
}
167+
log.Debug().Msgf("Committer inserted blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
164168

165169
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
166170
return fmt.Errorf("error deleting data from staging storage: %v", err)
167171
}
172+
log.Debug().Msgf("Committer deleted staging data for blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
168173

169174
// Update metrics for successful commits
170175
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
@@ -173,7 +178,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
173178
return nil
174179
}
175180

176-
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
181+
func (c *Committer) handleGap(commitID string, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
177182
// increment the a gap counter in prometheus
178183
metrics.GapCounter.Inc()
179184
// record the first missed block number in prometheus
@@ -182,9 +187,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
182187
poller := NewBoundlessPoller(c.rpc, c.storage)
183188

184189
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()
185-
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
190+
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s. CommitID: %s. Timestamp %d", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String(), commitID, time.Now().UnixMilli())
186191
if missingBlockCount > poller.blocksPerPoll {
187-
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config", poller.blocksPerPoll)
192+
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config. CommitID: %s. Timestamp %d", poller.blocksPerPoll, commitID, time.Now().UnixMilli())
188193
missingBlockCount = poller.blocksPerPoll
189194
}
190195
missingBlockNumbers := make([]*big.Int, missingBlockCount)
@@ -193,29 +198,29 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
193198
missingBlockNumbers[i] = missingBlockNumber
194199
}
195200

196-
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
201+
log.Debug().Msgf("Polling %d blocks while handling gap: %v. CommitID: %s. Timestamp %d", len(missingBlockNumbers), missingBlockNumbers, commitID, time.Now().UnixMilli())
197202
poller.Poll(missingBlockNumbers)
198203
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
199204
}
200205

201-
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
206+
func (c *Committer) handleMissingStagingData(commitID string, blocksToCommit []*big.Int) {
202207
// Checks if there are any blocks in staging after the current range end
203208
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
204209
if err != nil {
205210
log.Error().Err(err).Msg("Error checking staged data for missing range")
206211
return
207212
}
208213
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
209-
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
214+
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
210215
return
211216
}
212-
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
217+
log.Debug().Msgf("Detected missing blocks in staging data starting from %s. CommitID: %s. Timestamp %d", blocksToCommit[0].String(), commitID, time.Now().UnixMilli())
213218

214219
poller := NewBoundlessPoller(c.rpc, c.storage)
215220
blocksToPoll := blocksToCommit
216221
if len(blocksToCommit) > int(poller.blocksPerPoll) {
217222
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
218223
}
219224
poller.Poll(blocksToPoll)
220-
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
225+
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s. CommitID: %s. Timestamp %d", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String(), commitID, time.Now().UnixMilli())
221226
}

0 commit comments

Comments
 (0)