Skip to content

Commit 82a0a33

Browse files
committed
Fix worker stall
1 parent 7325d08 commit 82a0a33

File tree

3 files changed

+76
-56
lines changed

3 files changed

+76
-56
lines changed

internal/orchestrator/committer.go

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@ import (
1919
"github.com/thirdweb-dev/indexer/internal/worker"
2020
)
2121

22-
const DEFAULT_COMMITTER_TRIGGER_INTERVAL = 2000
2322
const DEFAULT_BLOCKS_PER_COMMIT = 1000
2423

2524
type Committer struct {
26-
triggerIntervalMs int
2725
blocksPerCommit int
2826
storage storage.IStorage
2927
commitFromBlock *big.Int
@@ -39,11 +37,6 @@ type Committer struct {
3937
type CommitterOption func(*Committer)
4038

4139
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, opts ...CommitterOption) *Committer {
42-
triggerInterval := config.Cfg.Committer.Interval
43-
if triggerInterval == 0 {
44-
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
45-
}
46-
4740
blocksPerCommit := config.Cfg.Committer.BlocksPerCommit
4841
if blocksPerCommit == 0 {
4942
blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT
@@ -56,7 +49,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
5649

5750
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
5851
committer := &Committer{
59-
triggerIntervalMs: triggerInterval,
6052
blocksPerCommit: blocksPerCommit,
6153
storage: storage,
6254
commitFromBlock: commitFromBlock,
@@ -78,8 +70,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
7870
}
7971

8072
func (c *Committer) Start(ctx context.Context) {
81-
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
82-
8373
log.Debug().Msgf("Committer running")
8474
chainID := c.rpc.GetChainID()
8575

@@ -175,40 +165,35 @@ func (c *Committer) Start(ctx context.Context) {
175165

176166
if config.Cfg.Publisher.Mode == "parallel" {
177167
var wg sync.WaitGroup
178-
publishInterval := interval / 2
179-
if publishInterval <= 0 {
180-
publishInterval = interval
181-
}
182168
wg.Add(2)
169+
183170
go func() {
184171
defer wg.Done()
185-
c.runPublishLoop(ctx, publishInterval)
172+
c.runPublishLoop(ctx)
186173
}()
187174

188-
// allow the publisher to start before the committer
189-
time.Sleep(publishInterval)
190175
go func() {
191176
defer wg.Done()
192-
c.runCommitLoop(ctx, interval)
177+
c.runCommitLoop(ctx)
193178
}()
194179

195180
<-ctx.Done()
181+
196182
wg.Wait()
197183
} else {
198-
c.runCommitLoop(ctx, interval)
184+
c.runCommitLoop(ctx)
199185
}
200186

201187
log.Info().Msg("Committer shutting down")
202188
c.publisher.Close()
203189
}
204190

205-
func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
191+
func (c *Committer) runCommitLoop(ctx context.Context) {
206192
for {
207193
select {
208194
case <-ctx.Done():
209195
return
210196
default:
211-
time.Sleep(interval)
212197
if c.commitToBlock.Sign() > 0 && c.lastCommittedBlock.Load() >= c.commitToBlock.Uint64() {
213198
// Stop the commit loop once a toBlock is configured and the last committed block has reached it
214199
log.Info().Msgf("Committer reached configured toBlock %s, the last commit block is %d, stopping commits", c.commitToBlock.String(), c.lastCommittedBlock.Load())
@@ -231,13 +216,17 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
231216
}
232217
}
233218

234-
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
219+
func (c *Committer) runPublishLoop(ctx context.Context) {
235220
for {
236221
select {
237222
case <-ctx.Done():
238223
return
239224
default:
240-
time.Sleep(interval)
225+
if c.commitToBlock.Sign() > 0 && c.lastPublishedBlock.Load() >= c.commitToBlock.Uint64() {
226+
// Stop the publish loop once a toBlock is configured and the last published block has reached it
227+
log.Info().Msgf("Committer reached configured toBlock %s, the last publish block is %d, stopping publishes", c.commitToBlock.String(), c.lastPublishedBlock.Load())
228+
return
229+
}
241230
if err := c.publish(ctx); err != nil {
242231
log.Error().Err(err).Msg("Error publishing blocks")
243232
}
@@ -397,8 +386,8 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
397386
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
398387
blocksData := c.poller.Request(ctx, blockNumbers)
399388
if len(blocksData) == 0 {
400-
// TODO: should wait a little bit, as it may take time to load
401-
log.Warn().Msgf("Committer didn't find the following range: %v - %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64())
389+
log.Warn().Msgf("Committer didn't find the following range: %v - %v. %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64(), c.poller.GetPollerStatus())
390+
time.Sleep(500 * time.Millisecond) // TODO: wait for block time
402391
return nil, nil
403392
}
404393
return blocksData, nil

internal/orchestrator/poller.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ func (p *Poller) Request(ctx context.Context, blockNumbers []*big.Int) []common.
203203
if len(blockData) > 0 {
204204
go p.triggerLookahead(endBlock, int64(len(blockNumbers)))
205205
}
206-
207206
return blockData
208207
}
209208

@@ -309,7 +308,7 @@ func (p *Poller) processBatch(blockNumbers []*big.Int) {
309308
if err != ErrBlocksProcessing && err != ErrNoNewBlocks {
310309
if len(blockNumbers) > 0 {
311310
startBlock, endBlock := blockNumbers[0], blockNumbers[len(blockNumbers)-1]
312-
log.Debug().Err(err).Msgf("Failed to poll blocks %s - %s", startBlock.String(), endBlock.String())
311+
log.Error().Err(err).Msgf("Failed to poll blocks %s - %s", startBlock.String(), endBlock.String())
313312
}
314313
}
315314
return
@@ -435,7 +434,7 @@ func (p *Poller) unmarkRangeAsProcessing(rangeKey string) {
435434
delete(p.processingRanges, rangeKey)
436435
}
437436

438-
// waitForRange waits for a range to finish processing
437+
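// waitForRange waits for a range to finish processing or for context cancellation.
// NOTE(review): no timeout case is visible in the select below — confirm one exists before documenting it.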
439438
func (p *Poller) waitForRange(rangeKey string) bool {
440439
p.processingRangesMutex.Lock()
441440

@@ -451,7 +450,7 @@ func (p *Poller) waitForRange(rangeKey string) bool {
451450
p.processingRanges[rangeKey] = append(p.processingRanges[rangeKey], waitChan)
452451
p.processingRangesMutex.Unlock()
453452

454-
// Wait for the range to complete or context cancellation
453+
// Wait for the range to complete or for context cancellation (NOTE(review): no timeout case is visible in this select — confirm)
455454
select {
456455
case <-waitChan:
457456
log.Debug().Msgf("Got notification for range %s processing completed", rangeKey)
@@ -460,3 +459,43 @@ func (p *Poller) waitForRange(rangeKey string) bool {
460459
return false // Context cancelled
461460
}
462461
}
462+
463+
// GetProcessingRanges returns a list of ranges currently being processed (for diagnostics)
464+
func (p *Poller) GetProcessingRanges() []string {
465+
p.processingRangesMutex.RLock()
466+
defer p.processingRangesMutex.RUnlock()
467+
468+
ranges := make([]string, 0, len(p.processingRanges))
469+
for rangeKey, waiters := range p.processingRanges {
470+
ranges = append(ranges, fmt.Sprintf("%s (waiters: %d)", rangeKey, len(waiters)))
471+
}
472+
return ranges
473+
}
474+
475+
// GetQueuedRanges returns a list of ranges currently queued for processing (for diagnostics).
// The range keys of p.queuedRanges are copied into a freshly allocated slice
// while holding the queuedRangesMutex read lock, so the caller owns the result.
// NOTE(review): queuedRanges is presumably a map keyed by range key; if so, the
// order of the returned entries is unspecified — confirm against the Poller definition.
476+
func (p *Poller) GetQueuedRanges() []string {
477+
p.queuedRangesMutex.RLock()
478+
defer p.queuedRangesMutex.RUnlock()
479+
480+
ranges := make([]string, 0, len(p.queuedRanges))
481+
for rangeKey := range p.queuedRanges {
482+
ranges = append(ranges, rangeKey)
483+
}
484+
return ranges
485+
}
486+
487+
// GetPollerStatus returns diagnostic information about the poller's current state.
// The returned map holds:
//   - "last_polled_block": p.lastPolledBlock rendered as a string, read under the
//     lastPolledBlockMutex read lock
//   - "processing_ranges": the result of GetProcessingRanges
//   - "queued_ranges": the result of GetQueuedRanges
//   - "task_queue_size" / "task_queue_cap": len and cap of p.tasks
//   - "parallel_pollers": p.parallelPollers
//
// Each field is read separately: the lastPolledBlock lock is released before the
// other getters acquire their own locks, so the values are not a single atomic
// snapshot of the poller.
488+
func (p *Poller) GetPollerStatus() map[string]interface{} {
489+
p.lastPolledBlockMutex.RLock()
490+
lastPolled := p.lastPolledBlock.String()
491+
p.lastPolledBlockMutex.RUnlock()
492+
493+
return map[string]interface{}{
494+
"last_polled_block": lastPolled,
495+
"processing_ranges": p.GetProcessingRanges(),
496+
"queued_ranges": p.GetQueuedRanges(),
497+
"task_queue_size": len(p.tasks),
498+
"task_queue_cap": cap(p.tasks),
499+
"parallel_pollers": p.parallelPollers,
500+
}
501+
}

internal/worker/worker.go

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ func (w *Worker) processBatchWithRetry(ctx context.Context, blocks []*big.Int, s
295295
Int("chunk_size", chunkSize).
296296
Str("first_block", blocks[0].String()).
297297
Str("last_block", blocks[len(blocks)-1].String()).
298-
Msg("Processing blocks")
298+
Msgf("Processing blocks for range %s - %s", blocks[0].String(), blocks[len(blocks)-1].String())
299299

300300
var allResults []rpc.GetFullBlockResult
301301
var allFailures []rpc.GetFullBlockResult
@@ -410,36 +410,28 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
410410
success = len(results) > 0 && len(errors) == 0
411411
}
412412

413-
if !success {
414-
for _, errResult := range errors {
415-
log.Error().Err(errResult.Error).Msgf("Error fetching block %s", errResult.BlockNumber.String())
416-
}
413+
if len(errors) > 0 {
414+
first, last := blockNumbers[0], blockNumbers[len(blockNumbers)-1]
415+
firstError, lastError := errors[0], errors[len(errors)-1]
416+
log.Error().Msgf("Error fetching block for range: %s - %s. Error: %s - %s (%d)", first.String(), last.String(), firstError.BlockNumber.String(), lastError.BlockNumber.String(), len(errors))
417+
return nil
417418
}
418419

419-
// Update metrics and log summary
420-
if len(results) > 0 {
421-
lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
422-
metrics.LastFetchedBlock.Set(lastBlockNumberFloat)
420+
if !success || len(results) == 0 {
421+
first, last := blockNumbers[0], blockNumbers[len(blockNumbers)-1]
422+
log.Error().Msgf("No blocks fetched for range: %s - %s", first.String(), last.String())
423+
return nil
424+
}
423425

424-
// Count successes and failures
425-
successful := 0
426-
failed := 0
427-
for _, r := range results {
428-
if r.Error == nil {
429-
successful++
430-
} else {
431-
failed++
432-
}
433-
}
426+
// Update metrics and log summary
427+
lastBlockNumberFloat, _ := results[len(results)-1].BlockNumber.Float64()
428+
metrics.LastFetchedBlock.Set(lastBlockNumberFloat)
434429

435-
log.Debug().
436-
Str("first_block", results[0].BlockNumber.String()).
437-
Str("last_block", results[len(results)-1].BlockNumber.String()).
438-
Int("successful", successful).
439-
Int("failed", failed).
440-
Str("source", sourceType.String()).
441-
Msg("Block fetching complete")
442-
}
430+
log.Debug().
431+
Str("source", sourceType.String()).
432+
Str("first_block", results[0].BlockNumber.String()).
433+
Str("last_block", results[len(results)-1].BlockNumber.String()).
434+
Msgf("Block fetching complete for range %s - %s", results[0].BlockNumber.String(), results[len(results)-1].BlockNumber.String())
443435

444436
return results
445437
}

0 commit comments

Comments
 (0)