Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/orchestrator/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/thirdweb-dev/indexer/internal/rpc"
)

const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 60 * 1000 // 1 minutes

type ChainTracker struct {
rpc rpc.IRPCClient
Expand Down
12 changes: 9 additions & 3 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,12 @@ func (c *Committer) cleanupProcessedStagingBlocks() {
log.Error().Err(err).Msg("Failed to delete staging data")
return
}
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingDataOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())

log.Debug().
Uint64("committed_block_number", committed).
Uint64("published_block_number", published).
Str("older_than_block_number", blockNumber.String()).
Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingDataOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
}

Expand Down Expand Up @@ -469,7 +474,7 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
blocksToPublish, err := c.getBlockNumbersToPublish(ctx)
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
return nil, fmt.Errorf("error determining blocks to publish: %v", err)
}
if len(blocksToPublish) == 0 {
return nil, nil
Expand Down Expand Up @@ -508,7 +513,8 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
highestBlock = block.Block
}
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
log.Debug().Msgf("Committing %d blocks from %s to %s", len(blockNumbers), blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())

mainStorageStart := time.Now()
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
Expand Down
11 changes: 0 additions & 11 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func (o *Orchestrator) Start() {
o.cancel()
}()

// Create the work mode monitor first
workModeMonitor := NewWorkModeMonitor(o.rpc, o.storage)

o.initializeWorkerAndPoller()

o.wg.Add(1)
Expand Down Expand Up @@ -91,14 +88,6 @@ func (o *Orchestrator) Start() {
}()
}

o.wg.Add(1)
go func() {
defer o.wg.Done()
workModeMonitor.Start(ctx)

log.Info().Msg("Work mode monitor completed")
}()

// The chain tracker is always running
o.wg.Add(1)
go func() {
Expand Down
Loading