
Commit 13db8af

jakeloo and nischitpra authored
feat: add parallel publisher mode (#262)
* fix: initialize publish cursor in parallel mode
* fix: ensure publisher leads committer
* feat: gate staging cleanup on publish and commit
* refactor: simplify publisher initialization
* refactor: simplify publish loop
* chore: inline last committed block accessors
* get block range from staging instead
* minor query update

Co-authored-by: nischit <[email protected]>
1 parent 7bfa6de commit 13db8af

File tree

11 files changed: +566 -127 lines changed

cmd/root.go

Lines changed: 2 additions & 0 deletions
@@ -135,6 +135,7 @@ func init() {
 	rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
 	rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
 	rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
+	rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
 	rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
 	rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
 	rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
@@ -250,6 +251,7 @@ func init() {
 	viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
 	viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
 	viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
+	viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
 	viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
 	viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
 	viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))

configs/config.example.yml

Lines changed: 3 additions & 1 deletion
@@ -190,9 +190,11 @@ api:
 publisher:
   # Whether the publisher is enabled
   enabled: true
+  # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing
+  mode: default
   # Kafka broker addresses (comma-separated)
   brokers: localhost:9092
-
+
   # Block publishing configuration
   blocks:
     # Whether to publish block data

configs/config.go

Lines changed: 1 addition & 0 deletions
@@ -172,6 +172,7 @@ type EventPublisherConfig struct {

 type PublisherConfig struct {
 	Enabled  bool   `mapstructure:"enabled"`
+	Mode     string `mapstructure:"mode"`
 	Brokers  string `mapstructure:"brokers"`
 	Username string `mapstructure:"username"`
 	Password string `mapstructure:"password"`
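Nothing in this change validates the new mode string, so a typo in the config would silently fall back to non-parallel behavior. A hedged sketch of a guard that could sit wherever the config is loaded; resolvePublisherMode is a hypothetical helper, not part of this PR:

package main

import (
	"fmt"
	"strings"
)

// resolvePublisherMode is a hypothetical guard (not in this PR): it normalizes
// publisher.mode and rejects any value the committer does not understand.
func resolvePublisherMode(mode string) (string, error) {
	switch strings.ToLower(strings.TrimSpace(mode)) {
	case "", "default":
		return "default", nil
	case "parallel":
		return "parallel", nil
	default:
		return "", fmt.Errorf("unsupported publisher mode %q", mode)
	}
}

func main() {
	mode, err := resolvePublisherMode("Parallel")
	fmt.Println(mode, err) // "parallel" <nil>
}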

configs/test_config.yml

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ api:

 publisher:
   enabled: false
+  mode: default

 validation:
   mode: minimal

internal/orchestrator/committer.go

Lines changed: 190 additions & 51 deletions
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"math/big"
 	"sort"
+	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/rs/zerolog/log"
@@ -25,7 +27,8 @@ type Committer struct {
 	storage            storage.IStorage
 	commitFromBlock    *big.Int
 	rpc                rpc.IRPCClient
-	lastCommittedBlock *big.Int
+	lastCommittedBlock atomic.Uint64
+	lastPublishedBlock atomic.Uint64
 	publisher          *publisher.Publisher
 	workMode           WorkMode
 	workModeChan       chan WorkMode
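The cursor type changes from *big.Int to atomic.Uint64 because, in parallel mode, the publish loop and the commit loop run in separate goroutines and both read and update these counters. A minimal, self-contained sketch of that pattern (the names here are illustrative, not the repository's):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cursors mirrors the idea of the two new fields: plain uint64 counters that
// both loops can read and write without holding a mutex.
type cursors struct {
	lastCommitted atomic.Uint64
	lastPublished atomic.Uint64
}

func main() {
	var c cursors
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // stand-in for the commit loop
		defer wg.Done()
		for b := uint64(1); b <= 100; b++ {
			c.lastCommitted.Store(b)
		}
	}()
	go func() { // stand-in for the publish loop, free to run ahead
		defer wg.Done()
		for b := uint64(1); b <= 120; b++ {
			c.lastPublished.Store(b)
		}
	}()
	wg.Wait()
	fmt.Println(c.lastCommitted.Load(), c.lastPublished.Load()) // 100 120
}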
@@ -58,15 +61,17 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe

 	commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
 	committer := &Committer{
-		triggerIntervalMs:  triggerInterval,
-		blocksPerCommit:    blocksPerCommit,
-		storage:            storage,
-		commitFromBlock:    commitFromBlock,
-		rpc:                rpc,
-		lastCommittedBlock: commitFromBlock,
-		publisher:          publisher.GetInstance(),
-		workMode:           "",
-	}
+		triggerIntervalMs: triggerInterval,
+		blocksPerCommit:   blocksPerCommit,
+		storage:           storage,
+		commitFromBlock:   commitFromBlock,
+		rpc:               rpc,
+		publisher:         publisher.GetInstance(),
+		workMode:          "",
+	}
+	cfb := commitFromBlock.Uint64()
+	committer.lastCommittedBlock.Store(cfb)
+	committer.lastPublishedBlock.Store(cfb)

 	for _, opt := range opts {
 		opt(committer)
@@ -79,15 +84,63 @@ func (c *Committer) Start(ctx context.Context) {
 	interval := time.Duration(c.triggerIntervalMs) * time.Millisecond

 	log.Debug().Msgf("Committer running")
+	chainID := c.rpc.GetChainID()
+
+	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
+	if err != nil {
+		// It's okay to fail silently here; this value is only used for staging cleanup and
+		// the worker loop will eventually correct the state and delete as needed.
+		log.Error().Msgf("Error getting latest committed block number: %v", err)
+	} else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
+		c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
+	}
+
+	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
+	if err != nil {
+		// It's okay to fail silently here; it's only used for staging cleanup and will be
+		// corrected by the worker loop.
+		log.Error().Err(err).Msg("failed to get last published block number")
+	} else if lastPublished != nil && lastPublished.Sign() > 0 {
+		c.lastPublishedBlock.Store(lastPublished.Uint64())
+	} else {
+		c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
+	}
+
+	c.cleanupProcessedStagingBlocks()
+
+	if config.Cfg.Publisher.Mode == "parallel" {
+		var wg sync.WaitGroup
+		publishInterval := interval / 2
+		if publishInterval <= 0 {
+			publishInterval = interval
+		}
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			c.runPublishLoop(ctx, publishInterval)
+		}()
+		// allow the publisher to start before the committer
+		time.Sleep(publishInterval)
+		go func() {
+			defer wg.Done()
+			c.runCommitLoop(ctx, interval)
+		}()
+		<-ctx.Done()
+		wg.Wait()
+		log.Info().Msg("Committer shutting down")
+		c.publisher.Close()
+		return
+	}

-	// Clean up staging data before starting the committer
-	c.cleanupStagingData()
+	c.runCommitLoop(ctx, interval)
+	log.Info().Msg("Committer shutting down")
+	c.publisher.Close()
+}

+func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
 	for {
 		select {
 		case <-ctx.Done():
-			log.Info().Msg("Committer shutting down")
-			c.publisher.Close()
 			return
 		case workMode := <-c.workModeChan:
 			if workMode != c.workMode && workMode != "" {
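In parallel mode, Start wires two loops under one context: the publish loop starts first on a half-length interval, the commit loop follows after one publish interval, and a WaitGroup holds shutdown until both return. A simplified, hedged sketch of that orchestration (ticker-based here for brevity, whereas the loops above sleep inside their select):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runParallel sketches the startup ordering: publisher first, committer after
// a head start, both stopping when ctx is cancelled.
func runParallel(ctx context.Context, interval time.Duration, publish, commit func()) {
	var wg sync.WaitGroup
	publishInterval := interval / 2
	if publishInterval <= 0 {
		publishInterval = interval
	}
	wg.Add(2)
	go func() {
		defer wg.Done()
		loop(ctx, publishInterval, publish)
	}()
	time.Sleep(publishInterval) // let the publisher lead the committer
	go func() {
		defer wg.Done()
		loop(ctx, interval, commit)
	}()
	wg.Wait()
}

func loop(ctx context.Context, every time.Duration, fn func()) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			fn()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	runParallel(ctx, 100*time.Millisecond,
		func() { fmt.Println("publish tick") },
		func() { fmt.Println("commit tick") })
}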
@@ -116,26 +169,46 @@ func (c *Committer) Start(ctx context.Context) {
 	}
 }

-func (c *Committer) cleanupStagingData() {
-	// Get the last committed block number from main storage
-	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
-	if err != nil {
-		log.Error().Msgf("Error getting latest committed block number: %v", err)
-		return
+func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			time.Sleep(interval)
+			if c.workMode == "" {
+				log.Debug().Msg("Committer work mode not set, skipping publish")
+				continue
+			}
+			if err := c.publish(ctx); err != nil {
+				log.Error().Err(err).Msg("Error publishing blocks")
+			}
+		}
 	}
+}

-	if latestCommittedBlockNumber.Sign() == 0 {
-		log.Debug().Msg("No blocks committed yet, skipping staging data cleanup")
+func (c *Committer) cleanupProcessedStagingBlocks() {
+	committed := c.lastCommittedBlock.Load()
+	published := c.lastPublishedBlock.Load()
+	if published == 0 || committed == 0 {
 		return
 	}
-
-	// Delete all staging data older than the latest committed block number
-	if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil {
-		log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err)
+	limit := committed
+	if published < limit {
+		limit = published
+	}
+	if limit == 0 {
 		return
 	}
-
-	log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber)
+	chainID := c.rpc.GetChainID()
+	blockNumber := new(big.Int).SetUint64(limit)
+	stagingDeleteStart := time.Now()
+	if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil {
+		log.Error().Err(err).Msg("Failed to delete staging data")
+		return
+	}
+	log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())
+	metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
 }

 func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
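cleanupProcessedStagingBlocks deletes staging rows only up to the lower of the two cursors, so a block must be both committed and published before it can be dropped. A small standalone sketch of just that bound calculation, with hypothetical names:

package main

import "fmt"

// cleanupUpperBound returns the highest block that is safe to delete from
// staging: the smaller of the committed and published cursors, or 0 when
// either side has not progressed yet.
func cleanupUpperBound(committed, published uint64) uint64 {
	if committed == 0 || published == 0 {
		return 0
	}
	if published < committed {
		return published
	}
	return committed
}

func main() {
	fmt.Println(cleanupUpperBound(150, 140)) // 140: publisher is behind
	fmt.Println(cleanupUpperBound(140, 150)) // 140: committer is behind
	fmt.Println(cleanupUpperBound(0, 150))   // 0: nothing committed yet
}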
@@ -155,8 +228,9 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
 		// If no blocks have been committed yet, start from the fromBlock specified in the config
 		latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
 	} else {
-		if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 {
-			log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String())
+		lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
+		if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 {
+			log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String())
 			return []*big.Int{}, nil
 		}
 	}
@@ -293,13 +367,89 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
 	return sequentialBlockData, nil
 }

+func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
+	chainID := c.rpc.GetChainID()
+	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get last published block number: %v", err)
+	}
+
+	startBlock := new(big.Int).Set(c.commitFromBlock)
+	if lastPublished != nil && lastPublished.Sign() > 0 {
+		startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
+	}
+
+	endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
+
+	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
+		ChainId:    chainID,
+		StartBlock: startBlock,
+		EndBlock:   endBlock,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
+	}
+	if len(blocksData) == 0 {
+		return nil, nil
+	}
+
+	sort.Slice(blocksData, func(i, j int) bool {
+		return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
+	})
+	if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
+		log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
+		return nil, nil
+	}
+
+	sequential := []common.BlockData{blocksData[0]}
+	expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
+	for i := 1; i < len(blocksData); i++ {
+		if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
+			continue
+		}
+		if blocksData[i].Block.Number.Cmp(expected) != 0 {
+			break
+		}
+		sequential = append(sequential, blocksData[i])
+		expected.Add(expected, big.NewInt(1))
+	}
+
+	return sequential, nil
+}
+
+func (c *Committer) publish(ctx context.Context) error {
+	blockData, err := c.getSequentialBlockDataToPublish(ctx)
+	if err != nil {
+		return err
+	}
+	if len(blockData) == 0 {
+		return nil
+	}
+
+	if err := c.publisher.PublishBlockData(blockData); err != nil {
+		return err
+	}
+
+	chainID := c.rpc.GetChainID()
+	highest := blockData[len(blockData)-1].Block.Number
+	if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
+		return err
+	}
+	c.lastPublishedBlock.Store(highest.Uint64())
+	go c.cleanupProcessedStagingBlocks()
+	return nil
+}
+
 func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
 	blockNumbers := make([]*big.Int, len(blockData))
+	highestBlock := blockData[0].Block
 	for i, block := range blockData {
 		blockNumbers[i] = block.Block.Number
+		if block.Block.Number.Cmp(highestBlock.Number) > 0 {
+			highestBlock = block.Block
+		}
 	}
 	log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
-
 	mainStorageStart := time.Now()
 	if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
 		log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -308,31 +458,20 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
 	log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
 	metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())

-	go func() {
-		if err := c.publisher.PublishBlockData(blockData); err != nil {
-			log.Error().Err(err).Msg("Failed to publish block data to kafka")
-		}
-	}()
-
-	if c.workMode == WorkModeBackfill {
+	if config.Cfg.Publisher.Mode == "default" {
+		highest := highestBlock.Number.Uint64()
 		go func() {
-			stagingDeleteStart := time.Now()
-			if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
-				log.Error().Err(err).Msg("Failed to delete staging data")
+			if err := c.publisher.PublishBlockData(blockData); err != nil {
+				log.Error().Err(err).Msg("Failed to publish block data to kafka")
+				return
 			}
-			log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
-			metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
+			c.lastPublishedBlock.Store(highest)
+			c.cleanupProcessedStagingBlocks()
 		}()
 	}

-	// Find highest block number from committed blocks
-	highestBlock := blockData[0].Block
-	for _, block := range blockData {
-		if block.Block.Number.Cmp(highestBlock.Number) > 0 {
-			highestBlock = block.Block
-		}
-	}
-	c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)
+	c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
+	go c.cleanupProcessedStagingBlocks()

 	// Update metrics for successful commits
 	metrics.SuccessfulCommits.Add(float64(len(blockData)))
