2 changes: 2 additions & 0 deletions cmd/root.go
@@ -135,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
@@ -250,6 +251,7 @@ func init() {
viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
4 changes: 3 additions & 1 deletion configs/config.example.yml
@@ -190,9 +190,11 @@ api:
publisher:
# Whether the publisher is enabled
enabled: true
# Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing
mode: default
# Kafka broker addresses (comma-separated)
brokers: localhost:9092

# Block publishing configuration
blocks:
# Whether to publish block data
1 change: 1 addition & 0 deletions configs/config.go
@@ -172,6 +172,7 @@ type EventPublisherConfig struct {

type PublisherConfig struct {
Enabled bool `mapstructure:"enabled"`
Mode string `mapstructure:"mode"`
🛠️ Refactor suggestion

Validate and constrain publisher.mode to known values

Mode is a free-form string. Add typed constants and validate/normalize it during config load to prevent silent misconfiguration.

Example (outside this line range):

// Near other config-level constants
const (
	PublisherModePreCommit  = "pre-commit"
	PublisherModePostCommit = "post-commit"
)

// In LoadConfig(), after viper.Unmarshal(&Cfg) and before returning
Cfg.Publisher.Mode = strings.TrimSpace(strings.ToLower(Cfg.Publisher.Mode))
if Cfg.Publisher.Mode == "" {
	Cfg.Publisher.Mode = PublisherModePostCommit
}
switch Cfg.Publisher.Mode {
case PublisherModePreCommit, PublisherModePostCommit:
	// ok
default:
	return fmt.Errorf("invalid publisher.mode: %q (allowed: %q, %q)", Cfg.Publisher.Mode, PublisherModePreCommit, PublisherModePostCommit)
}

Optional: expose these constants to other packages to avoid stringly-typed comparisons across the codebase.
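
Note that the suggestion's example values ("pre-commit", "post-commit") differ from the mode strings this PR actually wires up ("default", "parallel"); a sketch of the same idea adapted to the PR's values, with illustrative constant names that are not part of the PR:

// configs/config.go — exported constants matching the PR's mode strings
const (
	PublisherModeDefault  = "default"  // publish after the storage commit
	PublisherModeParallel = "parallel" // publish alongside committing
)

// internal/orchestrator/committer.go — callers can then compare against
// the constants instead of raw string literals:
if config.Cfg.Publisher.Mode == config.PublisherModeParallel {
	// run the commit and publish loops concurrently
}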

🤖 Prompt for AI Agents
In configs/config.go at line 175, the Mode field is a free-form string without
validation, which can lead to silent misconfiguration. Define typed constants
for allowed modes (e.g., PublisherModePreCommit and PublisherModePostCommit)
near other config constants. In the LoadConfig function, after unmarshaling the
config, normalize the Mode value by trimming spaces and converting to lowercase,
set a default if empty, and validate it against the allowed constants, returning
an error if invalid. Optionally, export these constants for use in other
packages to avoid stringly-typed comparisons.

Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
1 change: 1 addition & 0 deletions configs/test_config.yml
@@ -64,6 +64,7 @@ api:

publisher:
enabled: false
mode: default

validation:
mode: minimal
141 changes: 126 additions & 15 deletions internal/orchestrator/committer.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/rs/zerolog/log"
@@ -83,11 +84,33 @@ func (c *Committer) Start(ctx context.Context) {
// Clean up staging data before starting the committer
c.cleanupStagingData()

if config.Cfg.Publisher.Mode == "parallel" {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c.runCommitLoop(ctx, interval)
}()
go func() {
defer wg.Done()
c.runPublishLoop(ctx, interval)
}()
<-ctx.Done()
wg.Wait()
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
}

c.runCommitLoop(ctx, interval)
log.Info().Msg("Committer shutting down")
c.publisher.Close()
}

func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
for {
select {
case <-ctx.Done():
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
case workMode := <-c.workModeChan:
if workMode != c.workMode && workMode != "" {
Expand Down Expand Up @@ -116,6 +139,24 @@ func (c *Committer) Start(ctx context.Context) {
}
}

func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(interval)
if c.workMode == "" {
log.Debug().Msg("Committer work mode not set, skipping publish")
continue
}
if err := c.publish(ctx); err != nil {
log.Error().Err(err).Msg("Error publishing blocks")
}
}
}
}

func (c *Committer) cleanupStagingData() {
// Get the last committed block number from main storage
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
Expand Down Expand Up @@ -293,13 +334,88 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
return sequentialBlockData, nil
}

func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
chainID := c.rpc.GetChainID()
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
return nil, fmt.Errorf("failed to get last published block number: %v", err)
}

startBlock := new(big.Int).Set(c.commitFromBlock)
if lastPublished != nil && lastPublished.Sign() > 0 {
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
}

endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
}
if len(blocksData) == 0 {
return nil, nil
}

sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
return nil, nil
}

sequential := []common.BlockData{blocksData[0]}
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
continue
}
if blocksData[i].Block.Number.Cmp(expected) != 0 {
break
}
sequential = append(sequential, blocksData[i])
expected.Add(expected, big.NewInt(1))
}

return sequential, nil
}

func (c *Committer) publish(ctx context.Context) error {
blockData, err := c.getSequentialBlockDataToPublish(ctx)
if err != nil {
return err
}
if len(blockData) == 0 {
return nil
}

if err := c.publisher.PublishBlockData(blockData); err != nil {
return err
}

chainID := c.rpc.GetChainID()
highest := blockData[len(blockData)-1].Block.Number
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
return err
}
return nil
}

func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
highestBlock := blockData[0].Block
for i, block := range blockData {
blockNumbers[i] = block.Block.Number
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

mainStorageStart := time.Now()
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -308,11 +424,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())

go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()
if config.Cfg.Publisher.Mode == "default" {
go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()
}

if c.workMode == WorkModeBackfill {
go func() {
@@ -325,13 +443,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
}()
}

// Find highest block number from committed blocks
highestBlock := blockData[0].Block
for _, block := range blockData {
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)

// Update metrics for successful commits
79 changes: 77 additions & 2 deletions internal/orchestrator/committer_test.go
@@ -324,9 +324,10 @@ func TestCommit(t *testing.T) {
committer := NewCommitter(mockRPC, mockStorage)
committer.workMode = WorkModeBackfill

chainID := big.NewInt(1)
blockData := []common.BlockData{
{Block: common.Block{Number: big.NewInt(101)}},
{Block: common.Block{Number: big.NewInt(102)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
}

// Create a channel to signal when DeleteStagingData is called
@@ -350,6 +351,80 @@
}
}

func TestCommitParallelPublisherMode(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Publisher.Mode = "parallel"

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}
committer := NewCommitter(mockRPC, mockStorage)
committer.workMode = WorkModeLive

chainID := big.NewInt(1)
blockData := []common.BlockData{
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
}

mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)

err := committer.commit(context.Background(), blockData)
assert.NoError(t, err)

mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything)
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
}

func TestPublishParallelMode(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Publisher.Mode = "parallel"

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}
committer := NewCommitter(mockRPC, mockStorage)
committer.workMode = WorkModeLive

chainID := big.NewInt(1)
blockData := []common.BlockData{
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
}

publishDone := make(chan struct{})

mockRPC.EXPECT().GetChainID().Return(chainID)
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
close(publishDone)
return nil
})

err := committer.publish(context.Background())
assert.NoError(t, err)

select {
case <-publishDone:
case <-time.After(2 * time.Second):
t.Fatal("SetLastPublishedBlockNumber was not called")
}
}

func TestHandleGap(t *testing.T) {
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
25 changes: 25 additions & 0 deletions internal/storage/clickhouse.go
@@ -1075,6 +1075,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database)
if chainId != nil && chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
var blockNumberString string
err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
if err != nil {
if err == sql.ErrNoRows {
return big.NewInt(0), nil
}
return nil, err
}
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
}
return blockNumber, nil
}

func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
return c.conn.Exec(context.Background(), query)
}
Comment on lines +1098 to +1101

🛠️ Refactor suggestion

Use placeholders instead of string interpolation; fix %s used with a non-string

fmt.Sprintf(... %s, chainId, ...) formats a *big.Int via %s, which works only because big.Int implements fmt.Stringer, and it interpolates values directly into the SQL. Prefer placeholders and argument binding, consistent with the connector's other methods.

-func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
-  query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
-  return c.conn.Exec(context.Background(), query)
-}
+func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+  query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (?, 'publish', ?)", c.cfg.Database)
+  return c.conn.Exec(context.Background(), query, chainId, blockNumber.String())
+}
🤖 Prompt for AI Agents
In internal/storage/clickhouse.go around lines 1098 to 1101, the SQL query uses
fmt.Sprintf with %s placeholders for non-string types like chainId, which is
incorrect and unsafe. Refactor the method to use parameter placeholders in the
query string and pass chainId and blockNumber as arguments to the Exec method,
ensuring proper argument binding and preventing SQL injection.


func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
if chainId.Sign() > 0 {
Expand Down
2 changes: 2 additions & 0 deletions internal/storage/connector.go
Expand Up @@ -83,6 +83,8 @@ type IStagingStorage interface {
GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
DeleteStagingData(data []common.BlockData) error
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
Comment on lines +86 to 88

💡 Verification agent

🧩 Analysis chain

Specify “not found” and concurrency semantics for publisher cursor methods

Ambiguity here can lead to bugs across drivers:

  • Define whether “no cursor yet” returns (nil, nil) vs (0, nil). Recommend returning (nil, nil) and documenting it to avoid accidental use of zero as a real block height.
  • Ensure SetLastPublishedBlockNumber is idempotent and implemented as an upsert to avoid races when pre-commit publishing runs in a goroutine.

To confirm coverage and consistency, run:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Search implementations of new cursor methods..."
rg -n "GetLastPublishedBlockNumber\(" -A 2
rg -n "SetLastPublishedBlockNumber\(" -A 2

echo "Check committer uses of publisher mode and cursor methods..."
rg -n "publisher\.mode|pre-commit|post-commit|GetLastPublishedBlockNumber|SetLastPublishedBlockNumber" -A 3 internal

Length of output: 10576


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Locate DeleteOlderThan implementations in storage connectors..."
rg -n "func .*DeleteOlderThan" -A 10 internal/storage

Length of output: 1746


Specify “not found” semantics and ensure idempotent upsert for publisher cursor methods

We need to be explicit about what “no cursor yet” means and make sure both drivers handle and document it uniformly, as well as guarantee that setting the cursor is safe under concurrent pre-commit runs.

internal/storage/connector.go:
• Add a doc comment to GetLastPublishedBlockNumber stating that it returns (nil, nil) when no record exists.
• Note in the interface that SetLastPublishedBlockNumber must be idempotent (an upsert).

internal/storage/postgres.go:
• Catch sql.ErrNoRows in GetLastPublishedBlockNumber, and return (nil, nil) instead of propagating the error.
• (Already in place) SetLastPublishedBlockNumber uses ON CONFLICT … DO UPDATE—no change needed here.

internal/storage/clickhouse.go:
• In GetLastPublishedBlockNumber, detect when the query returns no rows (e.g. via rows.Next() or ErrNoRows) and return (nil, nil).
  • Change SetLastPublishedBlockNumber to use a deduplicating insert (for example, a REPLACE INTO or a dedup-capable MergeTree engine) so concurrent calls don’t create duplicate cursor entries; reading with FINAL alone is not sufficient to guarantee idempotence under a race.

– Tests:
• Update any mocks to expect (nil, nil) instead of an error for the “no cursor” case.
• Add coverage for concurrent pre-commit calls to SetLastPublishedBlockNumber and verify that only one logical update occurs.

🤖 Prompt for AI Agents
In internal/storage/connector.go around lines 86 to 88, add a doc comment to
GetLastPublishedBlockNumber specifying it returns (nil, nil) when no record
exists, and note that SetLastPublishedBlockNumber must be idempotent as an
upsert. In internal/storage/postgres.go, modify GetLastPublishedBlockNumber to
catch sql.ErrNoRows and return (nil, nil) instead of an error. In
internal/storage/clickhouse.go, update GetLastPublishedBlockNumber to detect no
rows returned and return (nil, nil), and change SetLastPublishedBlockNumber to
use a deduplicating insert method like REPLACE INTO or a dedup-capable MergeTree
engine to ensure idempotence under concurrent calls. Finally, update tests and
mocks to expect (nil, nil) for no cursor cases and add tests verifying
idempotent behavior of concurrent SetLastPublishedBlockNumber calls.
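
A minimal sketch of the agreed semantics, assuming a Postgres connector with a plain *sql.DB field and a cursors table mirroring the ClickHouse schema (names are illustrative, not taken from the PR):

// internal/storage/connector.go — document the contract on the interface:
// GetLastPublishedBlockNumber returns (nil, nil) when no publish cursor
// exists yet, so callers never mistake 0 for a real block height.
// SetLastPublishedBlockNumber must be idempotent (an upsert).

// internal/storage/postgres.go — sketch of the "no rows" handling:
func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
	var value string
	err := p.db.QueryRow(
		"SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1",
		chainId.String(),
	).Scan(&value)
	if err == sql.ErrNoRows {
		return nil, nil // no cursor yet: not an error, and not block 0
	}
	if err != nil {
		return nil, err
	}
	blockNumber, ok := new(big.Int).SetString(value, 10)
	if !ok {
		return nil, fmt.Errorf("failed to parse block number: %s", value)
	}
	return blockNumber, nil
}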

}
