Skip to content

Commit 9942de0

Browse files
committed
refactor: simplify publisher initialization
1 parent b054a1e commit 9942de0

File tree

2 files changed

+30
-142
lines changed

2 files changed

+30
-142
lines changed

internal/orchestrator/committer.go

Lines changed: 30 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ type Committer struct {
2727
storage storage.IStorage
2828
commitFromBlock *big.Int
2929
rpc rpc.IRPCClient
30-
lastCommittedBlock atomic.Int64
31-
lastPublishedBlock atomic.Int64
30+
lastCommittedBlock atomic.Uint64
31+
lastPublishedBlock atomic.Uint64
3232
publisher *publisher.Publisher
3333
workMode WorkMode
3434
workModeChan chan WorkMode
@@ -69,7 +69,9 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
6969
publisher: publisher.GetInstance(),
7070
workMode: "",
7171
}
72-
committer.lastCommittedBlock.Store(commitFromBlock.Int64())
72+
cfb := commitFromBlock.Uint64()
73+
committer.lastCommittedBlock.Store(cfb)
74+
committer.lastPublishedBlock.Store(cfb)
7375

7476
for _, opt := range opts {
7577
opt(committer)
@@ -79,55 +81,38 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
7981
}
8082

8183
func (c *Committer) getLastCommittedBlock() *big.Int {
82-
return big.NewInt(c.lastCommittedBlock.Load())
84+
return new(big.Int).SetUint64(c.lastCommittedBlock.Load())
8385
}
8486

8587
func (c *Committer) setLastCommittedBlock(b *big.Int) {
86-
c.lastCommittedBlock.Store(b.Int64())
88+
c.lastCommittedBlock.Store(b.Uint64())
8789
}
8890

89-
func (c *Committer) initializeParallelPublisher() {
91+
func (c *Committer) Start(ctx context.Context) {
92+
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
93+
94+
log.Debug().Msgf("Committer running")
9095
chainID := c.rpc.GetChainID()
91-
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
96+
97+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
9298
if err != nil {
93-
log.Error().Err(err).Msg("failed to get last published block number")
94-
return
99+
log.Error().Msgf("Error getting latest committed block number: %v", err)
100+
} else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
101+
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
95102
}
96-
mainMax, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
103+
104+
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
97105
if err != nil {
98-
log.Error().Err(err).Msg("failed to get max block number from main storage")
99-
return
100-
}
101-
if lastPublished == nil || lastPublished.Sign() == 0 {
102-
if mainMax != nil && mainMax.Sign() > 0 {
103-
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
104-
log.Error().Err(err).Msg("failed to set last published block number")
105-
}
106-
c.lastPublishedBlock.Store(mainMax.Int64())
107-
}
108-
return
109-
}
110-
if lastPublished.Cmp(mainMax) < 0 {
111-
log.Warn().Msgf("Publish block number seek ahead from %s to %s", lastPublished.String(), mainMax.String())
112-
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
113-
log.Error().Err(err).Msg("failed to set last published block number")
114-
}
115-
c.lastPublishedBlock.Store(mainMax.Int64())
116-
return
106+
log.Error().Err(err).Msg("failed to get last published block number")
107+
} else if lastPublished != nil && lastPublished.Sign() > 0 {
108+
c.lastPublishedBlock.Store(lastPublished.Uint64())
109+
} else {
110+
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
117111
}
118-
c.lastPublishedBlock.Store(lastPublished.Int64())
119-
}
120112

