Skip to content

Commit 95db159

Browse files
committed
fix: ensure publisher leads committer
1 parent 22592de commit 95db159

File tree

2 files changed

+117
-5
lines changed

2 files changed

+117
-5
lines changed

internal/orchestrator/committer.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Committer struct {
2727
commitFromBlock *big.Int
2828
rpc rpc.IRPCClient
2929
lastCommittedBlock *big.Int
30+
lastCommittedLock sync.RWMutex
3031
publisher *publisher.Publisher
3132
workMode WorkMode
3233
workModeChan chan WorkMode
@@ -76,6 +77,18 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
7677
return committer
7778
}
7879

80+
func (c *Committer) getLastCommittedBlock() *big.Int {
81+
c.lastCommittedLock.RLock()
82+
defer c.lastCommittedLock.RUnlock()
83+
return new(big.Int).Set(c.lastCommittedBlock)
84+
}
85+
86+
func (c *Committer) setLastCommittedBlock(b *big.Int) {
87+
c.lastCommittedLock.Lock()
88+
c.lastCommittedBlock = new(big.Int).Set(b)
89+
c.lastCommittedLock.Unlock()
90+
}
91+
7992
func (c *Committer) initializeParallelPublisher() {
8093
chainID := c.rpc.GetChainID()
8194
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
@@ -115,14 +128,20 @@ func (c *Committer) Start(ctx context.Context) {
115128
if config.Cfg.Publisher.Mode == "parallel" {
116129
c.initializeParallelPublisher()
117130
var wg sync.WaitGroup
131+
publishInterval := interval / 2
132+
if publishInterval <= 0 {
133+
publishInterval = interval
134+
}
118135
wg.Add(2)
119136
go func() {
120137
defer wg.Done()
121-
c.runCommitLoop(ctx, interval)
138+
c.runPublishLoop(ctx, publishInterval)
122139
}()
140+
// allow the publisher to start before the committer
141+
time.Sleep(publishInterval)
123142
go func() {
124143
defer wg.Done()
125-
c.runPublishLoop(ctx, interval)
144+
c.runCommitLoop(ctx, interval)
126145
}()
127146
<-ctx.Done()
128147
wg.Wait()
@@ -169,6 +188,7 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
169188
}
170189

171190
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
191+
chainID := c.rpc.GetChainID()
172192
for {
173193
select {
174194
case <-ctx.Done():
@@ -179,6 +199,15 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
179199
log.Debug().Msg("Committer work mode not set, skipping publish")
180200
continue
181201
}
202+
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
203+
if err != nil {
204+
log.Error().Err(err).Msg("failed to get last published block number")
205+
continue
206+
}
207+
lastCommitted := c.getLastCommittedBlock()
208+
if lastPublished != nil && lastPublished.Cmp(lastCommitted) >= 0 {
209+
continue
210+
}
182211
if err := c.publish(ctx); err != nil {
183212
log.Error().Err(err).Msg("Error publishing blocks")
184213
}
@@ -225,8 +254,9 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
225254
// If no blocks have been committed yet, start from the fromBlock specified in the config
226255
latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
227256
} else {
228-
if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 {
229-
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String())
257+
lastCommitted := c.getLastCommittedBlock()
258+
if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 {
259+
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String())
230260
return []*big.Int{}, nil
231261
}
232262
}
@@ -472,7 +502,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
472502
}()
473503
}
474504

475-
c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)
505+
c.setLastCommittedBlock(highestBlock.Number)
476506

477507
// Update metrics for successful commits
478508
metrics.SuccessfulCommits.Add(float64(len(blockData)))

internal/orchestrator/committer_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,88 @@ func TestPublishParallelMode(t *testing.T) {
425425
}
426426
}
427427

428+
func TestRunPublishLoopPublishesWhenBehind(t *testing.T) {
429+
defer func() { config.Cfg = config.Config{} }()
430+
config.Cfg.Publisher.Mode = "parallel"
431+
config.Cfg.Publisher.Enabled = false
432+
433+
mockRPC := mocks.NewMockIRPCClient(t)
434+
mockMainStorage := mocks.NewMockIMainStorage(t)
435+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
436+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
437+
mockStorage := storage.IStorage{
438+
MainStorage: mockMainStorage,
439+
StagingStorage: mockStagingStorage,
440+
OrchestratorStorage: mockOrchestratorStorage,
441+
}
442+
committer := NewCommitter(mockRPC, mockStorage)
443+
committer.workMode = WorkModeLive
444+
committer.setLastCommittedBlock(big.NewInt(102))
445+
446+
chainID := big.NewInt(1)
447+
blockData := []common.BlockData{
448+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
449+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
450+
}
451+
452+
publishDone := make(chan struct{})
453+
454+
mockRPC.EXPECT().GetChainID().Return(chainID)
455+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
456+
mockRPC.EXPECT().GetChainID().Return(chainID)
457+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
458+
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
459+
mockRPC.EXPECT().GetChainID().Return(chainID)
460+
461+
ctx, cancel := context.WithCancel(context.Background())
462+
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
463+
close(publishDone)
464+
cancel()
465+
return nil
466+
})
467+
468+
go committer.runPublishLoop(ctx, time.Millisecond)
469+
470+
select {
471+
case <-publishDone:
472+
case <-time.After(2 * time.Second):
473+
t.Fatal("publish not triggered")
474+
}
475+
}
476+
477+
func TestRunPublishLoopSkipsWhenAhead(t *testing.T) {
478+
defer func() { config.Cfg = config.Config{} }()
479+
config.Cfg.Publisher.Mode = "parallel"
480+
config.Cfg.Publisher.Enabled = false
481+
482+
mockRPC := mocks.NewMockIRPCClient(t)
483+
mockMainStorage := mocks.NewMockIMainStorage(t)
484+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
485+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
486+
mockStorage := storage.IStorage{
487+
MainStorage: mockMainStorage,
488+
StagingStorage: mockStagingStorage,
489+
OrchestratorStorage: mockOrchestratorStorage,
490+
}
491+
committer := NewCommitter(mockRPC, mockStorage)
492+
committer.workMode = WorkModeLive
493+
committer.setLastCommittedBlock(big.NewInt(102))
494+
495+
chainID := big.NewInt(1)
496+
497+
mockRPC.EXPECT().GetChainID().Return(chainID)
498+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil)
499+
500+
ctx, cancel := context.WithCancel(context.Background())
501+
go committer.runPublishLoop(ctx, time.Millisecond)
502+
time.Sleep(2 * time.Millisecond)
503+
cancel()
504+
time.Sleep(10 * time.Millisecond)
505+
506+
mockStagingStorage.AssertNotCalled(t, "GetStagingData", mock.Anything)
507+
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
508+
}
509+
428510
func TestInitializeParallelPublisherZero(t *testing.T) {
429511
defer func() { config.Cfg = config.Config{} }()
430512
config.Cfg.Publisher.Mode = "parallel"

0 commit comments

Comments
 (0)