@jakeloo jakeloo commented Aug 9, 2025

Summary

  • allow staging storage to track last published block
  • enable publishing before storage commit via new flag
  • add config flag publisher-mode
  • publish pre-commit in a goroutine and fall back to post-commit if cursor lookup fails
  • refine pre-commit publishing by looking up chain ID only when needed and using shouldPostCommitPublish flag

Testing

  • go test ./...

https://chatgpt.com/codex/tasks/task_b_6896dc37689c8332b734b4405875abf1

Summary by CodeRabbit

  • New Features

    • Introduced a configurable publisher mode with "default" and "parallel" options, selectable via command-line flag or config file.
    • Added support for parallel publishing with separate commit and publish loops for improved efficiency.
    • Implemented tracking and updating of the last published block number to ensure sequential publishing.
  • Tests

    • Added tests covering the new parallel publisher mode, verifying commit and publish behaviors.
  • Documentation

    • Updated example and test configuration files to include the new publisher mode setting.

coderabbitai bot commented Aug 9, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

A new "publisher mode" configuration option was introduced to select between "default" (publish after commit) and "parallel" (publish concurrently with commit) modes. This change adds CLI flags, config fields, and modifies committer logic to run commit and publish loops either sequentially or in parallel. Storage connectors and interfaces were extended to track the last published block number, with corresponding tests and mocks updated.

Changes

| Cohort / File(s) | Change Summary |
|---|---|
| Publisher Mode Flag & Config: cmd/root.go, configs/config.example.yml, configs/config.go, configs/test_config.yml | Added a persistent CLI flag and configuration parameter publisher.mode with values "default" and "parallel". Updated config structs and example/test config files to include the new field. |
| Committer Logic: internal/orchestrator/committer.go | Added support for "parallel" publisher mode with separate commit and publish loops running concurrently. Modified Start method to conditionally run loops based on mode. Added methods to fetch sequential block data for publishing and to publish blocks. Adjusted commit logic to conditionally publish asynchronously only in "default" mode. |
| Committer Tests: internal/orchestrator/committer_test.go | Updated existing test to include chain ID in block data. Added tests for "parallel" publisher mode verifying commit without publishing and publish operation updating last published block number correctly. |
| Staging Storage Interface: internal/storage/connector.go | Added two new methods to IStagingStorage: GetLastPublishedBlockNumber(chainId *big.Int) and SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) to track last published block number. |
| ClickHouse Storage Implementation: internal/storage/clickhouse.go | Implemented GetLastPublishedBlockNumber and SetLastPublishedBlockNumber methods for ClickHouse connector to retrieve and update last published block number in the cursors table. |
| Postgres Storage Implementation: internal/storage/postgres.go | Implemented GetLastPublishedBlockNumber and SetLastPublishedBlockNumber methods for Postgres connector using queries on the cursors table with upsert logic. |
| Mocks for Staging Storage: test/mocks/MockIStagingStorage.go | Added mock methods and helpers for GetLastPublishedBlockNumber and SetLastPublishedBlockNumber to support new test scenarios involving last published block number management. |
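
The "fetch sequential block data" step mentioned for the committer could look roughly like the sketch below. It is not the PR's code: nextSequential and bigs are hypothetical names, and treating a missing cursor (nil) as "start at the lowest staged block" is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"math/big"
	"sort"
)

// bigs converts int64 values to *big.Int (hypothetical helper for brevity).
func bigs(vals ...int64) []*big.Int {
	out := make([]*big.Int, len(vals))
	for i, v := range vals {
		out[i] = big.NewInt(v)
	}
	return out
}

// nextSequential returns the longest gap-free run of staged block numbers
// starting at lastPublished+1. A nil lastPublished is assumed to mean that
// nothing was published yet, so the run starts at the lowest staged block.
func nextSequential(lastPublished *big.Int, staged []*big.Int) []*big.Int {
	sorted := make([]*big.Int, len(staged))
	copy(sorted, staged)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Cmp(sorted[j]) < 0 })

	one := big.NewInt(1)
	var expected *big.Int
	if lastPublished != nil {
		expected = new(big.Int).Add(lastPublished, one)
	}
	var run []*big.Int
	for _, n := range sorted {
		if expected == nil {
			expected = new(big.Int).Set(n)
		}
		switch n.Cmp(expected) {
		case -1:
			continue // already published, or a duplicate entry
		case 0:
			run = append(run, n)
			expected = new(big.Int).Add(expected, one)
		default:
			return run // gap found: stop so publishing stays sequential
		}
	}
	return run
}

func main() {
	// Block 14 is missing, so only 11..13 are eligible after cursor 10.
	fmt.Println(nextSequential(big.NewInt(10), bigs(13, 11, 12, 15)))
}
```

Stopping at the first gap keeps the published stream ordered even when staging holds blocks out of order.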

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI/Config
    participant Committer
    participant StagingStorage
    participant MainStorage
    participant Publisher

    User->>CLI/Config: Set publisher.mode ("default" or "parallel")
    User->>Committer: Start()

    alt publisher.mode == "parallel"
        Committer->>Committer: runCommitLoop()
        Committer->>Committer: runPublishLoop()
        Committer->>MainStorage: Commit blockData (in commit loop)
        Committer->>StagingStorage: GetLastPublishedBlockNumber(chainId) (in publish loop)
        Committer->>Publisher: Publish sequential blocks (in publish loop)
        Committer->>StagingStorage: SetLastPublishedBlockNumber(chainId, blockNumber)
    else publisher.mode == "default"
        User->>Committer: commit(blockData)
        Committer->>MainStorage: Commit blockData
        Committer->>Publisher: Publish blockData asynchronously
    end
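
The two branches of the diagram above can be sketched in Go as follows. This is a simplified illustration, not the committer's actual Start method: start, countLoops, and the loop callbacks are hypothetical stand-ins, and only the "default" and "parallel" mode names come from the walkthrough.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Mode values mirror the publisher.mode setting described in the walkthrough.
const (
	modeDefault  = "default"
	modeParallel = "parallel"
)

// start always runs the commit loop and, in parallel mode only, a separate
// publish loop. It returns once every started loop has returned.
func start(ctx context.Context, mode string, commitLoop, publishLoop func(context.Context)) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		commitLoop(ctx)
	}()
	if mode == modeParallel {
		wg.Add(1)
		go func() {
			defer wg.Done()
			publishLoop(ctx)
		}()
	}
	wg.Wait()
}

// countLoops reports how many loops start launches for a given mode.
// It exists only to make the demo observable.
func countLoops(mode string) int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // real loops would exit on cancellation; the stubs return at once
	started := make(chan struct{}, 2)
	loop := func(context.Context) { started <- struct{}{} }
	start(ctx, mode, loop, loop)
	return len(started)
}

func main() {
	fmt.Println(modeDefault, countLoops(modeDefault))
	fmt.Println(modeParallel, countLoops(modeParallel))
}
```

In default mode publishing would happen inside the commit path itself, which is why only one loop is started there.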

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4c5320e and d6362fe.

📒 Files selected for processing (6)
  • cmd/root.go (2 hunks)
  • configs/config.example.yml (1 hunks)
  • configs/test_config.yml (1 hunks)
  • internal/orchestrator/committer.go (5 hunks)
  • internal/orchestrator/committer_test.go (2 hunks)
  • internal/storage/clickhouse.go (1 hunks)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🔭 Outside diff range comments (1)
configs/config.go (1)

281-283: Bug: Orchestrator chainBasedConfig env override writes into Main storage

STORAGE_ORCHESTRATOR_CLICKHOUSE_CHAINBASEDCONFIG is parsed, but assigned to Cfg.Storage.Main.Clickhouse instead of Cfg.Storage.Orchestrator.Clickhouse.

Apply:

-	if Cfg.Storage.Main.Clickhouse != nil {
-		Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig
-	}
+	if Cfg.Storage.Orchestrator.Clickhouse != nil {
+		Cfg.Storage.Orchestrator.Clickhouse.ChainBasedConfig = orchestratorChainConfig
+	}
🧹 Nitpick comments (6)
configs/config.example.yml (1)

193-197: Nit: clarify fallback behavior in example comments

Given the implementation falls back to post-commit if the pre-commit cursor lookup fails, consider noting that here to set operator expectations.

For example: “If pre-commit cursor lookup fails, the system automatically falls back to post-commit publishing.”

internal/storage/postgres.go (1)

347-364: Guard against nil chainId for robustness

If chainId ever propagates as nil, chainId.String() will panic. Add a fast-fail to ease debugging.

func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
+ if chainId == nil {
+   return nil, fmt.Errorf("chainId cannot be nil")
+ }
  query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1`
  ...
internal/orchestrator/committer_test.go (2)

355-396: Pre-commit mode test: good async verification; minor improvement possible

Using a channel signaled by SetLastPublishedBlockNumber is a solid way to await the async path. Consider also asserting that post-commit publish path is skipped (e.g., by verifying no second cursor update if you later add it there).


397-429: Avoid fixed time.Sleep to reduce flakiness

Replace the fixed Sleep(100ms) with a polling assertion so the check does not rely on arbitrary timing. For a "not called" condition the matching testify helper is assert.Never (or require.Never), which fails as soon as its condition becomes true. assert.Eventually with a condition that always returns true would pass on its first tick without waiting at all. The diff below assumes a hypothetical atomic.Bool named setCalled that the test flips from the mock's Run hook.

- time.Sleep(100 * time.Millisecond)
- mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
+ // setCalled is a hypothetical atomic.Bool set from the mock's Run hook
+ assert.Never(t, setCalled.Load, 200*time.Millisecond, 10*time.Millisecond)
+ mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
internal/orchestrator/committer.go (2)

307-329: Pre-commit publish flow: behavior matches PR intent; consider edge-case note

  • Falls back to post-commit on cursor read error.
  • Skips duplicate publish when lastPublished >= highestBlock.
  • Updates cursor only after successful publish, which is correct.

Optional: document that in pre-commit failures, it’s possible to observe “published > committed” temporarily by design.
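
The three behaviors listed above amount to a small decision function. The sketch below is an illustration under assumptions: decide, action, b, and errCursor are hypothetical names, and treating a nil cursor as "nothing published yet" is assumed rather than taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

type action int

const (
	publishPreCommit  action = iota // publish now, advance cursor on success
	skipPublish                     // batch was already published
	publishPostCommit               // cursor unreadable: publish after commit
)

// errCursor simulates a failed cursor lookup (hypothetical).
var errCursor = errors.New("cursor lookup failed")

// b is a shorthand for big.NewInt used by the demo.
func b(n int64) *big.Int { return big.NewInt(n) }

// decide picks the publish path for a batch whose highest block is `highest`.
func decide(lastPublished, highest *big.Int, cursorErr error) action {
	if cursorErr != nil {
		return publishPostCommit // fall back when the cursor cannot be read
	}
	if lastPublished != nil && lastPublished.Cmp(highest) >= 0 {
		return skipPublish // avoid publishing the same blocks twice
	}
	return publishPreCommit
}

func main() {
	fmt.Println(decide(b(90), b(100), nil) == publishPreCommit)
	fmt.Println(decide(b(100), b(100), nil) == skipPublish)
	fmt.Println(decide(nil, b(100), errCursor) == publishPostCommit)
}
```

The cursor update is deliberately absent here; per the review it should happen only after a successful publish.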


339-345: Post-commit publish retained; consider DRYing the publish call

The publish goroutine is duplicated across pre/post paths. Consider extracting a tiny helper (e.g., c.asyncPublish(blockData)) to keep logs and behavior consistent.
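
One possible shape for such a helper is sketched below. The committer, publisher, block, and recordingPublisher types here are simplified, hypothetical stand-ins rather than the project's real types, and the WaitGroup is an added assumption that makes in-flight publishes awaitable.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

type block struct{ Number int64 }

// publisher is a hypothetical stand-in for the project's publisher.
type publisher interface {
	Publish(blocks []block) error
}

type committer struct {
	pub publisher
	wg  sync.WaitGroup // lets callers and tests wait for in-flight publishes
}

// asyncPublish is the single place that starts the publish goroutine, so
// logging and error handling are identical on the pre- and post-commit paths.
func (c *committer) asyncPublish(blocks []block) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		if err := c.pub.Publish(blocks); err != nil {
			log.Printf("failed to publish %d blocks: %v", len(blocks), err)
		}
	}()
}

// recordingPublisher counts published blocks for the demo.
type recordingPublisher struct {
	mu   sync.Mutex
	seen int
}

func (r *recordingPublisher) Publish(blocks []block) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.seen += len(blocks)
	return nil
}

func (r *recordingPublisher) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.seen
}

func main() {
	rec := &recordingPublisher{}
	c := &committer{pub: rec}
	c.asyncPublish([]block{{1}, {2}}) // e.g. the pre-commit path
	c.asyncPublish([]block{{3}})      // e.g. the post-commit path
	c.wg.Wait()
	fmt.Println(rec.count())
}
```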

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7bfa6de and 4c5320e.

📒 Files selected for processing (10)
  • cmd/root.go (2 hunks)
  • configs/config.example.yml (1 hunks)
  • configs/config.go (1 hunks)
  • configs/test_config.yml (1 hunks)
  • internal/orchestrator/committer.go (2 hunks)
  • internal/orchestrator/committer_test.go (3 hunks)
  • internal/storage/clickhouse.go (1 hunks)
  • internal/storage/connector.go (1 hunks)
  • internal/storage/postgres.go (1 hunks)
  • test/mocks/MockIStagingStorage.go (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-04T11:28:38.057Z
Learnt from: iuwqyir
PR: thirdweb-dev/insight#216
File: internal/orchestrator/chain_tracker.go:45-53
Timestamp: 2025-06-04T11:28:38.057Z
Learning: Go's math/big.Int type has a Float64() method (added in Go 1.21) that returns (float64, big.Accuracy), where the Accuracy value indicates whether the conversion was exact.

Applied to files:

  • internal/orchestrator/committer_test.go
🧬 Code Graph Analysis (1)
internal/orchestrator/committer.go (3)
internal/common/block.go (1)
  • Block (8-33)
configs/config.go (1)
  • Cfg (208-208)
internal/publisher/publisher.go (1)
  • Publisher (21-24)
🔇 Additional comments (9)
configs/test_config.yml (1)

67-67: LGTM: test config declares publisher.mode

This keeps tests deterministic with explicit mode. Consider a separate test case toggling pre-commit path if not already covered.

cmd/root.go (1)

254-254: LGTM: viper binding for publisher.mode

Binding is correct and matches the flag name and config path.

internal/storage/clickhouse.go (1)

1079-1096: Cursor query is already safe and needs no further hardening

  • The ClickHouse cursors table is created as a ReplacingMergeTree ordered by (chain_id, cursor_type), so with the FINAL modifier it always returns at most one deduplicated row per key.
  • Calling QueryRow on that query will yield either exactly one row or sql.ErrNoRows—adding LIMIT 1 is therefore redundant.
  • Parameterization isn’t supported by the ClickHouse client here, and none of the interpolated values are user-supplied.
internal/storage/postgres.go (1)

366-374: Upsert looks correct and consistent with existing cursor handling

INSERT ... ON CONFLICT against (chain_id, cursor_type) is the right choice; passing numeric values as strings matches existing patterns.

internal/orchestrator/committer_test.go (2)

5-5: Import of errors is correct and used

Usage in fallback test justifies the new import.


328-332: Including ChainId in test block data is necessary

Commit/publish code reads chain ID; this makes tests closer to real execution paths.

internal/orchestrator/committer.go (1)

298-304: Highest-block detection is correct and efficient

Single pass with big.Int comparison is clean; no redundant sorts here.
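
For reference, a single-pass maximum over *big.Int values can be written as below. This is a generic sketch, not the committer's code; highest and fromInts are hypothetical names.

```go
package main

import (
	"fmt"
	"math/big"
)

// fromInts converts int64 values to *big.Int (hypothetical demo helper).
func fromInts(vals ...int64) []*big.Int {
	out := make([]*big.Int, len(vals))
	for i, v := range vals {
		out[i] = big.NewInt(v)
	}
	return out
}

// highest returns the largest value in nums, or nil for an empty slice.
// It visits each element once and never sorts or copies the input.
func highest(nums []*big.Int) *big.Int {
	var best *big.Int
	for _, n := range nums {
		if best == nil || n.Cmp(best) > 0 {
			best = n
		}
	}
	return best
}

func main() {
	fmt.Println(highest(fromInts(101, 105, 103)))
}
```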

test/mocks/MockIStagingStorage.go (2)

75-132: Mock: GetLastPublishedBlockNumber additions look correct

Signatures and generated helpers match the interface and test usage.


133-178: Mock: SetLastPublishedBlockNumber additions look correct

Covers Run/Return/RunAndReturn; aligns with tests.

cmd/root.go Outdated
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "post-commit", "Publisher mode: pre-commit or post-commit")

🛠️ Refactor suggestion

Validate --publisher-mode at startup to fail fast on invalid values

The help string documents allowed values, but invalid inputs will silently flow through as strings.

Add a validation step after config load (e.g., in initConfig() or inside configs.LoadConfig) to ensure publisher.mode ∈ {"pre-commit","post-commit"} and normalize case. Prefer returning an error (or exiting) if invalid to avoid unexpected runtime behavior.

🤖 Prompt for AI Agents
In cmd/root.go at line 138, the --publisher-mode flag accepts any string without
validation, which can cause silent errors. Add a validation step after loading
the configuration (such as in initConfig() or configs.LoadConfig) to check if
the publisher.mode value is either "pre-commit" or "post-commit"
(case-insensitive). Normalize the input to lowercase and if the value is
invalid, return an error or exit the program immediately to prevent unexpected
runtime behavior.


type PublisherConfig struct {
Enabled bool `mapstructure:"enabled"`
Mode string `mapstructure:"mode"`

🛠️ Refactor suggestion

Validate and constrain publisher.mode to known values

Mode is a free-form string. Add typed constants and validate/normalize it during config load to prevent silent misconfiguration.

Example (outside this line range):

// Near other config-level constants
const (
	PublisherModePreCommit  = "pre-commit"
	PublisherModePostCommit = "post-commit"
)

// In LoadConfig(), after viper.Unmarshal(&Cfg) and before returning
Cfg.Publisher.Mode = strings.TrimSpace(strings.ToLower(Cfg.Publisher.Mode))
if Cfg.Publisher.Mode == "" {
	Cfg.Publisher.Mode = PublisherModePostCommit
}
switch Cfg.Publisher.Mode {
case PublisherModePreCommit, PublisherModePostCommit:
	// ok
default:
	return fmt.Errorf("invalid publisher.mode: %q (allowed: %q, %q)", Cfg.Publisher.Mode, PublisherModePreCommit, PublisherModePostCommit)
}

Optional: expose these constants to other packages to avoid stringly-typed comparisons across the codebase.

🤖 Prompt for AI Agents
In configs/config.go at line 175, the Mode field is a free-form string without
validation, which can lead to silent misconfiguration. Define typed constants
for allowed modes (e.g., PublisherModePreCommit and PublisherModePostCommit)
near other config constants. In the LoadConfig function, after unmarshaling the
config, normalize the Mode value by trimming spaces and converting to lowercase,
set a default if empty, and validate it against the allowed constants, returning
an error if invalid. Optionally, export these constants for use in other
packages to avoid stringly-typed comparisons.

Comment on lines +1098 to +1101
func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
return c.conn.Exec(context.Background(), query)
}

🛠️ Refactor suggestion

Use placeholders instead of string interpolation; fix %s with non-string

fmt.Sprintf(... %s, chainId, ...) relies on %s with a non-string. Prefer placeholders and arg binding (consistent with other methods).

-func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
-  query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
-  return c.conn.Exec(context.Background(), query)
-}
+func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+  query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (?, 'publish', ?)", c.cfg.Database)
+  return c.conn.Exec(context.Background(), query, chainId, blockNumber.String())
+}
🤖 Prompt for AI Agents
In internal/storage/clickhouse.go around lines 1098 to 1101, the SQL query uses
fmt.Sprintf with %s placeholders for non-string types like chainId, which is
incorrect and unsafe. Refactor the method to use parameter placeholders in the
query string and pass chainId and blockNumber as arguments to the Exec method,
ensuring proper argument binding and preventing SQL injection.

Comment on lines +86 to 88
GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error

💡 Verification agent

🧩 Analysis chain

Specify “not found” and concurrency semantics for publisher cursor methods

Ambiguity here can lead to bugs across drivers:

  • Define whether “no cursor yet” returns (nil, nil) vs (0, nil). Recommend returning (nil, nil) and documenting it to avoid accidental use of zero as a real block height.
  • Ensure SetLastPublishedBlockNumber is idempotent and implemented as an upsert to avoid races when pre-commit publishing runs in a goroutine.

To confirm coverage and consistency, run:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Search implementations of new cursor methods..."
rg -n "GetLastPublishedBlockNumber\(" -A 2
rg -n "SetLastPublishedBlockNumber\(" -A 2

echo "Check committer uses of publisher mode and cursor methods..."
rg -n "publisher\.mode|pre-commit|post-commit|GetLastPublishedBlockNumber|SetLastPublishedBlockNumber" -A 3 internal

Length of output: 10576


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Locate DeleteOlderThan implementations in storage connectors..."
rg -n "func .*DeleteOlderThan" -A 10 internal/storage

Length of output: 1746


Specify “not found” semantics and ensure idempotent upsert for publisher cursor methods

We need to be explicit about what “no cursor yet” means and make sure both drivers handle and document it uniformly, as well as guarantee that setting the cursor is safe under concurrent pre-commit runs.

internal/storage/connector.go:
• Add a doc comment to GetLastPublishedBlockNumber stating that it returns (nil, nil) when no record exists.
• Note in the interface that SetLastPublishedBlockNumber must be idempotent (an upsert).

internal/storage/postgres.go:
• Catch sql.ErrNoRows in GetLastPublishedBlockNumber, and return (nil, nil) instead of propagating the error.
• (Already in place) SetLastPublishedBlockNumber uses ON CONFLICT … DO UPDATE—no change needed here.

internal/storage/clickhouse.go:
• In GetLastPublishedBlockNumber, detect when the query returns no rows (e.g. via rows.Next() or ErrNoRows) and return (nil, nil).
• Change SetLastPublishedBlockNumber to use a deduplicating insert (for example, a REPLACE INTO or a dedup‐capable MergeTree engine) so concurrent calls don’t create duplicate cursor entries; reading with FINAL alone is not sufficient to guarantee idempotence under race.

Tests:
• Update any mocks to expect (nil, nil) instead of an error for the “no cursor” case.
• Add coverage for concurrent pre-commit calls to SetLastPublishedBlockNumber and verify that only one logical update occurs.
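
The semantics proposed above can be pinned down with a small in-memory stand-in. This is a sketch only: memCursorStore and bi are hypothetical, and a mutex-guarded map takes the place of the cursors table so the contract ("(nil, nil) when absent", "set is an upsert") can be exercised without a database.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
	"sync"
)

// memCursorStore is a hypothetical in-memory stand-in for the publish cursor.
type memCursorStore struct {
	mu      sync.Mutex
	cursors map[string]*big.Int // keyed by chain ID
}

func newMemCursorStore() *memCursorStore {
	return &memCursorStore{cursors: make(map[string]*big.Int)}
}

// bi is a shorthand for big.NewInt used by the demo.
func bi(n int64) *big.Int { return big.NewInt(n) }

// GetLastPublishedBlockNumber returns (nil, nil) when no cursor exists yet,
// so callers cannot mistake "never published" for block 0.
func (s *memCursorStore) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
	if chainId == nil {
		return nil, errors.New("chainId cannot be nil")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.cursors[chainId.String()]
	if !ok {
		return nil, nil
	}
	return new(big.Int).Set(v), nil
}

// SetLastPublishedBlockNumber is an upsert: repeated or concurrent calls with
// the same arguments leave exactly one entry per chain.
func (s *memCursorStore) SetLastPublishedBlockNumber(chainId, blockNumber *big.Int) error {
	if chainId == nil || blockNumber == nil {
		return errors.New("chainId and blockNumber cannot be nil")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cursors[chainId.String()] = new(big.Int).Set(blockNumber)
	return nil
}

func main() {
	store := newMemCursorStore()
	v, err := store.GetLastPublishedBlockNumber(bi(1))
	fmt.Println(v == nil, err == nil) // no cursor yet

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = store.SetLastPublishedBlockNumber(bi(1), bi(42))
		}()
	}
	wg.Wait()
	v, _ = store.GetLastPublishedBlockNumber(bi(1))
	fmt.Println(v, len(store.cursors))
}
```

A test double like this can also replace generated mocks in tests that only care about cursor state.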

🤖 Prompt for AI Agents
In internal/storage/connector.go around lines 86 to 88, add a doc comment to
GetLastPublishedBlockNumber specifying it returns (nil, nil) when no record
exists, and note that SetLastPublishedBlockNumber must be idempotent as an
upsert. In internal/storage/postgres.go, modify GetLastPublishedBlockNumber to
catch sql.ErrNoRows and return (nil, nil) instead of an error. In
internal/storage/clickhouse.go, update GetLastPublishedBlockNumber to detect no
rows returned and return (nil, nil), and change SetLastPublishedBlockNumber to
use a deduplicating insert method like REPLACE INTO or a dedup-capable MergeTree
engine to ensure idempotence under concurrent calls. Finally, update tests and
mocks to expect (nil, nil) for no cursor cases and add tests verifying
idempotent behavior of concurrent SetLastPublishedBlockNumber calls.

jakeloo and others added 2 commits August 9, 2025 03:22
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Signed-off-by: Jake Loo <[email protected]>
@jakeloo jakeloo closed this Aug 9, 2025