Skip to content

Commit c93f1a1

Browse files
committed
debug committer
1 parent b7704b1 commit c93f1a1

File tree

1 file changed

+30
-25
lines changed

1 file changed

+30
-25
lines changed

internal/orchestrator/committer.go

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sort"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/rs/zerolog/log"
1112
config "github.com/thirdweb-dev/indexer/configs"
1213
"github.com/thirdweb-dev/indexer/internal/common"
@@ -56,25 +57,27 @@ func (c *Committer) Start(ctx context.Context) {
5657
return
5758
default:
5859
time.Sleep(interval)
59-
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
60+
commitID := uuid.New().String()
61+
log.Debug().Msgf("Starting commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
62+
blockDataToCommit, err := c.getSequentialBlockDataToCommit(commitID)
6063
if err != nil {
61-
log.Error().Err(err).Msg("Error getting block data to commit")
64+
log.Error().Err(err).Msgf("Error getting block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
6265
continue
6366
}
6467
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
65-
log.Debug().Msg("No block data to commit")
68+
log.Debug().Msgf("No block data to commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
6669
continue
6770
}
68-
if err := c.commit(blockDataToCommit); err != nil {
69-
log.Error().Err(err).Msg("Error committing blocks")
71+
if err := c.commit(commitID, blockDataToCommit); err != nil {
72+
log.Error().Err(err).Msgf("Error committing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
7073
}
7174
}
7275
}
7376
}
7477

75-
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
78+
func (c *Committer) getBlockNumbersToCommit(commitID string) ([]*big.Int, error) {
7679
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
77-
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
80+
log.Info().Msgf("Committer found this max block number in main storage: %s. CommitID: %s. Timestamp %d", latestCommittedBlockNumber.String(), commitID, time.Now().UnixMilli())
7881
if err != nil {
7982
return nil, err
8083
}
@@ -96,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9699
return blockNumbers, nil
97100
}
98101

99-
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
100-
blocksToCommit, err := c.getBlockNumbersToCommit()
102+
func (c *Committer) getSequentialBlockDataToCommit(commitID string) (*[]common.BlockData, error) {
103+
blocksToCommit, err := c.getBlockNumbersToCommit(commitID)
101104
if err != nil {
102105
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
103106
}
@@ -110,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
110113
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
111114
}
112115
if blocksData == nil || len(*blocksData) == 0 {
113-
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
114-
c.handleMissingStagingData(blocksToCommit)
116+
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v. CommitID: %s. Timestamp %d", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64(), commitID, time.Now().UnixMilli())
117+
c.handleMissingStagingData(commitID, blocksToCommit)
115118
return nil, nil
116119
}
117120

@@ -121,7 +124,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
121124
})
122125

123126
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
124-
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
127+
return nil, c.handleGap(commitID, blocksToCommit[0], (*blocksData)[0].Block)
125128
}
126129

127130
var sequentialBlockData []common.BlockData
@@ -135,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
135138
}
136139
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
137140
// Note: Gap detected, stop here
138-
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
141+
log.Warn().Msgf("Gap detected at block %s, committing until %s. CommitID: %s. Timestamp %d", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String(), commitID, time.Now().UnixMilli())
139142
// increment the a gap counter in prometheus
140143
metrics.GapCounter.Inc()
141144
// record the first missed block number in prometheus
@@ -149,13 +152,13 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
149152
return &sequentialBlockData, nil
150153
}
151154

152-
func (c *Committer) commit(blockData *[]common.BlockData) error {
155+
func (c *Committer) commit(commitID string, blockData *[]common.BlockData) error {
153156
locked, err := c.storage.OrchestratorStorage.IsCommittingLocked(c.rpc.GetChainID())
154157
if err != nil {
155158
return fmt.Errorf("error checking if committing is locked: %v", err)
156159
}
157160
if locked {
158-
log.Debug().Msg("Committing is locked, skipping commit")
161+
log.Debug().Msgf("Committing is locked, skipping commit. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
159162
return nil
160163
} else {
161164
c.storage.OrchestratorStorage.SetCommittingLocked(c.rpc.GetChainID(), true)
@@ -169,17 +172,19 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
169172
for i, block := range *blockData {
170173
blockNumbers[i] = block.Block.Number
171174
}
172-
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
175+
log.Debug().Msgf("Committing %d blocks: %v. CommitID: %s. Timestamp %d", len(blockNumbers), blockNumbers, commitID, time.Now().UnixMilli())
173176

174177
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
175178
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
176-
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
179+
log.Error().Err(err).Msgf("Failed to commit blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
177180
return fmt.Errorf("error saving data to main storage: %v", err)
178181
}
182+
log.Debug().Msgf("Committer inserted blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
179183

180184
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
181185
return fmt.Errorf("error deleting data from staging storage: %v", err)
182186
}
187+
log.Debug().Msgf("Committer deleted staging data for blocks: %v. CommitID: %s. Timestamp %d", blockNumbers, commitID, time.Now().UnixMilli())
183188

184189
// Update metrics for successful commits
185190
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
@@ -188,7 +193,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
188193
return nil
189194
}
190195

191-
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
196+
func (c *Committer) handleGap(commitID string, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
192197
// increment the a gap counter in prometheus
193198
metrics.GapCounter.Inc()
194199
// record the first missed block number in prometheus
@@ -197,9 +202,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
197202
poller := NewBoundlessPoller(c.rpc, c.storage)
198203

199204
missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64()
200-
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
205+
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s. CommitID: %s. Timestamp %d", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String(), commitID, time.Now().UnixMilli())
201206
if missingBlockCount > poller.blocksPerPoll {
202-
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config", poller.blocksPerPoll)
207+
log.Debug().Msgf("Limiting polling missing blocks to %d blocks due to config. CommitID: %s. Timestamp %d", poller.blocksPerPoll, commitID, time.Now().UnixMilli())
203208
missingBlockCount = poller.blocksPerPoll
204209
}
205210
missingBlockNumbers := make([]*big.Int, missingBlockCount)
@@ -208,29 +213,29 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
208213
missingBlockNumbers[i] = missingBlockNumber
209214
}
210215

211-
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
216+
log.Debug().Msgf("Polling %d blocks while handling gap: %v. CommitID: %s. Timestamp %d", len(missingBlockNumbers), missingBlockNumbers, commitID, time.Now().UnixMilli())
212217
poller.Poll(missingBlockNumbers)
213218
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
214219
}
215220

216-
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
221+
func (c *Committer) handleMissingStagingData(commitID string, blocksToCommit []*big.Int) {
217222
// Checks if there are any blocks in staging after the current range end
218223
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
219224
if err != nil {
220225
log.Error().Err(err).Msg("Error checking staged data for missing range")
221226
return
222227
}
223228
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
224-
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
229+
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks. CommitID: %s. Timestamp %d", commitID, time.Now().UnixMilli())
225230
return
226231
}
227-
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())
232+
log.Debug().Msgf("Detected missing blocks in staging data starting from %s. CommitID: %s. Timestamp %d", blocksToCommit[0].String(), commitID, time.Now().UnixMilli())
228233

229234
poller := NewBoundlessPoller(c.rpc, c.storage)
230235
blocksToPoll := blocksToCommit
231236
if len(blocksToCommit) > int(poller.blocksPerPoll) {
232237
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
233238
}
234239
poller.Poll(blocksToPoll)
235-
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
240+
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s. CommitID: %s. Timestamp %d", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String(), commitID, time.Now().UnixMilli())
236241
}

0 commit comments

Comments
 (0)