Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "post-commit", "Publisher mode: pre-commit or post-commit")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate --publisher-mode at startup to fail fast on invalid values

The help string documents allowed values, but invalid inputs will silently flow through as strings.

Add a validation step after config load (e.g., in initConfig() or inside configs.LoadConfig) to ensure publisher.mode ∈ {"pre-commit","post-commit"} and normalize case. Prefer returning an error (or exiting) if invalid to avoid unexpected runtime behavior.

🤖 Prompt for AI Agents
In cmd/root.go at line 138, the --publisher-mode flag accepts any string without
validation, which can cause silent errors. Add a validation step after loading
the configuration (such as in initConfig() or configs.LoadConfig) to check if
the publisher.mode value is either "pre-commit" or "post-commit"
(case-insensitive). Normalize the input to lowercase and if the value is
invalid, return an error or exit the program immediately to prevent unexpected
runtime behavior.

rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
Expand Down Expand Up @@ -250,6 +251,7 @@ func init() {
viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
Expand Down
4 changes: 3 additions & 1 deletion configs/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,11 @@ api:
publisher:
# Whether the publisher is enabled
enabled: true
# Publisher mode: "pre-commit" publishes before writing to storage, "post-commit" publishes after commit
mode: post-commit
# Kafka broker addresses (comma-separated)
brokers: localhost:9092

# Block publishing configuration
blocks:
# Whether to publish block data
Expand Down
1 change: 1 addition & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type EventPublisherConfig struct {

type PublisherConfig struct {
Enabled bool `mapstructure:"enabled"`
Mode string `mapstructure:"mode"`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate and constrain publisher.mode to known values

Mode is a free-form string. Add typed constants and validate/normalize it during config load to prevent silent misconfiguration.

Example (outside this line range):

// Near other config-level constants
const (
	PublisherModePreCommit  = "pre-commit"
	PublisherModePostCommit = "post-commit"
)

// In LoadConfig(), after viper.Unmarshal(&Cfg) and before returning
Cfg.Publisher.Mode = strings.TrimSpace(strings.ToLower(Cfg.Publisher.Mode))
if Cfg.Publisher.Mode == "" {
	Cfg.Publisher.Mode = PublisherModePostCommit
}
switch Cfg.Publisher.Mode {
case PublisherModePreCommit, PublisherModePostCommit:
	// ok
default:
	return fmt.Errorf("invalid publisher.mode: %q (allowed: %q, %q)", Cfg.Publisher.Mode, PublisherModePreCommit, PublisherModePostCommit)
}

Optional: expose these constants to other packages to avoid stringly-typed comparisons across the codebase.

🤖 Prompt for AI Agents
In configs/config.go at line 175, the Mode field is a free-form string without
validation, which can lead to silent misconfiguration. Define typed constants
for allowed modes (e.g., PublisherModePreCommit and PublisherModePostCommit)
near other config constants. In the LoadConfig function, after unmarshaling the
config, normalize the Mode value by trimming spaces and converting to lowercase,
set a default if empty, and validate it against the allowed constants, returning
an error if invalid. Optionally, export these constants for use in other
packages to avoid stringly-typed comparisons.

Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Expand Down
1 change: 1 addition & 0 deletions configs/test_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ api:

publisher:
enabled: false
mode: post-commit

validation:
mode: minimal
Expand Down
47 changes: 35 additions & 12 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,39 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo

func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
highestBlock := blockData[0].Block
for i, block := range blockData {
blockNumbers[i] = block.Block.Number
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

shouldPostCommitPublish := true

if config.Cfg.Publisher.Mode == "pre-commit" {
chainID := c.rpc.GetChainID()
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
log.Error().Err(err).Msg("Failed to get last published block number, falling back to post-commit")
} else if lastPublished == nil || lastPublished.Cmp(highestBlock.Number) < 0 {
go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
log.Error().Err(err).Msg("Failed to set last published block number")
}
}()
shouldPostCommitPublish = false
} else {
log.Debug().Msgf("Skipping publish, latest published block %s >= current %s", lastPublished.String(), highestBlock.Number.String())
shouldPostCommitPublish = false
}
}

mainStorageStart := time.Now()
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
Expand All @@ -308,11 +336,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())

