Skip to content

Commit b54cf9a

Browse files
committed
Fixes
1 parent b7a399f commit b54cf9a

File tree

5 files changed

+26
-14
lines changed

5 files changed

+26
-14
lines changed

internal/orchestrator/poller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,9 @@ func (p *Poller) Request(ctx context.Context, blockNumbers []*big.Int) []common.
230230
return nil
231231
}
232232

233-
p.lastPolledBlockMutex.Lock()
233+
p.lastRequestedBlockMutex.Lock()
234234
p.lastRequestedBlock = new(big.Int).Set(highestBlockNumber)
235-
p.lastPolledBlockMutex.Unlock()
235+
p.lastRequestedBlockMutex.Unlock()
236236
return blockData
237237
}
238238

@@ -292,14 +292,14 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
292292
}
293293

294294
func (p *Poller) getNextBlockRange(ctx context.Context) ([]*big.Int, error) {
295-
p.blockRangeMutex.Lock()
296-
defer p.blockRangeMutex.Unlock()
297-
298295
latestBlock, err := p.rpc.GetLatestBlockNumber(ctx)
299296
if err != nil {
300297
return nil, err
301298
}
302299

300+
p.blockRangeMutex.Lock()
301+
defer p.blockRangeMutex.Unlock()
302+
303303
p.lastPendingFetchBlockMutex.Lock()
304304
lastPendingFetchBlock := new(big.Int).Set(p.lastPendingFetchBlock)
305305
p.lastPendingFetchBlockMutex.Unlock()
@@ -323,16 +323,16 @@ func (p *Poller) getNextBlockRange(ctx context.Context) ([]*big.Int, error) {
323323
return nil, nil
324324
}
325325

326+
p.lastPendingFetchBlockMutex.Lock()
327+
p.lastPendingFetchBlock = new(big.Int).Set(endBlock)
328+
p.lastPendingFetchBlockMutex.Unlock()
329+
326330
log.Debug().
327331
Str("last_pending_block", lastPendingFetchBlock.String()).
328332
Str("last_polled_block", lastPolledBlock.String()).
329333
Str("last_requested_block", lastRequestedBlock.String()).
330334
Msgf("GetNextBlockRange for poller workers")
331335

332-
p.lastPendingFetchBlockMutex.Lock()
333-
p.lastPendingFetchBlock = new(big.Int).Set(endBlock)
334-
p.lastPendingFetchBlockMutex.Unlock()
335-
336336
return p.createBlockNumbersForRange(startBlock, endBlock), nil
337337
}
338338

internal/orchestrator/poller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ import (
1010
func TestPollerPlaceholder(t *testing.T) {
1111
// Placeholder test to keep the test file valid
1212
t.Skip("Poller tests need to be rewritten for new implementation")
13-
}
13+
}

internal/orchestrator/reorg_handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ import (
1010
func TestReorgHandlerPlaceholder(t *testing.T) {
1111
// Placeholder test to keep the test file valid
1212
t.Skip("Reorg handler tests need to be rewritten for new implementation")
13-
}
13+
}

internal/storage/badger.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,11 @@ func (bc *BadgerConnector) Close() error {
206206
bc.gcTicker.Stop()
207207
close(bc.stopGC)
208208
}
209-
close(bc.stopRangeUpdate)
209+
select {
210+
case <-bc.stopRangeUpdate:
211+
default:
212+
close(bc.stopRangeUpdate)
213+
}
210214
return bc.db.Close()
211215
}
212216

internal/worker/worker.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ const (
2626
SourceTypeStaging SourceType = "staging"
2727
)
2828

29+
const (
30+
DEFAULT_RPC_CHUNK_SIZE = 25
31+
)
32+
2933
// String returns the string representation of the source type
3034
func (s SourceType) String() string {
3135
return string(s)
@@ -41,9 +45,13 @@ type Worker struct {
4145
}
4246

4347
func NewWorker(rpc rpc.IRPCClient) *Worker {
48+
chunk := rpc.GetBlocksPerRequest().Blocks
49+
if chunk <= 0 {
50+
chunk = DEFAULT_RPC_CHUNK_SIZE
51+
}
4452
return &Worker{
4553
rpc: rpc,
46-
rpcChunkSize: rpc.GetBlocksPerRequest().Blocks,
54+
rpcChunkSize: chunk,
4755
rpcSemaphore: make(chan struct{}, 20),
4856
}
4957
}
@@ -395,7 +403,7 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
395403
}
396404

397405
if !success {
398-
sourceType := SourceTypeRPC
406+
sourceType = SourceTypeRPC
399407
results, errors = w.processBatchWithRetry(ctx, blockNumbers, sourceType, w.fetchFromRPC)
400408
success = len(results) > 0 && len(errors) == 0
401409
}

0 commit comments

Comments
 (0)