@nmarcetic
@nmarcetic nmarcetic commented Sep 29, 2025

This PR adds and rebases data-pipeline work into the base v0.50.x-inj branch.

Summary by CodeRabbit

  • New Features

    • Introduces publishable event streaming for blocks and transactions, including ordered mixing of ABCI and custom publish events.
    • Adds an opt-in switch to enable event publishing and exposes a stream for consumers.
    • Ensures block commit includes previous/new app hashes alongside emitted events.
    • Provides placeholder ABCI events mirroring published events for easier indexing.
  • Configuration

    • Adds streaming.mqpub settings: enabled, seed-brokers, topic-name, and control-port, with sensible defaults.
  • Tests

    • Adds comprehensive integration tests covering begin/end block and per-transaction event publishing.

Signed-off-by: 0x0f0d0 <goodbumsu@gmail.com>
@coderabbitai
coderabbitai bot commented Sep 29, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds a publish-event subsystem across BaseApp and types: new event types, per-block and per-tx aggregation with true ordering, filtering of ABCI vs publish events, context plumbing via a PublishEventManager, placeholder event emission, commit-time flushing via a channel, and new streaming config for MQ publishing. Tests updated and added accordingly.
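
The commit-time flush described above could look roughly like the following. This is a minimal sketch, not the PR's code: the `app` type, `newApp`, `publishBlockEvents`, and all field types are hypothetical stand-ins, and only the `PublishEventFlush` field names come from the walkthrough.

```go
package main

import "fmt"

// PublishEventFlush carries one block's worth of data to consumers.
// Field names follow the walkthrough; the field types are assumptions.
type PublishEventFlush struct {
	Height      int64
	PrevAppHash []byte
	NewAppHash  []byte
	BlockEvents []string   // stand-in for the begin/end block event sets
	TxEvents    [][]string // stand-in for the per-transaction event sets
}

// app is a hypothetical, stripped-down stand-in for BaseApp.
type app struct {
	enablePublish bool
	stream        chan PublishEventFlush
}

// newApp initializes the opt-in flag and a buffered channel.
func newApp(enable bool, buffer int) *app {
	return &app{enablePublish: enable, stream: make(chan PublishEventFlush, buffer)}
}

// publishBlockEvents hands a flush to the stream and reports whether it did.
// It is a no-op when publishing is disabled. The send blocks once the
// buffer is full, so a slow consumer would stall the caller.
func (a *app) publishBlockEvents(f PublishEventFlush) bool {
	if !a.enablePublish {
		return false
	}
	a.stream <- f
	return true
}

func main() {
	a := newApp(true, 1)
	a.publishBlockEvents(PublishEventFlush{
		Height:      10,
		PrevAppHash: []byte{0x01},
		NewAppHash:  []byte{0x02},
	})
	f := <-a.stream
	fmt.Printf("height=%d prev=%x new=%x\n", f.Height, f.PrevAppHash, f.NewAppHash)
}
```

Whether the real channel is buffered, and what happens when a consumer falls behind, is not stated in this conversation; the blocking send here is only one possible choice.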

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| BaseApp core publish integration | baseapp/abci.go, baseapp/baseapp.go, baseapp/publish_event.go | Wires publish-event aggregation into BeginBlock/DeliverTx/EndBlock, computes per-tx EventSet with TrueOrder, filters ABCI vs publish events, extends executeTxs signature, stores flush data, initializes channel and flag, and adds PublishBlockEvents method used at Commit. |
| Types: publish-event model and context plumbing | types/publish_events.go, types/events_placeholder.go, types/context.go | Introduces PublishEventManagerI, concrete manager, PublishEvent and PublishEvents types; adds EventPlaceholderManager emitting ABCI placeholders; integrates PublishEventManager into Context lifecycle, CacheContext, and writeCache. |
| Server config for MQ publishing | server/config/config.go, server/config/toml.go | Adds MQPubConfig and StreamingConfig.MQPub; initializes defaults; extends TOML template with [streaming.mqpub] block and related keys. |
| New publish event tests | baseapp/publish_event_test.go | Adds end-to-end tests for publish events across Begin/EndBlock and DeliverTx, including custom StringPublishEvent and verification of ABCI and publish event ordering and app hash chaining. |
| Test utilities and custom event emission | baseapp/utils_test.go | Adds TestPublishEvent type and helpers; threads emitCustomEvent through counter services and ante handler; emits custom publish events via PublishEventManager and placeholders. |
| Test updates for new struct fields | baseapp/abci_test.go, baseapp/baseapp_test.go, baseapp/streaming_test.go | Updates CounterServerImpl/Counter2ServerImpl literals to include new boolean field in multiple tests; no logic changes otherwise. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Cons as Consensus
  participant BA as BaseApp
  participant Ctx as Context
  participant PEM as PublishEventManager
  participant Exec as Tx Executor
  participant Store as State/Commit

  Cons->>BA: BeginBlock(req)
  BA->>Ctx: NewContext + WithPublishEventManager
  Ctx->>PEM: (initialized)
  BA->>BA: Emit ABCI + Publish events (begin)
  BA->>BA: filterOutPublishEvents(begin events) => EventSet(TrueOrder)

  loop for each Tx
    Cons->>BA: DeliverTx(tx)
    BA->>Exec: Run Ante + Msgs
    Exec->>PEM: Emit publish events
    BA->>BA: Collect ABCI events
    BA->>BA: filterOutPublishEvents(tx events) => EventSet(TrueOrder)
    BA-->>Cons: ExecTxResult (+EventSet)
  end

  Cons->>BA: EndBlock()
  BA->>BA: Emit ABCI + Publish events (end)
  BA->>BA: filterOutPublishEvents(end events) => EventSet(TrueOrder)

  Cons->>BA: Commit()
  BA->>Store: Write state, compute NewAppHash
  BA->>BA: Assemble PublishEventFlush {Height, PrevAppHash, NewAppHash, BlockEvents, TxEvents}
  BA->>BA: PublishBlockEvents(flush) [if EnablePublish]
  BA-->>Cons: CommitResp(NewAppHash)
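
The filtering step in the diagram can be sketched as follows. This is an illustration under stated assumptions: `splitEvents`, the `event` type, and the `placeholderType` value are hypothetical, and the real `filterOutPublishEvents` signature is not shown in this conversation. Only the `EventSet`, `TrueOrder`, `EventTypeAbci`, and `EventTypePublish` names come from the PR.

```go
package main

import "fmt"

// EventType tags each slot in the interleaved order.
type EventType int

const (
	EventTypeAbci EventType = iota
	EventTypePublish
)

// placeholderType is a hypothetical marker value for placeholder events.
const placeholderType = "publish_placeholder"

// event is a stand-in for an ABCI event; only the type string matters here.
type event struct{ Type string }

// EventSet holds both streams plus the order in which they were emitted.
type EventSet struct {
	AbciEvents    []event
	PublishEvents []string
	TrueOrder     []EventType
}

// splitEvents walks the mixed ABCI stream. Each placeholder is dropped from
// the ABCI side and paired with the next published payload, while TrueOrder
// records the original interleaving. A placeholder with no matching payload
// is kept as a plain ABCI event.
func splitEvents(mixed []event, published []string) EventSet {
	set := EventSet{TrueOrder: make([]EventType, 0, len(mixed))}
	next := 0
	for _, ev := range mixed {
		if ev.Type == placeholderType && next < len(published) {
			set.PublishEvents = append(set.PublishEvents, published[next])
			next++
			set.TrueOrder = append(set.TrueOrder, EventTypePublish)
			continue
		}
		set.AbciEvents = append(set.AbciEvents, ev)
		set.TrueOrder = append(set.TrueOrder, EventTypeAbci)
	}
	return set
}

func main() {
	mixed := []event{{Type: "transfer"}, {Type: placeholderType}, {Type: "message"}}
	set := splitEvents(mixed, []string{"custom-1"})
	fmt.Println(len(set.AbciEvents), set.PublishEvents, set.TrueOrder)
}
```

A consumer can rebuild the original sequence by walking `TrueOrder` and taking the next element from whichever slice each entry names.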

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I thump my paw—events in line,
ABCI crumbs, and publish shine.
Begin to end, each carrot queued,
TrueOrder maps my garden’s mood.
A channel whispers: block’s complete!
Hash to hash, a tidy treat.
MQ dreams—next field to eat. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Title Check | ⚠️ Warning | The title “CP-608 Adds data-pipeline work into base.” is overly generic and uses internal jargon without clearly summarizing the primary additions, such as the new publish‐event subsystem and related BaseApp changes. | Rename the title to succinctly convey the main change, for example “Integrate publish‐event streaming support into BaseApp,” to clearly reflect the added event‐publishing pipeline. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 19.05%, which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment on lines 711 to 717
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
	var events []abci.Event
	var publishEvents sdk.PublishEvents

	if err := app.checkHalt(req.Height, req.Time); err != nil {
		return nil, err

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).internalFinalizeBlock (baseapp/abci.go:712)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).FinalizeBlock (baseapp/abci.go:893)

Collaborator

what? how?

@maxim-inj
Collaborator

@nmarcetic the github.com/cosmos/cosmos-sdk/server/config test fails (FAIL); other CI checks can be ignored.

@maxim-inj maxim-inj marked this pull request as ready for review October 2, 2025 09:28
@github-actions
github-actions bot commented Oct 2, 2025

@maxim-inj your pull request is missing a changelog!

@maxim-inj
Collaborator

> @maxim-inj your pull request is missing a changelog!

it's not my PR lol

@maxim-inj
Collaborator

maxim-inj commented Oct 2, 2025

The changeset is big and not ready to merge into mainline yet; it needs more continuous testing under testnet and devnet conditions.
So I made a special branch, v0.50.x-datapipe-inj, and will merge there.

In core, we'll have

  • v1.16.x-datapipe
  • v1.17.x-datapipe
    ... etc

These stay separate until it's tested and adopted. Then it will be merged into the main SDK fork that goes to validators, etc.

@maxim-inj maxim-inj changed the base branch from v0.50.x-inj to v0.50.x-datapipe-inj October 2, 2025 09:36
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
server/config/config.go (1)

185-191: Consider adding field-level documentation.

While the struct fields are reasonably self-explanatory, adding godoc comments for each field would improve maintainability, especially for SeedBrokers and ControlPort which may benefit from clarification about their purpose and expected format.

Example:

 // MQPubConfig defines application configuration for MessageQueue publish service
 MQPubConfig struct {
+	// Enabled determines whether the MessageQueue publish service is active
 	Enabled     bool     `mapstructure:"enabled"`
+	// SeedBrokers is a list of initial broker addresses for the message queue
 	SeedBrokers []string `mapstructure:"seed-brokers"`
+	// TopicName is the target topic for published messages
 	TopicName   string   `mapstructure:"topic-name"`
+	// ControlPort is the server port for message queue control operations
 	ControlPort int      `mapstructure:"control-port"`
 }
baseapp/publish_event_test.go (1)

195-197: Drop stray fmt.Println from the test

This fmt.Println will spam test output every block. Please remove it to keep the suite quiet.

baseapp/abci.go (1)

870-871: Preallocate txEventSet for fewer allocations

We already know the slice will end up with len(txs) elements—preallocating avoids growth reallocations in hot paths.

-	txEventSet := make([]EventSet, 0)
+	txEventSet := make([]EventSet, 0, len(txs))
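
A small sketch of the effect, assuming a simplified `EventSet` and a hypothetical `collect` helper (neither is the PR's code): with the capacity set up front, the appends in the loop never reallocate the backing array.

```go
package main

import "fmt"

// EventSet is a simplified stand-in for the PR's per-transaction event set.
type EventSet struct{ TxIndex int }

// collect builds one EventSet per transaction. The capacity is reserved up
// front, so the appends below never need to grow the backing array.
func collect(txs [][]byte) []EventSet {
	sets := make([]EventSet, 0, len(txs))
	for i := range txs {
		sets = append(sets, EventSet{TxIndex: i})
	}
	return sets
}

func main() {
	sets := collect(make([][]byte, 4))
	fmt.Println(len(sets), cap(sets)) // prints "4 4"
}
```
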
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 615721d and 2ca23e7.

📒 Files selected for processing (13)
  • baseapp/abci.go (8 hunks)
  • baseapp/abci_test.go (3 hunks)
  • baseapp/baseapp.go (2 hunks)
  • baseapp/baseapp_test.go (3 hunks)
  • baseapp/publish_event.go (1 hunks)
  • baseapp/publish_event_test.go (1 hunks)
  • baseapp/streaming_test.go (1 hunks)
  • baseapp/utils_test.go (5 hunks)
  • server/config/config.go (2 hunks)
  • server/config/toml.go (1 hunks)
  • types/context.go (5 hunks)
  • types/events_placeholder.go (1 hunks)
  • types/publish_events.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
baseapp/publish_event.go (2)
types/publish_events.go (1)
  • PublishEvents (35-35)
baseapp/baseapp.go (1)
  • BaseApp (67-213)
types/context.go (3)
types/publish_events.go (3)
  • PublishEventManagerI (3-7)
  • PublishEventManager (13-15)
  • NewPublishEventManager (17-19)
types/events_placeholder.go (1)
  • EventPlaceholderManager (7-10)
types/events.go (3)
  • NewEventManager (39-41)
  • EventManager (35-37)
  • Events (167-167)
baseapp/streaming_test.go (2)
baseapp/testutil/messages.pb.go (1)
  • RegisterCounterServer (369-371)
baseapp/utils_test.go (1)
  • CounterServerImpl (171-177)
types/events_placeholder.go (2)
types/publish_events.go (3)
  • PublishEventManagerI (3-7)
  • PublishEvents (35-35)
  • PublishEvent (31-33)
types/events.go (2)
  • EventManagerI (18-25)
  • NewEvent (172-180)
baseapp/baseapp.go (2)
types/publish_events.go (1)
  • PublishEvents (35-35)
baseapp/publish_event.go (1)
  • PublishEventFlush (21-27)
baseapp/abci.go (4)
types/publish_events.go (3)
  • PublishEvents (35-35)
  • PublishEventManager (13-15)
  • NewPublishEventManager (17-19)
types/context.go (1)
  • Context (41-74)
baseapp/publish_event.go (5)
  • PublishEventFlush (21-27)
  • EventSet (15-19)
  • EventType (8-8)
  • EventTypePublish (12-12)
  • EventTypeAbci (11-11)
types/events_placeholder.go (1)
  • PlaceholderEventType (3-3)
baseapp/baseapp_test.go (2)
baseapp/testutil/messages.pb.go (1)
  • RegisterCounterServer (369-371)
baseapp/utils_test.go (1)
  • CounterServerImpl (171-177)
baseapp/abci_test.go (2)
baseapp/testutil/messages.pb.go (2)
  • RegisterCounterServer (369-371)
  • RegisterCounter2Server (441-443)
baseapp/utils_test.go (2)
  • CounterServerImpl (171-177)
  • Counter2ServerImpl (183-189)
baseapp/utils_test.go (2)
types/publish_events.go (2)
  • PublishEvent (31-33)
  • PublishEventManager (13-15)
types/context.go (1)
  • Context (41-74)
baseapp/publish_event_test.go (9)
types/publish_events.go (3)
  • PublishEvent (31-33)
  • PublishEvents (35-35)
  • PublishEventManager (13-15)
baseapp/baseapp.go (2)
  • BaseApp (67-213)
  • NewBaseApp (218-270)
baseapp/publish_event.go (4)
  • PublishEventFlush (21-27)
  • EventType (8-8)
  • EventTypeAbci (11-11)
  • EventTypePublish (12-12)
types/context.go (1)
  • Context (41-74)
types/abci.go (2)
  • BeginBlock (71-73)
  • EndBlock (65-68)
types/events.go (3)
  • EventManager (35-37)
  • Event (164-164)
  • MarkEventsToIndex (318-343)
baseapp/baseapp_test.go (1)
  • NewBaseAppSuite (64-92)
baseapp/testutil/messages.pb.go (1)
  • RegisterCounterServer (369-371)
baseapp/utils_test.go (1)
  • CounterServerImpl (171-177)
🔇 Additional comments (5)
server/config/config.go (2)

176-176: LGTM!

The field addition correctly extends StreamingConfig with the new MQPub configuration, using appropriate typing and mapstructure tags.


273-278: LGTM!

The default configuration appropriately disables the MQPub feature by default with safe empty/zero values for all fields, following the same pattern as other streaming configurations.

baseapp/abci.go (1)

911-925: Clean separation of ABCI and publish streams

Nice job stripping placeholders while preserving the true ordering metadata; this keeps ABCI responses clean without losing reconstruction fidelity.

types/events_placeholder.go (1)

16-20: Good placeholder wiring

Emitting the placeholder immediately after the publish event keeps ABCI ordering consistent and makes the downstream filtering straightforward.
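
The wiring praised here might be pictured like this. It is a hypothetical sketch: the `abciEvents`, `publishEvents`, and `placeholderManager` types and the `placeholderEventType` value are invented for illustration and do not reproduce `EventPlaceholderManager`.

```go
package main

import "fmt"

// placeholderEventType is a hypothetical marker value.
const placeholderEventType = "publish_placeholder"

// abciEvents is a stand-in for the ABCI event manager; it stores type names.
type abciEvents struct{ types []string }

func (m *abciEvents) emit(t string) { m.types = append(m.types, t) }

// publishEvents is a stand-in for the publish event manager.
type publishEvents struct{ payloads []string }

func (m *publishEvents) emit(p string) { m.payloads = append(m.payloads, p) }

// placeholderManager wraps both managers so that every publish event leaves
// a marker at the matching position in the ABCI stream.
type placeholderManager struct {
	abci    *abciEvents
	publish *publishEvents
}

// emitPublish records the payload, then immediately emits the placeholder.
func (m *placeholderManager) emitPublish(payload string) {
	m.publish.emit(payload)
	m.abci.emit(placeholderEventType)
}

func main() {
	abci, pub := &abciEvents{}, &publishEvents{}
	pm := &placeholderManager{abci: abci, publish: pub}

	abci.emit("transfer")
	pm.emitPublish("order-filled")
	abci.emit("message")

	fmt.Println(abci.types)   // prints "[transfer publish_placeholder message]"
	fmt.Println(pub.payloads) // prints "[order-filled]"
}
```

Because the marker sits in the ABCI stream at the point of emission, a later pass can recover the interleaving without the two managers sharing a counter.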

baseapp/utils_test.go (1)

119-144: Helpful test fixture

The dedicated TestPublishEvent helper makes it painless to exercise the new publish pipeline in tests—nice touch.

enabled = {{ .Streaming.MQPub.Enabled }}
# seed-brokers defines where to be connected when the message gets published.
seed-brokers = [{{ range .Streaming.MQPub.SeedBrokers }}{{ printf "%q, " . }}{{end}}]
# topic-name defines the to be connected when message gets published.

⚠️ Potential issue | 🟡 Minor

Fix typo in comment.

The comment should read "topic-name defines the topic to be connected to when message gets published" instead of "to be connected when message gets published".

Apply this diff to fix the typo:

-# topic-name defines the to be connected when message gets published.
+# topic-name defines the topic to be connected to when message gets published.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# topic-name defines the to be connected when message gets published.
# topic-name defines the topic to be connected to when message gets published.
🤖 Prompt for AI Agents
In server/config/toml.go around line 243, update the comment text to fix the
typo: replace the existing comment "# topic-name defines the to be connected
when message gets published." with a corrected sentence such as "# topic-name
defines the topic to be connected to when a message gets published." — modify
only the comment line to the corrected wording.
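
For readers unfamiliar with the template lines quoted above, here is a hedged sketch of how they render with Go's `text/template`. The `config`, `streaming`, and `mqPub` types and the `render` helper are simplified stand-ins, not the SDK's configuration types; the two template lines are copied from the quoted snippet.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// Simplified stand-ins for the SDK config types.
type mqPub struct {
	Enabled     bool
	SeedBrokers []string
}
type streaming struct{ MQPub mqPub }
type config struct{ Streaming streaming }

// The enabled and seed-brokers lines are taken from the quoted template.
const tmpl = `[streaming.mqpub]
enabled = {{ .Streaming.MQPub.Enabled }}
seed-brokers = [{{ range .Streaming.MQPub.SeedBrokers }}{{ printf "%q, " . }}{{end}}]
`

// render executes the template against c and returns the TOML text.
func render(c config) (string, error) {
	t, err := template.New("mqpub").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, c); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	out, err := render(config{Streaming: streaming{MQPub: mqPub{
		Enabled:     true,
		SeedBrokers: []string{"localhost:9092", "localhost:9093"},
	}}})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Each broker is written as a quoted string followed by a comma and a space, so a non-empty list ends in a trailing comma (`["localhost:9092", "localhost:9093", ]`), which TOML arrays permit, and an empty list renders as `[]`.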

Comment on lines +400 to +407
pem := c.PublishEventManager()
// EventPlaceholderManager already emitted event placeholders to the EventManager
// so we do not emit them again by unwrapping it.
if pem.(*EventPlaceholderManager) != nil {
	pem = pem.(*EventPlaceholderManager).publishEventManager
}
pem.EmitEvents(cc.PublishEventManager().Events())

⚠️ Potential issue | 🔴 Critical

Prevent CacheContext from panicking when no publish manager is set

pem.(*EventPlaceholderManager) != nil is always true because PublishEventManager() always returns a non-nil placeholder. In the case where the parent context was instantiated without a publish manager (e.g. sdk.Context{} in tests that only set an EventManager), placeholder.publishEventManager is nil, so the next line assigns pem = nil and pem.EmitEvents(...) panics. You can reproduce with:

ctx := sdk.Context{}.WithEventManager(sdk.NewEventManager())
_, write := ctx.CacheContext()
write() // panic

Please guard against a nil publishEventManager before calling EmitEvents.

-       pem := c.PublishEventManager()
-       if pem.(*EventPlaceholderManager) != nil {
-               pem = pem.(*EventPlaceholderManager).publishEventManager
-       }
-       pem.EmitEvents(cc.PublishEventManager().Events())
+       pem := c.PublishEventManager()
+       if placeholder, ok := pem.(*EventPlaceholderManager); ok && placeholder.publishEventManager != nil {
+               placeholder.publishEventManager.EmitEvents(cc.PublishEventManager().Events())
+       }
🤖 Prompt for AI Agents
In types/context.go around lines 400 to 407, the code unwraps an
EventPlaceholderManager and unconditionally assigns its publishEventManager,
which can be nil and causes a panic when EmitEvents is called; change the logic
to perform a safe type assertion and check that the resulting
publishEventManager is non-nil before overwriting pem and before calling
EmitEvents (i.e., if pem is an *EventPlaceholderManager and its
publishEventManager != nil then set pem to that inner manager, otherwise leave
pem as-is or skip EmitEvents if no manager is present).
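
The guard suggested above can be exercised in isolation with a small sketch. The type names mirror the PR, but the method sets, the `[]string` event representation, the concrete pointer type of `publishEventManager`, and the `flushInto` helper are all assumptions made for illustration.

```go
package main

import "fmt"

// PublishEventManagerI is a simplified version of the PR's interface.
type PublishEventManagerI interface {
	EmitEvents(events []string)
	Events() []string
}

// PublishEventManager is a simplified concrete manager.
type PublishEventManager struct{ events []string }

func (m *PublishEventManager) EmitEvents(evs []string) { m.events = append(m.events, evs...) }
func (m *PublishEventManager) Events() []string         { return m.events }

// EventPlaceholderManager wraps an inner manager that may be nil. The inner
// field is a concrete pointer here (an assumption) so the nil check is exact.
type EventPlaceholderManager struct {
	publishEventManager *PublishEventManager
}

func (m *EventPlaceholderManager) EmitEvents(evs []string) {
	if m.publishEventManager != nil {
		m.publishEventManager.EmitEvents(evs)
	}
}

func (m *EventPlaceholderManager) Events() []string {
	if m.publishEventManager == nil {
		return nil
	}
	return m.publishEventManager.Events()
}

// flushInto forwards child events to the parent's inner manager. The
// comma-ok assertion avoids a panic on other manager types, and the nil
// check covers contexts created without a publish manager.
func flushInto(parent PublishEventManagerI, childEvents []string) bool {
	placeholder, ok := parent.(*EventPlaceholderManager)
	if !ok || placeholder == nil || placeholder.publishEventManager == nil {
		return false
	}
	placeholder.publishEventManager.EmitEvents(childEvents)
	return true
}

func main() {
	// No inner manager: the flush is skipped instead of panicking.
	fmt.Println(flushInto(&EventPlaceholderManager{}, []string{"x"}))

	inner := &PublishEventManager{}
	fmt.Println(flushInto(&EventPlaceholderManager{publishEventManager: inner}, []string{"x"}))
	fmt.Println(inner.Events())
}
```

If the real field holds an interface value rather than a concrete pointer, a typed nil stored in it would still pass a plain `!= nil` check, so the guard would need extra care in that case.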

Signed-off-by: Nikola Marcetic <n.marcetic86@gmail.com>
@maxim-inj maxim-inj merged commit b4e50a7 into v0.50.x-datapipe-inj Oct 13, 2025
42 of 48 checks passed
@maxim-inj maxim-inj deleted the CP-608/v0.50.x-inj-data-pipeline branch October 13, 2025 05:34
@coderabbitai coderabbitai bot mentioned this pull request Nov 13, 2025