go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()
if shouldPostCommitPublish {
go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()
}

if c.workMode == WorkModeBackfill {
go func() {
Expand All @@ -325,13 +355,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
}()
}

// Find highest block number from committed blocks
highestBlock := blockData[0].Block
for _, block := range blockData {
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)

// Update metrics for successful commits
Expand Down
81 changes: 79 additions & 2 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orchestrator

import (
"context"
"errors"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -324,9 +325,10 @@ func TestCommit(t *testing.T) {
committer := NewCommitter(mockRPC, mockStorage)
committer.workMode = WorkModeBackfill

chainID := big.NewInt(1)
blockData := []common.BlockData{
{Block: common.Block{Number: big.NewInt(101)}},
{Block: common.Block{Number: big.NewInt(102)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
}

// Create a channel to signal when DeleteStagingData is called
Expand All @@ -350,6 +352,81 @@ func TestCommit(t *testing.T) {
}
}

func TestCommitPreCommitPublisherMode(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Publisher.Mode = "pre-commit"

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}
committer := NewCommitter(mockRPC, mockStorage)
committer.workMode = WorkModeLive

chainID := big.NewInt(1)
blockData := []common.BlockData{
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
}

publishDone := make(chan struct{})

mockRPC.EXPECT().GetChainID().Return(chainID)
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
close(publishDone)
return nil
})
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)

err := committer.commit(context.Background(), blockData)
assert.NoError(t, err)

select {
case <-publishDone:
case <-time.After(2 * time.Second):
t.Fatal("SetLastPublishedBlockNumber was not called")
}
}

func TestCommitPreCommitPublisherModeFallback(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Publisher.Mode = "pre-commit"

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}
committer := NewCommitter(mockRPC, mockStorage)
committer.workMode = WorkModeLive

chainID := big.NewInt(1)
blockData := []common.BlockData{
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
}

mockRPC.EXPECT().GetChainID().Return(chainID)
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(nil, errors.New("boom"))
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)

err := committer.commit(context.Background(), blockData)
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
}

func TestHandleGap(t *testing.T) {
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
Expand Down
25 changes: 25 additions & 0 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database)
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
var blockNumberString string
err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
if err != nil {
if err == sql.ErrNoRows {
return big.NewInt(0), nil
}
return nil, err
}
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
}
return blockNumber, nil
}

func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
return c.conn.Exec(context.Background(), query)
}
Comment on lines +1098 to +1101
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use placeholders instead of string interpolation; fix %s with non-string

fmt.Sprintf(... %s, chainId, ...) relies on %s with a non-string. Prefer placeholders and arg binding (consistent with other methods).

-func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
-  query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
-  return c.conn.Exec(context.Background(), query)
-}
+func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+  query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (?, 'publish', ?)", c.cfg.Database)
+  return c.conn.Exec(context.Background(), query, chainId, blockNumber.String())
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
return c.conn.Exec(context.Background(), query)
}
func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := fmt.Sprintf(
"INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (?, 'publish', ?)",
c.cfg.Database,
)
return c.conn.Exec(context.Background(), query, chainId, blockNumber.String())
}
🤖 Prompt for AI Agents
In internal/storage/clickhouse.go around lines 1098 to 1101, the SQL query uses
fmt.Sprintf with %s placeholders for non-string types like chainId, which is
incorrect and unsafe. Refactor the method to use parameter placeholders in the
query string and pass chainId and blockNumber as arguments to the Exec method,
ensuring proper argument binding and preventing SQL injection.


