Skip to content

Commit bb0a3fe

Browse files
committed
chore: inline last committed block accessors
1 parent 643b8bd commit bb0a3fe

File tree

2 files changed

+52
-56
lines changed

2 files changed

+52
-56
lines changed

internal/orchestrator/committer.go

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
8080
return committer
8181
}
8282

83-
func (c *Committer) getLastCommittedBlock() *big.Int {
84-
return new(big.Int).SetUint64(c.lastCommittedBlock.Load())
85-
}
86-
87-
func (c *Committer) setLastCommittedBlock(b *big.Int) {
88-
c.lastCommittedBlock.Store(b.Uint64())
89-
}
90-
9183
func (c *Committer) Start(ctx context.Context) {
9284
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
9385

@@ -96,13 +88,17 @@ func (c *Committer) Start(ctx context.Context) {
9688

9789
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
9890
if err != nil {
91+
// It's okay to fail silently here; this value is only used for staging cleanup and
92+
// the worker loop will eventually correct the state and delete as needed.
9993
log.Error().Msgf("Error getting latest committed block number: %v", err)
10094
} else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
10195
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
10296
}
10397

10498
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
10599
if err != nil {
100+
// It's okay to fail silently here; it's only used for staging cleanup and will be
101+
// corrected by the worker loop.
106102
log.Error().Err(err).Msg("failed to get last published block number")
107103
} else if lastPublished != nil && lastPublished.Sign() > 0 {
108104
c.lastPublishedBlock.Store(lastPublished.Uint64())
@@ -174,21 +170,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
174170
}
175171

176172
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
177-
for {
178-
select {
179-
case <-ctx.Done():
180-
return
181-
default:
182-
time.Sleep(interval)
183-
if c.workMode == "" {
184-
log.Debug().Msg("Committer work mode not set, skipping publish")
185-
continue
186-
}
187-
if err := c.publish(ctx); err != nil {
188-
log.Error().Err(err).Msg("Error publishing blocks")
189-
}
190-
}
191-
}
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return
177+
default:
178+
time.Sleep(interval)
179+
if c.workMode == "" {
180+
log.Debug().Msg("Committer work mode not set, skipping publish")
181+
continue
182+
}
183+
if err := c.publish(ctx); err != nil {
184+
log.Error().Err(err).Msg("Error publishing blocks")
185+
}
186+
}
187+
}
192188
}
193189

194190
func (c *Committer) cleanupProcessedStagingBlocks() {
@@ -232,7 +228,7 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
232228
// If no blocks have been committed yet, start from the fromBlock specified in the config
233229
latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
234230
} else {
235-
lastCommitted := c.getLastCommittedBlock()
231+
lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
236232
if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 {
237233
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String())
238234
return []*big.Int{}, nil

internal/orchestrator/committer_test.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -470,33 +470,33 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) {
470470
}
471471
committer := NewCommitter(mockRPC, mockStorage)
472472
committer.workMode = WorkModeLive
473-
committer.setLastCommittedBlock(big.NewInt(102))
473+
committer.lastCommittedBlock.Store(102)
474474

475475
chainID := big.NewInt(1)
476476
blockData := []common.BlockData{
477477
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
478478
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
479479
}
480480

481-
publishDone := make(chan struct{})
482-
deleteDone := make(chan struct{})
483-
484-
mockRPC.EXPECT().GetChainID().Return(chainID)
485-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
486-
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
487-
mockRPC.EXPECT().GetChainID().Return(chainID)
488-
489-
ctx, cancel := context.WithCancel(context.Background())
490-
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
491-
close(publishDone)
492-
cancel()
493-
return nil
494-
})
495-
mockRPC.EXPECT().GetChainID().Return(chainID)
496-
mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
497-
close(deleteDone)
498-
return nil
499-
})
481+
publishDone := make(chan struct{})
482+
deleteDone := make(chan struct{})
483+
484+
mockRPC.EXPECT().GetChainID().Return(chainID)
485+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
486+
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
487+
mockRPC.EXPECT().GetChainID().Return(chainID)
488+
489+
ctx, cancel := context.WithCancel(context.Background())
490+
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
491+
close(publishDone)
492+
cancel()
493+
return nil
494+
})
495+
mockRPC.EXPECT().GetChainID().Return(chainID)
496+
mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
497+
close(deleteDone)
498+
return nil
499+
})
500500

501501
go committer.runPublishLoop(ctx, time.Millisecond)
502502

@@ -528,22 +528,22 @@ func TestRunPublishLoopDoesNothingWhenNoNewBlocks(t *testing.T) {
528528
}
529529
committer := NewCommitter(mockRPC, mockStorage)
530530
committer.workMode = WorkModeLive
531-
committer.setLastCommittedBlock(big.NewInt(102))
531+
committer.lastCommittedBlock.Store(102)
532532

533-
chainID := big.NewInt(1)
533+
chainID := big.NewInt(1)
534534

535-
mockRPC.EXPECT().GetChainID().Return(chainID)
536-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil)
537-
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil)
535+
mockRPC.EXPECT().GetChainID().Return(chainID)
536+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil)
537+
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil)
538538

539-
ctx, cancel := context.WithCancel(context.Background())
540-
go committer.runPublishLoop(ctx, time.Millisecond)
541-
time.Sleep(2 * time.Millisecond)
542-
cancel()
543-
time.Sleep(10 * time.Millisecond)
539+
ctx, cancel := context.WithCancel(context.Background())
540+
go committer.runPublishLoop(ctx, time.Millisecond)
541+
time.Sleep(2 * time.Millisecond)
542+
cancel()
543+
time.Sleep(10 * time.Millisecond)
544544

545-
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
546-
mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything)
545+
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
546+
mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything)
547547
}
548548

549549
func TestHandleGap(t *testing.T) {

0 commit comments

Comments
 (0)