
Commit 22592de

fix: initialize publish cursor in parallel mode
1 parent 7bfa6de commit 22592de

10 files changed: +476, -18 lines

cmd/root.go

Lines changed: 2 additions & 0 deletions
@@ -135,6 +135,7 @@ func init() {
 	rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
 	rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
 	rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
+	rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
 	rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
 	rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
 	rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
@@ -250,6 +251,7 @@ func init() {
 	viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
 	viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
 	viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
+	viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
 	viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
 	viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
 	viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
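For reference, a minimal standalone sketch of how the Cobra/Viper wiring above resolves at runtime: the --publisher-mode flag is registered with a default of "default" and bound to the publisher.mode key, so the value can come from the command line or a config file. The demo command name and the inline Execute call below are illustrative only and not part of this repository.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

func main() {
	// Hypothetical demo command mirroring the flag and binding added in cmd/root.go.
	cmd := &cobra.Command{
		Use: "demo",
		Run: func(_ *cobra.Command, _ []string) {
			// After BindPFlag, the flag's value is readable under the dotted Viper key.
			fmt.Println("publisher.mode =", viper.GetString("publisher.mode"))
		},
	}
	cmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
	viper.BindPFlag("publisher.mode", cmd.PersistentFlags().Lookup("publisher-mode"))

	cmd.SetArgs([]string{"--publisher-mode", "parallel"})
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
	// Prints: publisher.mode = parallel
}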

configs/config.example.yml

Lines changed: 3 additions & 1 deletion
@@ -190,9 +190,11 @@ api:
 publisher:
   # Whether the publisher is enabled
   enabled: true
+  # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing
+  mode: default
   # Kafka broker addresses (comma-separated)
   brokers: localhost:9092
-
+
   # Block publishing configuration
   blocks:
     # Whether to publish block data

configs/config.go

Lines changed: 1 addition & 0 deletions
@@ -172,6 +172,7 @@ type EventPublisherConfig struct {
 
 type PublisherConfig struct {
 	Enabled bool `mapstructure:"enabled"`
+	Mode string `mapstructure:"mode"`
 	Brokers string `mapstructure:"brokers"`
 	Username string `mapstructure:"username"`
 	Password string `mapstructure:"password"`
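As an illustration of what the new mapstructure tag enables, here is a minimal sketch (assuming a Viper-based loader similar to the project's) that decodes a publisher section into a struct. The local PublisherConfig mirrors only the fields relevant here, and the inline YAML is sample data rather than the project's actual config.

package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

// Mirrors only the PublisherConfig fields that matter for this sketch.
type PublisherConfig struct {
	Enabled bool   `mapstructure:"enabled"`
	Mode    string `mapstructure:"mode"`
	Brokers string `mapstructure:"brokers"`
}

func main() {
	yml := []byte("publisher:\n  enabled: true\n  mode: parallel\n  brokers: localhost:9092\n")

	v := viper.New()
	v.SetConfigType("yaml")
	if err := v.ReadConfig(bytes.NewReader(yml)); err != nil {
		panic(err)
	}

	var cfg PublisherConfig
	// The mapstructure:"mode" tag is what routes publisher.mode into cfg.Mode.
	if err := v.UnmarshalKey("publisher", &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {Enabled:true Mode:parallel Brokers:localhost:9092}
}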

configs/test_config.yml

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ api:
 
 publisher:
   enabled: false
+  mode: default
 
 validation:
   mode: minimal

internal/orchestrator/committer.go

Lines changed: 155 additions & 15 deletions
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/big"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/rs/zerolog/log"
@@ -75,6 +76,34 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) *Committer {
 	return committer
 }
 
+func (c *Committer) initializeParallelPublisher() {
+	chainID := c.rpc.GetChainID()
+	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
+	if err != nil {
+		log.Error().Err(err).Msg("failed to get last published block number")
+		return
+	}
+	mainMax, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
+	if err != nil {
+		log.Error().Err(err).Msg("failed to get max block number from main storage")
+		return
+	}
+	if lastPublished == nil || lastPublished.Sign() == 0 {
+		if mainMax != nil && mainMax.Sign() > 0 {
+			if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
+				log.Error().Err(err).Msg("failed to set last published block number")
+			}
+		}
+		return
+	}
+	if lastPublished.Cmp(mainMax) < 0 {
+		log.Warn().Msgf("Publish block number seek ahead from %s to %s", lastPublished.String(), mainMax.String())
+		if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
+			log.Error().Err(err).Msg("failed to set last published block number")
+		}
+	}
+}
+
 func (c *Committer) Start(ctx context.Context) {
 	interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
 
@@ -83,11 +112,34 @@ func (c *Committer) Start(ctx context.Context) {
 	// Clean up staging data before starting the committer
 	c.cleanupStagingData()
 
+	if config.Cfg.Publisher.Mode == "parallel" {
+		c.initializeParallelPublisher()
+		var wg sync.WaitGroup
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			c.runCommitLoop(ctx, interval)
+		}()
+		go func() {
+			defer wg.Done()
+			c.runPublishLoop(ctx, interval)
+		}()
+		<-ctx.Done()
+		wg.Wait()
+		log.Info().Msg("Committer shutting down")
+		c.publisher.Close()
+		return
+	}
+
+	c.runCommitLoop(ctx, interval)
+	log.Info().Msg("Committer shutting down")
+	c.publisher.Close()
+}
+
+func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
 	for {
 		select {
 		case <-ctx.Done():
-			log.Info().Msg("Committer shutting down")
-			c.publisher.Close()
 			return
 		case workMode := <-c.workModeChan:
 			if workMode != c.workMode && workMode != "" {
@@ -116,6 +168,24 @@ func (c *Committer) Start(ctx context.Context) {
 		}
 	}
 
+func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			time.Sleep(interval)
+			if c.workMode == "" {
+				log.Debug().Msg("Committer work mode not set, skipping publish")
+				continue
+			}
+			if err := c.publish(ctx); err != nil {
+				log.Error().Err(err).Msg("Error publishing blocks")
+			}
+		}
+	}
+}
+
 func (c *Committer) cleanupStagingData() {
 	// Get the last committed block number from main storage
 	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
@@ -293,13 +363,88 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
 	return sequentialBlockData, nil
 }
 
+func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
+	chainID := c.rpc.GetChainID()
+	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get last published block number: %v", err)
+	}
+
+	startBlock := new(big.Int).Set(c.commitFromBlock)
+	if lastPublished != nil && lastPublished.Sign() > 0 {
+		startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
+	}
+
+	endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
+	blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
+	blockNumbers := make([]*big.Int, blockCount)
+	for i := int64(0); i < blockCount; i++ {
+		blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
+	}
+
+	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
+	if err != nil {
+		return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
+	}
+	if len(blocksData) == 0 {
+		return nil, nil
+	}
+
+	sort.Slice(blocksData, func(i, j int) bool {
+		return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
+	})
+	if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
+		log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
+		return nil, nil
+	}
+
+	sequential := []common.BlockData{blocksData[0]}
+	expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
+	for i := 1; i < len(blocksData); i++ {
+		if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
+			continue
+		}
+		if blocksData[i].Block.Number.Cmp(expected) != 0 {
+			break
+		}
+		sequential = append(sequential, blocksData[i])
+		expected.Add(expected, big.NewInt(1))
+	}
+
+	return sequential, nil
+}
+
+func (c *Committer) publish(ctx context.Context) error {
+	blockData, err := c.getSequentialBlockDataToPublish(ctx)
+	if err != nil {
+		return err
+	}
+	if len(blockData) == 0 {
+		return nil
+	}
+
+	if err := c.publisher.PublishBlockData(blockData); err != nil {
+		return err
+	}
+
+	chainID := c.rpc.GetChainID()
+	highest := blockData[len(blockData)-1].Block.Number
+	if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
 	blockNumbers := make([]*big.Int, len(blockData))
+	highestBlock := blockData[0].Block
 	for i, block := range blockData {
 		blockNumbers[i] = block.Block.Number
+		if block.Block.Number.Cmp(highestBlock.Number) > 0 {
+			highestBlock = block.Block
+		}
 	}
 	log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
-
 	mainStorageStart := time.Now()
 	if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
 		log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -308,11 +453,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
 	log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
 	metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
 
-	go func() {
-		if err := c.publisher.PublishBlockData(blockData); err != nil {
-			log.Error().Err(err).Msg("Failed to publish block data to kafka")
-		}
-	}()
+	if config.Cfg.Publisher.Mode == "default" {
+		go func() {
+			if err := c.publisher.PublishBlockData(blockData); err != nil {
+				log.Error().Err(err).Msg("Failed to publish block data to kafka")
+			}
+		}()
+	}
 
 	if c.workMode == WorkModeBackfill {
 		go func() {
@@ -325,13 +472,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
 		}()
 	}
 
-	// Find highest block number from committed blocks
-	highestBlock := blockData[0].Block
-	for _, block := range blockData {
-		if block.Block.Number.Cmp(highestBlock.Number) > 0 {
-			highestBlock = block.Block
-		}
-	}
 	c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)
 
 	// Update metrics for successful commits
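To make the new publish-cursor handling easier to follow, here is a small sketch that isolates the decision initializeParallelPublisher makes at startup: seed the cursor from main storage when it is unset, seek it forward when it lags behind the committed head, and otherwise leave it alone. The helper name resolvePublishCursor is hypothetical; the real method reads and writes the cursor through StagingStorage instead of returning a value.

package main

import (
	"fmt"
	"math/big"
)

// resolvePublishCursor is a hypothetical helper mirroring the startup decision in
// initializeParallelPublisher: it returns the block number the publish cursor should
// point at, or nil if it should remain unset.
func resolvePublishCursor(lastPublished, mainMax *big.Int) *big.Int {
	// No cursor yet: seed it from main storage so blocks that were already committed
	// (and published in default mode) are not re-published.
	if lastPublished == nil || lastPublished.Sign() == 0 {
		if mainMax != nil && mainMax.Sign() > 0 {
			return new(big.Int).Set(mainMax)
		}
		return nil
	}
	// Cursor lags behind the committed head: seek it forward.
	if mainMax != nil && lastPublished.Cmp(mainMax) < 0 {
		return new(big.Int).Set(mainMax)
	}
	// Cursor is already at or past the committed head: keep it.
	return new(big.Int).Set(lastPublished)
}

func main() {
	fmt.Println(resolvePublishCursor(nil, big.NewInt(100)))            // 100: fresh start, seeded from main storage
	fmt.Println(resolvePublishCursor(big.NewInt(80), big.NewInt(100))) // 100: seek ahead past already-committed blocks
	fmt.Println(resolvePublishCursor(big.NewInt(100), big.NewInt(90))) // 100: cursor already ahead, unchanged
}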
