Skip to content

Commit 643b8bd

Browse files
committed
refactor: simplify publish loop
1 parent 9942de0 commit 643b8bd

File tree

2 files changed

+46
-58
lines changed

2 files changed

+46
-58
lines changed

internal/orchestrator/committer.go

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -174,31 +174,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
174174
}
175175

176176
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
177-
chainID := c.rpc.GetChainID()
178-
for {
179-
select {
180-
case <-ctx.Done():
181-
return
182-
default:
183-
time.Sleep(interval)
184-
if c.workMode == "" {
185-
log.Debug().Msg("Committer work mode not set, skipping publish")
186-
continue
187-
}
188-
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
189-
if err != nil {
190-
log.Error().Err(err).Msg("failed to get last published block number")
191-
continue
192-
}
193-
lastCommitted := c.lastCommittedBlock.Load()
194-
if lastPublished != nil && lastPublished.Uint64() >= lastCommitted {
195-
continue
196-
}
197-
if err := c.publish(ctx); err != nil {
198-
log.Error().Err(err).Msg("Error publishing blocks")
199-
}
200-
}
201-
}
177+
for {
178+
select {
179+
case <-ctx.Done():
180+
return
181+
default:
182+
time.Sleep(interval)
183+
if c.workMode == "" {
184+
log.Debug().Msg("Committer work mode not set, skipping publish")
185+
continue
186+
}
187+
if err := c.publish(ctx); err != nil {
188+
log.Error().Err(err).Msg("Error publishing blocks")
189+
}
190+
}
191+
}
202192
}
203193

204194
func (c *Committer) cleanupProcessedStagingBlocks() {

internal/orchestrator/committer_test.go

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -478,27 +478,25 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) {
478478
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
479479
}
480480

481-
publishDone := make(chan struct{})
482-
deleteDone := make(chan struct{})
483-
484-
mockRPC.EXPECT().GetChainID().Return(chainID)
485-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
486-
mockRPC.EXPECT().GetChainID().Return(chainID)
487-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
488-
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
489-
mockRPC.EXPECT().GetChainID().Return(chainID)
490-
491-
ctx, cancel := context.WithCancel(context.Background())
492-
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
493-
close(publishDone)
494-
cancel()
495-
return nil
496-
})
497-
mockRPC.EXPECT().GetChainID().Return(chainID)
498-
mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
499-
close(deleteDone)
500-
return nil
501-
})
481+
publishDone := make(chan struct{})
482+
deleteDone := make(chan struct{})
483+
484+
mockRPC.EXPECT().GetChainID().Return(chainID)
485+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
486+
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
487+
mockRPC.EXPECT().GetChainID().Return(chainID)
488+
489+
ctx, cancel := context.WithCancel(context.Background())
490+
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
491+
close(publishDone)
492+
cancel()
493+
return nil
494+
})
495+
mockRPC.EXPECT().GetChainID().Return(chainID)
496+
mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
497+
close(deleteDone)
498+
return nil
499+
})
502500

503501
go committer.runPublishLoop(ctx, time.Millisecond)
504502

@@ -514,7 +512,7 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) {
514512
}
515513
}
516514

517-
func TestRunPublishLoopSkipsWhenAhead(t *testing.T) {
515+
func TestRunPublishLoopDoesNothingWhenNoNewBlocks(t *testing.T) {
518516
defer func() { config.Cfg = config.Config{} }()
519517
config.Cfg.Publisher.Mode = "parallel"
520518
config.Cfg.Publisher.Enabled = false
@@ -532,20 +530,20 @@ func TestRunPublishLoopSkipsWhenAhead(t *testing.T) {
532530
committer.workMode = WorkModeLive
533531
committer.setLastCommittedBlock(big.NewInt(102))
534532

535-
chainID := big.NewInt(1)
533+
chainID := big.NewInt(1)
536534

537-
mockRPC.EXPECT().GetChainID().Return(chainID)
538-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil)
535+
mockRPC.EXPECT().GetChainID().Return(chainID)
536+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil)
537+
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil)
539538

540-
ctx, cancel := context.WithCancel(context.Background())
541-
go committer.runPublishLoop(ctx, time.Millisecond)
542-
time.Sleep(2 * time.Millisecond)
543-
cancel()
544-
time.Sleep(10 * time.Millisecond)
539+
ctx, cancel := context.WithCancel(context.Background())
540+
go committer.runPublishLoop(ctx, time.Millisecond)
541+
time.Sleep(2 * time.Millisecond)
542+
cancel()
543+
time.Sleep(10 * time.Millisecond)
545544

546-
mockStagingStorage.AssertNotCalled(t, "GetStagingData", mock.Anything)
547-
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
548-
mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything)
545+
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
546+
mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything)
549547
}
550548

551549
func TestHandleGap(t *testing.T) {

0 commit comments

Comments
 (0)