Skip to content

Commit 7fd4e93

Browse files
committed
feat: add parallel publisher mode
1 parent 4c5320e commit 7fd4e93

File tree

5 files changed

+141
-55
lines changed

5 files changed

+141
-55
lines changed

cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func init() {
135135
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
136136
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
137137
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
138-
rootCmd.PersistentFlags().String("publisher-mode", "post-commit", "Publisher mode: pre-commit or post-commit")
138+
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
139139
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
140140
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
141141
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")

configs/config.example.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ api:
190190
publisher:
191191
# Whether the publisher is enabled
192192
enabled: true
193-
# Publisher mode: "pre-commit" publishes before writing to storage, "post-commit" publishes after commit
194-
mode: post-commit
193+
# Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing
194+
mode: default
195195
# Kafka broker addresses (comma-separated)
196196
brokers: localhost:9092
197197

configs/test_config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ api:
6464

6565
publisher:
6666
enabled: false
67-
mode: post-commit
67+
mode: default
6868

6969
validation:
7070
mode: minimal

internal/orchestrator/committer.go

Lines changed: 116 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"math/big"
77
"sort"
8+
"sync"
89
"time"
910

1011
"github.com/rs/zerolog/log"
@@ -83,11 +84,33 @@ func (c *Committer) Start(ctx context.Context) {
8384
// Clean up staging data before starting the committer
8485
c.cleanupStagingData()
8586

87+
if config.Cfg.Publisher.Mode == "parallel" {
88+
var wg sync.WaitGroup
89+
wg.Add(2)
90+
go func() {
91+
defer wg.Done()
92+
c.runCommitLoop(ctx, interval)
93+
}()
94+
go func() {
95+
defer wg.Done()
96+
c.runPublishLoop(ctx, interval)
97+
}()
98+
<-ctx.Done()
99+
wg.Wait()
100+
log.Info().Msg("Committer shutting down")
101+
c.publisher.Close()
102+
return
103+
}
104+
105+
c.runCommitLoop(ctx, interval)
106+
log.Info().Msg("Committer shutting down")
107+
c.publisher.Close()
108+
}
109+
110+
func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
86111
for {
87112
select {
88113
case <-ctx.Done():
89-
log.Info().Msg("Committer shutting down")
90-
c.publisher.Close()
91114
return
92115
case workMode := <-c.workModeChan:
93116
if workMode != c.workMode && workMode != "" {
@@ -116,6 +139,24 @@ func (c *Committer) Start(ctx context.Context) {
116139
}
117140
}
118141

142+
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
143+
for {
144+
select {
145+
case <-ctx.Done():
146+
return
147+
default:
148+
time.Sleep(interval)
149+
if c.workMode == "" {
150+
log.Debug().Msg("Committer work mode not set, skipping publish")
151+
continue
152+
}
153+
if err := c.publish(ctx); err != nil {
154+
log.Error().Err(err).Msg("Error publishing blocks")
155+
}
156+
}
157+
}
158+
}
159+
119160
func (c *Committer) cleanupStagingData() {
120161
// Get the last committed block number from main storage
121162
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
@@ -293,6 +334,78 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
293334
return sequentialBlockData, nil
294335
}
295336

337+
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
338+
chainID := c.rpc.GetChainID()
339+
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
340+
if err != nil {
341+
return nil, fmt.Errorf("failed to get last published block number: %v", err)
342+
}
343+
344+
startBlock := new(big.Int).Set(c.commitFromBlock)
345+
if lastPublished != nil && lastPublished.Sign() > 0 {
346+
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
347+
}
348+
349+
endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
350+
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
351+
blockNumbers := make([]*big.Int, blockCount)
352+
for i := int64(0); i < blockCount; i++ {
353+
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
354+
}
355+
356+
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
357+
if err != nil {
358+
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
359+
}
360+
if len(blocksData) == 0 {
361+
return nil, nil
362+
}
363+
364+
sort.Slice(blocksData, func(i, j int) bool {
365+
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
366+
})
367+
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
368+
log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
369+
return nil, nil
370+
}
371+
372+
sequential := []common.BlockData{blocksData[0]}
373+
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
374+
for i := 1; i < len(blocksData); i++ {
375+
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
376+
continue
377+
}
378+
if blocksData[i].Block.Number.Cmp(expected) != 0 {
379+
break
380+
}
381+
sequential = append(sequential, blocksData[i])
382+
expected.Add(expected, big.NewInt(1))
383+
}
384+
385+
return sequential, nil
386+
}
387+
388+
func (c *Committer) publish(ctx context.Context) error {
389+
blockData, err := c.getSequentialBlockDataToPublish(ctx)
390+
if err != nil {
391+
return err
392+
}
393+
if len(blockData) == 0 {
394+
return nil
395+
}
396+
397+
if err := c.publisher.PublishBlockData(blockData); err != nil {
398+
return err
399+
}
400+
401+
chainID := c.rpc.GetChainID()
402+
highest := blockData[len(blockData)-1].Block.Number
403+
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
404+
return err
405+
}
406+
return nil
407+
}
408+
296409
func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
297410
blockNumbers := make([]*big.Int, len(blockData))
298411
highestBlock := blockData[0].Block
@@ -303,31 +416,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
303416
}
304417
}
305418
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
306-
307-
shouldPostCommitPublish := true
308-
309-
if config.Cfg.Publisher.Mode == "pre-commit" {
310-
chainID := c.rpc.GetChainID()
311-
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
312-
if err != nil {
313-
log.Error().Err(err).Msg("Failed to get last published block number, falling back to post-commit")
314-
} else if lastPublished == nil || lastPublished.Cmp(highestBlock.Number) < 0 {
315-
go func() {
316-
if err := c.publisher.PublishBlockData(blockData); err != nil {
317-
log.Error().Err(err).Msg("Failed to publish block data to kafka")
318-
return
319-
}
320-
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
321-
log.Error().Err(err).Msg("Failed to set last published block number")
322-
}
323-
}()
324-
shouldPostCommitPublish = false
325-
} else {
326-
log.Debug().Msgf("Skipping publish, latest published block %s >= current %s", lastPublished.String(), highestBlock.Number.String())
327-
shouldPostCommitPublish = false
328-
}
329-
}
330-
331419
mainStorageStart := time.Now()
332420
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
333421
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -336,7 +424,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
336424
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
337425
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
338426