121-
func (c *Committer) Start(ctx context.Context) {
122-
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
123-
124-
log.Debug().Msgf("Committer running")
125-
126-
// Clean up staging data before starting the committer
127-
c.cleanupStagingData()
113+
c.cleanupProcessedStagingBlocks()
128114

129115
if config.Cfg.Publisher.Mode == "parallel" {
130-
c.initializeParallelPublisher()
131116
var wg sync.WaitGroup
132117
publishInterval := interval / 2
133118
if publishInterval <= 0 {
@@ -206,7 +191,7 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
206191
continue
207192
}
208193
lastCommitted := c.lastCommittedBlock.Load()
209-
if lastPublished != nil && lastPublished.Int64() >= lastCommitted {
194+
if lastPublished != nil && lastPublished.Uint64() >= lastCommitted {
210195
continue
211196
}
212197
if err := c.publish(ctx); err != nil {
@@ -216,27 +201,6 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
216201
}
217202
}
218203

219-
func (c *Committer) cleanupStagingData() {
220-
chainID := c.rpc.GetChainID()
221-
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
222-
if err != nil {
223-
log.Error().Msgf("Error getting latest committed block number: %v", err)
224-
return
225-
}
226-
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
227-
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Int64())
228-
}
229-
230-
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
231-
if err != nil {
232-
log.Error().Err(err).Msg("failed to get last published block number")
233-
} else if lastPublished != nil && lastPublished.Sign() > 0 {
234-
c.lastPublishedBlock.Store(lastPublished.Int64())
235-
}
236-
237-
c.cleanupProcessedStagingBlocks()
238-
}
239-
240204
func (c *Committer) cleanupProcessedStagingBlocks() {
241205
committed := c.lastCommittedBlock.Load()
242206
published := c.lastPublishedBlock.Load()
@@ -247,11 +211,11 @@ func (c *Committer) cleanupProcessedStagingBlocks() {
247211
if published < limit {
248212
limit = published
249213
}
250-
if limit <= 0 {
214+
if limit == 0 {
251215
return
252216
}
253217
chainID := c.rpc.GetChainID()
254-
blockNumber := big.NewInt(limit)
218+
blockNumber := new(big.Int).SetUint64(limit)
255219
stagingDeleteStart := time.Now()
256220
if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil {
257221
log.Error().Err(err).Msg("Failed to delete staging data")
@@ -486,7 +450,7 @@ func (c *Committer) publish(ctx context.Context) error {
486450
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
487451
return err
488452
}
489-
c.lastPublishedBlock.Store(highest.Int64())
453+
c.lastPublishedBlock.Store(highest.Uint64())
490454
go c.cleanupProcessedStagingBlocks()
491455
return nil
492456
}
@@ -510,7 +474,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
510474
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
511475

512476
if config.Cfg.Publisher.Mode == "default" {
513-
highest := highestBlock.Number.Int64()
477+
highest := highestBlock.Number.Uint64()
514478
go func() {
515479
if err := c.publisher.PublishBlockData(blockData); err != nil {
516480
log.Error().Err(err).Msg("Failed to publish block data to kafka")
@@ -521,7 +485,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
521485
}()
522486
}
523487

524-
c.lastCommittedBlock.Store(highestBlock.Number.Int64())
488+
c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
525489
go c.cleanupProcessedStagingBlocks()
526490

527491
// Update metrics for successful commits

internal/orchestrator/committer_test.go

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -548,82 +548,6 @@ func TestRunPublishLoopSkipsWhenAhead(t *testing.T) {
548548
mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything)
549549
}
550550

551-
func TestInitializeParallelPublisherZero(t *testing.T) {
552-
defer func() { config.Cfg = config.Config{} }()
553-
config.Cfg.Publisher.Mode = "parallel"
554-
555-
mockRPC := mocks.NewMockIRPCClient(t)
556-
mockMainStorage := mocks.NewMockIMainStorage(t)
557-
mockStagingStorage := mocks.NewMockIStagingStorage(t)
558-
mockStorage := storage.IStorage{
559-
MainStorage: mockMainStorage,
560-
StagingStorage: mockStagingStorage,
561-
}
562-
committer := NewCommitter(mockRPC, mockStorage)
563-
564-
chainID := big.NewInt(1)
565-
last := big.NewInt(0)
566-
max := big.NewInt(100)
567-
568-
mockRPC.EXPECT().GetChainID().Return(chainID)
569-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil)
570-
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil)
571-
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, max).Return(nil)
572-
573-
committer.initializeParallelPublisher()
574-
}
575-
576-
func TestInitializeParallelPublisherSeekAhead(t *testing.T) {
577-
defer func() { config.Cfg = config.Config{} }()
578-
config.Cfg.Publisher.Mode = "parallel"
579-
580-
mockRPC := mocks.NewMockIRPCClient(t)
581-
mockMainStorage := mocks.NewMockIMainStorage(t)
582-
mockStagingStorage := mocks.NewMockIStagingStorage(t)
583-
mockStorage := storage.IStorage{
584-
MainStorage: mockMainStorage,
585-
StagingStorage: mockStagingStorage,
586-
}
587-
committer := NewCommitter(mockRPC, mockStorage)
588-
589-
chainID := big.NewInt(1)
590-
last := big.NewInt(50)
591-
max := big.NewInt(100)
592-
593-
mockRPC.EXPECT().GetChainID().Return(chainID)
594-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil)
595-
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil)
596-
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, max).Return(nil)
597-
598-
committer.initializeParallelPublisher()
599-
}
600-
601-
func TestInitializeParallelPublisherAhead(t *testing.T) {
602-
defer func() { config.Cfg = config.Config{} }()
603-
config.Cfg.Publisher.Mode = "parallel"
604-
605-
mockRPC := mocks.NewMockIRPCClient(t)
606-
mockMainStorage := mocks.NewMockIMainStorage(t)
607-
mockStagingStorage := mocks.NewMockIStagingStorage(t)
608-
mockStorage := storage.IStorage{
609-
MainStorage: mockMainStorage,
610-
StagingStorage: mockStagingStorage,
611-
}
612-
committer := NewCommitter(mockRPC, mockStorage)
613-
614-
chainID := big.NewInt(1)
615-
last := big.NewInt(150)
616-
max := big.NewInt(100)
617-
618-
mockRPC.EXPECT().GetChainID().Return(chainID)
619-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil)
620-
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil)
621-
622-
committer.initializeParallelPublisher()
623-
624-
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
625-
}
626-
627551
func TestHandleGap(t *testing.T) {
628552
mockRPC := mocks.NewMockIRPCClient(t)
629553
mockMainStorage := mocks.NewMockIMainStorage(t)

0 commit comments

Comments
 (0)