Skip to content

Commit 80399a8

Browse files
committed
Remove publisher mode, default to parallel
1 parent 00b5822 commit 80399a8

File tree

3 files changed

+44
-70
lines changed

3 files changed

+44
-70
lines changed

cmd/root.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ func init() {
173173
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
174174
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
175175
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
176-
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
177176
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
178177
rootCmd.PersistentFlags().String("publisher-username", "", "Kafka username for publisher")
179178
rootCmd.PersistentFlags().String("publisher-password", "", "Kafka password for publisher")
@@ -368,7 +367,6 @@ func init() {
368367
viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
369368
viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
370369
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
371-
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
372370
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
373371
viper.BindPFlag("publisher.username", rootCmd.PersistentFlags().Lookup("publisher-username"))
374372
viper.BindPFlag("publisher.password", rootCmd.PersistentFlags().Lookup("publisher-password"))

configs/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ type EventPublisherConfig struct {
227227

228228
type PublisherConfig struct {
229229
Enabled bool `mapstructure:"enabled"`
230-
Mode string `mapstructure:"mode"`
231230
Brokers string `mapstructure:"brokers"`
232231
Username string `mapstructure:"username"`
233232
Password string `mapstructure:"password"`

internal/orchestrator/committer.go

Lines changed: 44 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -72,26 +72,22 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
7272
func (c *Committer) Start(ctx context.Context) {
7373
log.Debug().Msgf("Committer running")
7474

75-
if config.Cfg.Publisher.Mode == "parallel" {
76-
var wg sync.WaitGroup
77-
wg.Add(2)
75+
var wg sync.WaitGroup
76+
wg.Add(2)
7877

79-
go func() {
80-
defer wg.Done()
81-
c.runPublishLoop(ctx)
82-
}()
78+
go func() {
79+
defer wg.Done()
80+
c.runPublishLoop(ctx)
81+
}()
8382

84-
go func() {
85-
defer wg.Done()
86-
c.runCommitLoop(ctx)
87-
}()
83+
go func() {
84+
defer wg.Done()
85+
c.runCommitLoop(ctx)
86+
}()
8887

89-
<-ctx.Done()
88+
<-ctx.Done()
9089

91-
wg.Wait()
92-
} else {
93-
c.runCommitLoop(ctx)
94-
}
90+
wg.Wait()
9591

9692
log.Info().Msg("Committer shutting down")
9793
c.publisher.Close()
@@ -112,6 +108,7 @@ func (c *Committer) initCommittedAndPublishedBlockNumbers() error {
112108
}
113109
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
114110

111+
// Initialize published block number
115112
lastPublished, err := c.storage.OrchestratorStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
116113
if err != nil {
117114
return err
@@ -145,16 +142,7 @@ func (c *Committer) runCommitLoop(ctx context.Context) {
145142
log.Info().Msgf("Committer reached configured toBlock %s, the last commit block is %d, stopping commits", c.commitToBlock.String(), c.lastCommittedBlock.Load())
146143
return
147144
}
148-
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
149-
if err != nil {
150-
log.Error().Err(err).Msg("Error getting block data to commit")
151-
continue
152-
}
153-
if len(blockDataToCommit) == 0 {
154-
log.Debug().Msg("No block data to commit")
155-
continue
156-
}
157-
if err := c.commit(ctx, blockDataToCommit); err != nil {
145+
if err := c.commit(ctx); err != nil {
158146
log.Error().Err(err).Msg("Error committing blocks")
159147
}
160148
go c.cleanupProcessedStagingBlocks(ctx)
@@ -362,30 +350,16 @@ func (c *Committer) getSequentialBlockData(ctx context.Context, blockNumbers []*
362350
return sequentialBlockData, nil
363351
}
364352

365-
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
366-
blocksToCommit, err := c.getBlockNumbersToCommit(ctx)
367-
if err != nil {
368-
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
369-
}
370-
if len(blocksToCommit) == 0 {
371-
return nil, nil
372-
}
373-
return c.getSequentialBlockData(ctx, blocksToCommit)
374-
}
375-
376-
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
377-
blocksToPublish, err := c.getBlockNumbersToPublish(ctx)
353+
func (c *Committer) publish(ctx context.Context) error {
354+
blockNumbersToPublish, err := c.getBlockNumbersToPublish(ctx)
378355
if err != nil {
379-
return nil, fmt.Errorf("error determining blocks to publish: %v", err)
356+
return fmt.Errorf("error determining blocks to publish: %v", err)
380357
}
381-
if len(blocksToPublish) == 0 {
382-
return nil, nil
358+
if len(blockNumbersToPublish) == 0 {
359+
return nil
383360
}
384-
return c.getSequentialBlockData(ctx, blocksToPublish)
385-
}
386361

387-
func (c *Committer) publish(ctx context.Context) error {
388-
blockData, err := c.getSequentialBlockDataToPublish(ctx)
362+
blockData, err := c.getSequentialBlockData(ctx, blockNumbersToPublish)
389363
if err != nil {
390364
return err
391365
}
@@ -394,48 +368,51 @@ func (c *Committer) publish(ctx context.Context) error {
394368
}
395369

396370
if err := c.publisher.PublishBlockData(blockData); err != nil {
371+
log.Error().Err(err).Msgf("Failed to publish blocks: %v", blockNumbersToPublish)
397372
return err
398373
}
399374

400375
chainID := c.rpc.GetChainID()
401376
highest := blockData[len(blockData)-1].Block.Number
402377
if err := c.storage.OrchestratorStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
378+
log.Error().Err(err).Msgf("Failed to update last published block number to %s", highest.String())
403379
return err
404380
}
381+
405382
c.lastPublishedBlock.Store(highest.Uint64())
406383
return nil
407384
}
408385

409-
func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
410-
blockNumbers := make([]*big.Int, len(blockData))
411-
highestBlock := blockData[0].Block
412-
for i, block := range blockData {
413-
blockNumbers[i] = block.Block.Number
414-
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
415-
highestBlock = block.Block
416-
}
386+
func (c *Committer) commit(ctx context.Context) error {
387+
blockNumbersToCommit, err := c.getBlockNumbersToCommit(ctx)
388+
if err != nil {
389+
return fmt.Errorf("error determining blocks to commit: %v", err)
390+
}
391+
if len(blockNumbersToCommit) == 0 {
392+
return nil
393+
}
394+
395+
blockData, err := c.getSequentialBlockData(ctx, blockNumbersToCommit)
396+
if err != nil {
397+
log.Error().Err(err).Msg("Error getting block data to commit")
398+
return err
417399
}
418-
log.Debug().Msgf("Committing %d blocks from %s to %s", len(blockNumbers), blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())
400+
if len(blockData) == 0 {
401+
return nil
402+
}
403+
404+
highestBlock := blockData[len(blockData)-1].Block
405+
406+
log.Debug().Msgf("Committing %d blocks from %s to %s", len(blockData), blockData[0].Block.Number.String(), highestBlock.Number.String())
419407

420408
mainStorageStart := time.Now()
421409
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
422-
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
410+
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbersToCommit)
423411
return fmt.Errorf("error saving data to main storage: %v", err)
424412
}
425413
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
426414
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
427415

428-
if config.Cfg.Publisher.Mode == "default" {
429-
highest := highestBlock.Number.Uint64()
430-
go func() {
431-
if err := c.publisher.PublishBlockData(blockData); err != nil {
432-
log.Error().Err(err).Msg("Failed to publish block data to kafka")
433-
return
434-
}
435-
c.lastPublishedBlock.Store(highest)
436-
}()
437-
}
438-
439416
c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
440417

441418
// Update metrics for successful commits

0 commit comments

Comments
 (0)