339-
if shouldPostCommitPublish {
427+
if config.Cfg.Publisher.Mode == "default" {
340428
go func() {
341429
if err := c.publisher.PublishBlockData(blockData); err != nil {
342430
log.Error().Err(err).Msg("Failed to publish block data to kafka")

internal/orchestrator/committer_test.go

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package orchestrator
22

33
import (
44
"context"
5-
"errors"
65
"math/big"
76
"testing"
87
"time"
@@ -352,9 +351,9 @@ func TestCommit(t *testing.T) {
352351
}
353352
}
354353

355-
func TestCommitPreCommitPublisherMode(t *testing.T) {
354+
func TestCommitParallelPublisherMode(t *testing.T) {
356355
defer func() { config.Cfg = config.Config{} }()
357-
config.Cfg.Publisher.Mode = "pre-commit"
356+
config.Cfg.Publisher.Mode = "parallel"
358357

359358
mockRPC := mocks.NewMockIRPCClient(t)
360359
mockMainStorage := mocks.NewMockIMainStorage(t)
@@ -374,29 +373,18 @@ func TestCommitPreCommitPublisherMode(t *testing.T) {
374373
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
375374
}
376375

377-
publishDone := make(chan struct{})
378-
379-
mockRPC.EXPECT().GetChainID().Return(chainID)
380-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
381-
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
382-
close(publishDone)
383-
return nil
384-
})
385376
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
386377

387378
err := committer.commit(context.Background(), blockData)
388379
assert.NoError(t, err)
389380

390-
select {
391-
case <-publishDone:
392-
case <-time.After(2 * time.Second):
393-
t.Fatal("SetLastPublishedBlockNumber was not called")
394-
}
381+
mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything)
382+
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
395383
}
396384

397-
func TestCommitPreCommitPublisherModeFallback(t *testing.T) {
385+
func TestPublishParallelMode(t *testing.T) {
398386
defer func() { config.Cfg = config.Config{} }()
399-
config.Cfg.Publisher.Mode = "pre-commit"
387+
config.Cfg.Publisher.Mode = "parallel"
400388

401389
mockRPC := mocks.NewMockIRPCClient(t)
402390
mockMainStorage := mocks.NewMockIMainStorage(t)
@@ -416,15 +404,25 @@ func TestCommitPreCommitPublisherModeFallback(t *testing.T) {
416404
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
417405
}
418406

407+
publishDone := make(chan struct{})
408+
419409
mockRPC.EXPECT().GetChainID().Return(chainID)
420-
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(nil, errors.New("boom"))
421-
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
410+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
411+
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
412+
mockRPC.EXPECT().GetChainID().Return(chainID)
413+
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
414+
close(publishDone)
415+
return nil
416+
})
422417

423-
err := committer.commit(context.Background(), blockData)
418+
err := committer.publish(context.Background())
424419
assert.NoError(t, err)
425420

426-
time.Sleep(100 * time.Millisecond)
427-
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
421+
select {
422+
case <-publishDone:
423+
case <-time.After(2 * time.Second):
424+
t.Fatal("SetLastPublishedBlockNumber was not called")
425+
}
428426
}
429427

430428
func TestHandleGap(t *testing.T) {

0 commit comments

Comments
 (0)