func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
if chainId.Sign() > 0 {
Expand Down
2 changes: 2 additions & 0 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type IStagingStorage interface {
GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
DeleteStagingData(data []common.BlockData) error
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
Comment on lines +86 to 88
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Specify “not found” and concurrency semantics for publisher cursor methods

Ambiguity here can lead to bugs across drivers:

  • Define whether “no cursor yet” returns (nil, nil) vs (0, nil). Recommend returning (nil, nil) and documenting it to avoid accidental use of zero as a real block height.
  • Ensure SetLastPublishedBlockNumber is idempotent and implemented as an upsert to avoid races when pre-commit publishing runs in a goroutine.

To confirm coverage and consistency, run:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Search implementations of new cursor methods..."
rg -n "GetLastPublishedBlockNumber\(" -A 2
rg -n "SetLastPublishedBlockNumber\(" -A 2

echo "Check committer uses of publisher mode and cursor methods..."
rg -n "publisher\.mode|pre-commit|post-commit|GetLastPublishedBlockNumber|SetLastPublishedBlockNumber" -A 3 internal

Length of output: 10576


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Locate DeleteOlderThan implementations in storage connectors..."
rg -n "func .*DeleteOlderThan" -A 10 internal/storage

Length of output: 1746


Specify “not found” semantics and ensure idempotent upsert for publisher cursor methods

We need to be explicit about what “no cursor yet” means and make sure both drivers handle and document it uniformly, as well as guarantee that setting the cursor is safe under concurrent pre-commit runs.

internal/storage/connector.go:
• Add a doc comment to GetLastPublishedBlockNumber stating that it returns (nil, nil) when no record exists.
• Note in the interface that SetLastPublishedBlockNumber must be idempotent (an upsert).

internal/storage/postgres.go:
• Catch sql.ErrNoRows in GetLastPublishedBlockNumber, and return (nil, nil) instead of propagating the error.
• (Already in place) SetLastPublishedBlockNumber uses ON CONFLICT … DO UPDATE—no change needed here.

internal/storage/clickhouse.go:
• In GetLastPublishedBlockNumber, detect when the query returns no rows (e.g. via rows.Next() or ErrNoRows) and return (nil, nil).
• Change SetLastPublishedBlockNumber to use a deduplicating insert (for example, a REPLACE INTO or a dedup‐capable MergeTree engine) so concurrent calls don’t create duplicate cursor entries; reading with FINAL alone is not sufficient to guarantee idempotence under race.

– Tests:
• Update any mocks to expect (nil, nil) instead of an error for the “no cursor” case.
• Add coverage for concurrent pre-commit calls to SetLastPublishedBlockNumber and verify that only one logical update occurs.

🤖 Prompt for AI Agents
In internal/storage/connector.go around lines 86 to 88, add a doc comment to
GetLastPublishedBlockNumber specifying it returns (nil, nil) when no record
exists, and note that SetLastPublishedBlockNumber must be idempotent as an
upsert. In internal/storage/postgres.go, modify GetLastPublishedBlockNumber to
catch sql.ErrNoRows and return (nil, nil) instead of an error. In
internal/storage/clickhouse.go, update GetLastPublishedBlockNumber to detect no
rows returned and return (nil, nil), and change SetLastPublishedBlockNumber to
use a deduplicating insert method like REPLACE INTO or a dedup-capable MergeTree
engine to ensure idempotence under concurrent calls. Finally, update tests and
mocks to expect (nil, nil) for no cursor cases and add tests verifying
idempotent behavior of concurrent SetLastPublishedBlockNumber calls.

}

Expand Down
29 changes: 29 additions & 0 deletions internal/storage/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,35 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error {
return err
}

func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1`

var blockNumberString string
err := p.db.QueryRow(query, chainId.String()).Scan(&blockNumberString)
if err != nil {
if err == sql.ErrNoRows {
return big.NewInt(0), nil
}
return nil, err
}

blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
}
return blockNumber, nil
}

func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := `INSERT INTO cursors (chain_id, cursor_type, cursor_value)
VALUES ($1, 'publish', $2)
ON CONFLICT (chain_id, cursor_type)
DO UPDATE SET cursor_value = EXCLUDED.cursor_value, updated_at = NOW()`

_, err := p.db.Exec(query, chainId.String(), blockNumber.String())
return err
}

func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {
query := `SELECT MAX(block_number) FROM block_data WHERE 1=1`

Expand Down
Loading