
Commit b6fdc97

Fix Poller Behavior (#279)
* new Poller to fix parallel block range fetching
* Minor updates
* nil safety for committer
1 parent 101165b commit b6fdc97

6 files changed: +215 -226 lines changed

internal/orchestrator/chain_tracker.go

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ import (
 	"github.com/thirdweb-dev/indexer/internal/rpc"
 )
 
-const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
+const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // 1 minutes
 
 type ChainTracker struct {
 	rpc rpc.IRPCClient
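
DEFAULT_CHAIN_TRACKER_POLL_INTERVAL is expressed in milliseconds (300000 ms is 5 minutes, 60 * 1000 ms is 1 minute), so this change makes the chain tracker poll five times as often. Below is a minimal sketch, assuming the constant is converted to a time.Duration and consumed by a ticker loop; trackChain and pollLatestBlock are hypothetical names for illustration, not the repo's actual ChainTracker API.

// Minimal sketch, not the repo's ChainTracker: how a millisecond-valued
// interval constant like this is typically fed into a ticker loop.
// trackChain and pollLatestBlock are hypothetical names.
package main

import (
	"context"
	"fmt"
	"time"
)

const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // milliseconds, i.e. 1 minute

func trackChain(ctx context.Context, interval time.Duration, pollLatestBlock func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return // orchestrator shutdown
		case <-ticker.C:
			if err := pollLatestBlock(); err != nil {
				fmt.Println("failed to poll latest block:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Production code would use the constant; the demo ticks every second
	// so it prints something before the context expires.
	_ = time.Duration(DEFAULT_CHAIN_TRACKER_POLL_INTERVAL) * time.Millisecond
	trackChain(ctx, time.Second, func() error {
		fmt.Println("polling latest block number...")
		return nil
	})
}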

internal/orchestrator/committer.go

Lines changed: 13 additions & 4 deletions
@@ -274,7 +274,12 @@ func (c *Committer) cleanupProcessedStagingBlocks() {
 		log.Error().Err(err).Msg("Failed to delete staging data")
 		return
 	}
-	log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingDataOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())
+	log.Debug().
+		Uint64("committed_block_number", committed).
+		Uint64("published_block_number", published).
+		Str("older_than_block_number", blockNumber.String()).
+		Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingDataOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())
 	metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
 }

@@ -286,10 +291,13 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
 	}()

 	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
-	log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
 	if err != nil {
 		return nil, err
 	}
+	if latestCommittedBlockNumber == nil {
+		latestCommittedBlockNumber = new(big.Int).SetUint64(0)
+	}
+	log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())

 	if latestCommittedBlockNumber.Sign() == 0 {
 		// If no blocks have been committed yet, start from the fromBlock specified in the config

@@ -469,7 +477,7 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
 func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
 	blocksToPublish, err := c.getBlockNumbersToPublish(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("error determining blocks to commit: %v", err)
+		return nil, fmt.Errorf("error determining blocks to publish: %v", err)
 	}
 	if len(blocksToPublish) == 0 {
 		return nil, nil

@@ -508,7 +516,8 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
 			highestBlock = block.Block
 		}
 	}
-	log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
+	log.Debug().Msgf("Committing %d blocks from %s to %s", len(blockNumbers), blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())
+
 	mainStorageStart := time.Now()
 	if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
 		log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
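
The nil-safety change in getBlockNumbersToCommit is the important one here: GetMaxBlockNumber can return a nil *big.Int when main storage holds no blocks for the chain, and the new guard normalizes that to zero (and moves the debug log after the error check) so the later latestCommittedBlockNumber.Sign() check can treat an empty chain the same as block 0. A stripped-down sketch of that pattern follows, using an assumed getter signature rather than the repo's storage interfaces.

// Stripped-down sketch of the nil-safety pattern added in committer.go.
// maxBlockNumberGetter is a hypothetical stand-in for the repo's storage interface.
package main

import (
	"fmt"
	"math/big"
)

type maxBlockNumberGetter func(chainID *big.Int) (*big.Int, error)

func latestCommittedBlockNumber(get maxBlockNumberGetter, chainID *big.Int) (*big.Int, error) {
	latest, err := get(chainID)
	if err != nil {
		return nil, err
	}
	// Normalize nil to 0 so later calls such as latest.Sign() cannot panic.
	if latest == nil {
		latest = new(big.Int).SetUint64(0)
	}
	return latest, nil
}

func main() {
	// Empty main storage: the getter returns nil with no error.
	empty := func(*big.Int) (*big.Int, error) { return nil, nil }
	latest, err := latestCommittedBlockNumber(empty, big.NewInt(1))
	if err != nil {
		panic(err)
	}
	fmt.Println(latest.String(), latest.Sign()) // prints: 0 0
}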

internal/orchestrator/orchestrator.go

Lines changed: 0 additions & 11 deletions
@@ -51,9 +51,6 @@ func (o *Orchestrator) Start() {
 		o.cancel()
 	}()

-	// Create the work mode monitor first
-	workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage)
-
 	o.initializeWorkerAndPoller()

 	o.wg.Add(1)

@@ -91,14 +88,6 @@ func (o *Orchestrator) Start() {
 		}()
 	}

-	o.wg.Add(1)
-	go func() {
-		defer o.wg.Done()
-		workModeMonitor.Start(ctx)
-
-		log.Info().Msg("Work mode monitor completed")
-	}()
-
 	// The chain tracker is always running
 	o.wg.Add(1)
 	go func() {
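
With the work mode monitor removed, Start() keeps launching its remaining components with the Add/Done/Wait pattern visible in the surrounding context lines. A generic sketch of that pattern follows, with an illustrative runner type in place of the repo's component types.

// Generic sketch of the WaitGroup pattern visible in Orchestrator.Start():
// each long-running component gets its own goroutine and the orchestrator
// blocks until all of them have shut down after the context is cancelled.
// The runner type and component names are illustrative, not the repo's types.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type runner func(ctx context.Context)

func start(ctx context.Context, components map[string]runner) {
	var wg sync.WaitGroup
	for name, run := range components {
		wg.Add(1)
		go func(name string, run runner) {
			defer wg.Done()
			run(ctx)
			fmt.Println(name, "completed")
		}(name, run)
	}
	wg.Wait() // wait for every component to exit
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	idle := func(ctx context.Context) { <-ctx.Done() }
	start(ctx, map[string]runner{
		"committer":     idle,
		"chain tracker": idle, // always running, per the diff above
	})
}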
