2 changes: 2 additions & 0 deletions cmd/root.go
@@ -135,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
🛠️ Refactor suggestion

Validate --publisher-mode against allowed values.

Add a fast-fail validation for default|parallel to avoid silent misconfigurations.

Example (place in init() after binds or in initConfig()):

mode := viper.GetString("publisher.mode")
if mode == "" {
  viper.Set("publisher.mode", "default")
} else if mode != "default" && mode != "parallel" {
  // Use your logger if preferred
  panic(fmt.Errorf("invalid --publisher-mode %q (allowed: default, parallel)", mode))
}
🤖 Prompt for AI Agents
In cmd/root.go at line 138, the --publisher-mode flag is defined but lacks
validation for allowed values. Add a validation step after flag binding or in
the initConfig() function to check if the value is either "default" or
"parallel". If the value is empty, set it to "default". If it is any other
value, immediately fail by panicking with a clear error message indicating the
invalid value and allowed options.

rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
@@ -250,6 +251,7 @@ func init() {
viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
4 changes: 3 additions & 1 deletion configs/config.example.yml
@@ -190,9 +190,11 @@ api:
publisher:
# Whether the publisher is enabled
enabled: true
# Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing
mode: default
# Kafka broker addresses (comma-separated)
brokers: localhost:9092

# Block publishing configuration
blocks:
# Whether to publish block data
1 change: 1 addition & 0 deletions configs/config.go
@@ -172,6 +172,7 @@ type EventPublisherConfig struct {

type PublisherConfig struct {
Enabled bool `mapstructure:"enabled"`
Mode string `mapstructure:"mode"`
🛠️ Refactor suggestion

Harden publisher.mode with validation (and consider a typed enum).

Right now Mode accepts any string. Add early validation to prevent misconfigurations; optionally use a typed enum for clarity.

Apply this minimal diff to use a typed enum in the struct:

 type PublisherConfig struct {
   Enabled      bool                       `mapstructure:"enabled"`
-  Mode         string                     `mapstructure:"mode"`
+  Mode         PublisherMode              `mapstructure:"mode"`
   Brokers      string                     `mapstructure:"brokers"`
   Username     string                     `mapstructure:"username"`
   Password     string                     `mapstructure:"password"`
   Blocks       BlockPublisherConfig       `mapstructure:"blocks"`
   Transactions TransactionPublisherConfig `mapstructure:"transactions"`
   Traces       TracePublisherConfig       `mapstructure:"traces"`
   Events       EventPublisherConfig       `mapstructure:"events"`
 }

Add these supporting declarations (outside the selected range) near the top-level types:

// PublisherMode defines allowed publisher modes.
type PublisherMode string

const (
  PublisherModeDefault  PublisherMode = "default"
  PublisherModeParallel PublisherMode = "parallel"
)

func (m PublisherMode) IsValid() bool {
  switch m {
  case PublisherModeDefault, PublisherModeParallel:
    return true
  default:
    return false
  }
}

And validate in LoadConfig (after viper.Unmarshal, before using Cfg):

// Default and validate publisher mode
if Cfg.Publisher.Mode == "" {
  Cfg.Publisher.Mode = PublisherModeDefault
}
if !Cfg.Publisher.Mode.IsValid() {
  return fmt.Errorf("invalid publisher.mode: %q (allowed: %q, %q)", Cfg.Publisher.Mode, PublisherModeDefault, PublisherModeParallel)
}
🤖 Prompt for AI Agents
In configs/config.go at line 175, replace the Mode field's type from string to a
new typed enum PublisherMode. Define the PublisherMode type and its allowed
constants (PublisherModeDefault and PublisherModeParallel) near the top-level
types, along with an IsValid() method to check validity. Then, in the LoadConfig
function, after unmarshaling the config, add validation to set a default mode if
empty and return an error if the mode is invalid, ensuring early detection of
misconfigurations.

Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
1 change: 1 addition & 0 deletions configs/test_config.yml
@@ -64,6 +64,7 @@ api:

publisher:
enabled: false
mode: default

validation:
mode: minimal
170 changes: 155 additions & 15 deletions internal/orchestrator/committer.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/rs/zerolog/log"
@@ -75,6 +76,34 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
return committer
}

func (c *Committer) initializeParallelPublisher() {
chainID := c.rpc.GetChainID()
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
log.Error().Err(err).Msg("failed to get last published block number")
return
}
mainMax, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
if err != nil {
log.Error().Err(err).Msg("failed to get max block number from main storage")
return
}
if lastPublished == nil || lastPublished.Sign() == 0 {
if mainMax != nil && mainMax.Sign() > 0 {
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
log.Error().Err(err).Msg("failed to set last published block number")
}
}
return
}
if lastPublished.Cmp(mainMax) < 0 {
log.Warn().Msgf("Publish block number seek ahead from %s to %s", lastPublished.String(), mainMax.String())
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
log.Error().Err(err).Msg("failed to set last published block number")
}
}
}

func (c *Committer) Start(ctx context.Context) {
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond

@@ -83,11 +112,34 @@ func (c *Committer) Start(ctx context.Context) {
// Clean up staging data before starting the committer
c.cleanupStagingData()

if config.Cfg.Publisher.Mode == "parallel" {
c.initializeParallelPublisher()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c.runCommitLoop(ctx, interval)
}()
go func() {
defer wg.Done()
c.runPublishLoop(ctx, interval)
}()
<-ctx.Done()
wg.Wait()
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
}

c.runCommitLoop(ctx, interval)
Collaborator
This should be in an else branch for the not-parallel case; otherwise it'll run twice.
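
A sketch of the restructuring this comment suggests, reusing the names from the diff above:

if config.Cfg.Publisher.Mode == "parallel" {
  c.initializeParallelPublisher()
  var wg sync.WaitGroup
  wg.Add(2)
  go func() { defer wg.Done(); c.runCommitLoop(ctx, interval) }()
  go func() { defer wg.Done(); c.runPublishLoop(ctx, interval) }()
  <-ctx.Done()
  wg.Wait()
} else {
  // Non-parallel: only this branch runs the commit loop, so it cannot start twice.
  c.runCommitLoop(ctx, interval)
}
log.Info().Msg("Committer shutting down")
c.publisher.Close()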

log.Info().Msg("Committer shutting down")
c.publisher.Close()
}

func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
for {
select {
case <-ctx.Done():
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
case workMode := <-c.workModeChan:
if workMode != c.workMode && workMode != "" {
@@ -116,6 +168,24 @@ func (c *Committer) Start(ctx context.Context) {
}
}

func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(interval)
if c.workMode == "" {
log.Debug().Msg("Committer work mode not set, skipping publish")
continue
}
if err := c.publish(ctx); err != nil {
log.Error().Err(err).Msg("Error publishing blocks")
}
}
}
}

func (c *Committer) cleanupStagingData() {
// Get the last committed block number from main storage
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
@@ -293,13 +363,88 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
return sequentialBlockData, nil
}

func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
chainID := c.rpc.GetChainID()
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
return nil, fmt.Errorf("failed to get last published block number: %v", err)
}

startBlock := new(big.Int).Set(c.commitFromBlock)
if lastPublished != nil && lastPublished.Sign() > 0 {
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
}

endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
Collaborator
Getting by block range would be better than passing all the block numbers, since it amounts to the same query.
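
A sketch of the range-based alternative; StartBlock and EndBlock are hypothetical QueryFilter fields, not confirmed by this PR, assuming the storage layer can translate them into a single range query:

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
  ChainId:    chainID,
  StartBlock: startBlock, // inclusive; hypothetical field
  EndBlock:   endBlock,   // inclusive; hypothetical field
})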

if err != nil {
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
}
if len(blocksData) == 0 {
return nil, nil
}

sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
return nil, nil
}

sequential := []common.BlockData{blocksData[0]}
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
continue
}
if blocksData[i].Block.Number.Cmp(expected) != 0 {
break
}
sequential = append(sequential, blocksData[i])
expected.Add(expected, big.NewInt(1))
}

return sequential, nil
}
🛠️ Refactor suggestion

Avoid DB round-trip on every publish iteration

getSequentialBlockDataToPublish re-queries GetLastPublishedBlockNumber for each loop, although the value is already maintained in c.lastPublishedBlock (atomic).
This adds an unnecessary hot-path DB hit and causes contention once the publisher is caught up.

Consider relying on the in-memory counter and persisting only after a successful publish:

-lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
-...
-if lastPublished != nil && lastPublished.Sign() > 0 {
-    startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
+lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
+if lastPublished.Sign() > 0 {
+    startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
 }

Still keep the initial sync in Start() to seed the counter from storage.
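
A minimal sketch of that seeding step, assuming the c.lastPublishedBlock atomic.Uint64 field this suggestion presumes (it does not appear in the diff):

// In Start(), before launching the publish loop:
if lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()); err == nil && lastPublished != nil {
  // Seed the in-memory counter once; the hot path reads it from here on.
  c.lastPublishedBlock.Store(lastPublished.Uint64())
}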

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
chainID := c.rpc.GetChainID()
// Use in-memory counter instead of hitting the DB every iteration
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
startBlock := new(big.Int).Set(c.commitFromBlock)
if lastPublished.Sign() > 0 {
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
}
endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
}
if len(blocksData) == 0 {
return nil, nil
}
sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
log.Debug().Msgf(
"First block to publish %s does not match expected %s",
blocksData[0].Block.Number.String(),
startBlock.String(),
)
return nil, nil
}
sequential := []common.BlockData{blocksData[0]}
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
continue
}
if blocksData[i].Block.Number.Cmp(expected) != 0 {
break
}
sequential = append(sequential, blocksData[i])
expected.Add(expected, big.NewInt(1))
}
return sequential, nil
}
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go lines 374 to 423, the method
getSequentialBlockDataToPublish currently fetches the last published block
number from the database on every call, causing unnecessary DB hits and
contention. To fix this, modify the method to use the in-memory atomic counter
c.lastPublishedBlock instead of querying
storage.StagingStorage.GetLastPublishedBlockNumber each time. Ensure that
c.lastPublishedBlock is properly initialized once during the Start() method from
storage, and update it only after a successful publish, avoiding repeated DB
round-trips in the hot path.


func (c *Committer) publish(ctx context.Context) error {
blockData, err := c.getSequentialBlockDataToPublish(ctx)
if err != nil {
return err
}
if len(blockData) == 0 {
return nil
}

if err := c.publisher.PublishBlockData(blockData); err != nil {
return err
}

chainID := c.rpc.GetChainID()
highest := blockData[len(blockData)-1].Block.Number
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
return err
}
return nil
}

func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
highestBlock := blockData[0].Block
for i, block := range blockData {
blockNumbers[i] = block.Block.Number
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

mainStorageStart := time.Now()
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -308,11 +453,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())

go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()
if config.Cfg.Publisher.Mode == "default" {
go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()
}

if c.workMode == WorkModeBackfill {
go func() {
@@ -325,13 +472,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
}()
}

// Find highest block number from committed blocks
highestBlock := blockData[0].Block
for _, block := range blockData {
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)

// Update metrics for successful commits