block headers #283
Conversation
Walkthrough

Added Kafka headers (chain_id, block_number) to block publish records; updated the committer tests and implementation to accept a context, and mocked GetBlocksPerRequest to return rpc.BlocksPerRequestConfig{Blocks: 100}. No exported/public signatures were changed.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Caller
    participant Publisher
    participant Kafka
    Caller->>Publisher: createBlockMessage(block)
    note right of Publisher: Build kgo.Record (Topic/Key/Value)
    rect rgba(220,245,235,0.6)
        note right of Publisher: New: add Headers\nchain_id, block_number
    end
    Publisher->>Kafka: Produce(kgo.Record with headers)
    Kafka-->>Publisher: Ack/Result
```

```mermaid
sequenceDiagram
    autonumber
    participant Test as Test Harness
    participant Committer as Committer
    participant RPC as mockRPC
    Test->>RPC: mock GetBlocksPerRequest() -> BlocksPerRequestConfig{Blocks:100}
    Test->>Committer: cleanupProcessedStagingBlocks(ctx)
    rect rgba(255,245,230,0.6)
        note right of Committer: Now receives context.Context\nand uses RPC config inside
    end
    Committer-->>Test: result/cleanup status
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 0
🧹 Nitpick comments (5)
internal/publisher/publisher.go (5)
274-283: Also set a content_type header for easier broker-side routing and debugging.

Many consumers key off a content_type header; adding it is low-cost and avoids payload peeks. Apply this diff within the headers list:

```diff
 Headers: []kgo.RecordHeader{
 	{
 		Key:   "chain_id",
 		Value: []byte(block.ChainId.String()),
 	},
 	{
 		Key:   "block_number",
 		Value: []byte(block.Number.String()),
 	},
+	{
+		Key:   "content_type",
+		Value: []byte("application/json"),
+	},
 },
```
274-283: Deduplicate header keys via constants; reuse across producers.

Defining header keys once prevents drift and enables reuse in tx/event/trace records. Outside the shown lines, add:

```go
const (
	headerChainID     = "chain_id"
	headerBlockNumber = "block_number"
	headerContentType = "content_type"
)
```

Then replace the string literals here with the constants.
274-283: Consider adding a chain_id header to tx/event/trace messages for uniform filtering.

Even if block_number isn’t available everywhere, a consistent chain_id header across all topics simplifies consumer configs. If you choose to do this, mirror the pattern:

```diff
 // in createTransactionMessage(...)
 return &kgo.Record{
 	Topic: p.getTopicName("transactions"),
 	Key:   []byte(fmt.Sprintf("transaction-%s-%s-%s", status, tx.ChainId.String(), tx.Hash)),
 	Value: msgJson,
+	Headers: []kgo.RecordHeader{
+		{Key: headerChainID, Value: []byte(tx.ChainId.String())},
+		{Key: headerContentType, Value: []byte("application/json")},
+	},
 }, nil
```

```diff
 // in createEventMessage(...)
 return &kgo.Record{
 	Topic: p.getTopicName("events"),
 	Key:   []byte(fmt.Sprintf("event-%s-%s-%s-%d", status, event.ChainId.String(), event.TransactionHash, event.LogIndex)),
 	Value: msgJson,
+	Headers: []kgo.RecordHeader{
+		{Key: headerChainID, Value: []byte(event.ChainId.String())},
+		{Key: headerContentType, Value: []byte("application/json")},
+	},
 }, nil
```

```diff
 // in createTraceMessage(...)
 return &kgo.Record{
 	Topic: p.getTopicName("traces"),
 	Key:   []byte(fmt.Sprintf("trace-%s-%s-%s-%v", status, trace.ChainID.String(), trace.TransactionHash, strings.Join(traceAddressStr, ","))),
 	Value: msgJson,
+	Headers: []kgo.RecordHeader{
+		{Key: headerChainID, Value: []byte(trace.ChainID.String())},
+		{Key: headerContentType, Value: []byte("application/json")},
+	},
 }, nil
```
274-283: Micro: avoid repeated String() conversions.

Hoist chainIDStr and blockNumStr once and reuse them for Key and Headers. Outside the shown lines:

```go
chainIDStr := block.ChainId.String()
blockNumStr := block.Number.String()

return &kgo.Record{
	Topic: p.getTopicName("blocks"),
	Key:   []byte(fmt.Sprintf("block-%s-%s-%s", status, chainIDStr, block.Hash)),
	Value: msgJson,
	Headers: []kgo.RecordHeader{
		{Key: headerChainID, Value: []byte(chainIDStr)},
		{Key: headerBlockNumber, Value: []byte(blockNumStr)},
		{Key: headerContentType, Value: []byte("application/json")},
	},
}, nil
```
274-283: Add a lightweight unit test to assert headers presence/values.

Guard against regressions by asserting header keys and values for createBlockMessage.
I can add a test that builds a minimal common.Block and checks that the produced kgo.Record includes chain_id, block_number, and content_type with expected values—want me to open a follow-up PR?
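The suggested assertion could look something like the sketch below. `RecordHeader` is a simplified local stand-in for kgo.RecordHeader, and `findHeader` is a hypothetical helper the real test would use against createBlockMessage's output:

```go
package main

import "fmt"

// RecordHeader is a simplified stand-in for kgo.RecordHeader.
type RecordHeader struct {
	Key   string
	Value []byte
}

// findHeader returns the value of the first header with the given key;
// a test would call this on the produced record's Headers slice to
// assert presence and value of chain_id / block_number.
func findHeader(headers []RecordHeader, key string) ([]byte, bool) {
	for _, h := range headers {
		if h.Key == key {
			return h.Value, true
		}
	}
	return nil, false
}

func main() {
	headers := []RecordHeader{
		{Key: "chain_id", Value: []byte("1")},
		{Key: "block_number", Value: []byte("1234")},
	}
	if v, ok := findHeader(headers, "chain_id"); ok {
		fmt.Println("chain_id =", string(v))
	}
	if _, ok := findHeader(headers, "content_type"); !ok {
		fmt.Println("content_type missing")
	}
}
```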
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
internal/publisher/publisher.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test
🔇 Additional comments (1)
internal/publisher/publisher.go (1)
274-283: Good addition: block-level Kafka headers are clear and additive-only.

Attaching chain_id and block_number as headers improves consumer-side routing/filters without breaking payload contracts.
Actionable comments posted: 0
🧹 Nitpick comments (4)
internal/orchestrator/committer_test.go (4)
25-27: Good: stub GetBlocksPerRequest for NewWorker initialization. Consider constraining the call count.

To avoid accidental extra invocations (now or in future refactors), pin the expectation count (e.g., Times(1)/Once if available) so the test fails on unexpected retries.
71-73: Duplicate stubbing here is correct; same suggestion on call count.

Apply the same Times(1)/Once guard to keep the test strict and non-flaky.
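The effect of a Times(1)/Once guard can be illustrated with a hand-rolled counting stub. This is a hypothetical stand-in for the generated mockRPC (the actual tests likely use a mock framework's expecter); `countingRPC` and `assertCalledOnce` are invented names for illustration:

```go
package main

import "fmt"

// BlocksPerRequestConfig mirrors the shape of rpc.BlocksPerRequestConfig
// as used in the test (only the Blocks field is shown).
type BlocksPerRequestConfig struct{ Blocks int }

// countingRPC is a hypothetical hand-rolled stand-in for the generated
// mockRPC: it records how often GetBlocksPerRequest is called so a test
// can fail on unexpected extra invocations -- the same strictness that
// Times(1)/Once provides in mock frameworks.
type countingRPC struct {
	calls int
}

func (m *countingRPC) GetBlocksPerRequest() BlocksPerRequestConfig {
	m.calls++
	return BlocksPerRequestConfig{Blocks: 100}
}

// assertCalledOnce returns an error unless the stub was hit exactly once.
func (m *countingRPC) assertCalledOnce() error {
	if m.calls != 1 {
		return fmt.Errorf("GetBlocksPerRequest called %d times, want 1", m.calls)
	}
	return nil
}

func main() {
	rpc := &countingRPC{}
	cfg := rpc.GetBlocksPerRequest()
	fmt.Println("blocks per request:", cfg.Blocks)
	if err := rpc.assertCalledOnce(); err != nil {
		fmt.Println("strictness violated:", err)
	}
}
```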
81-82: Context-aware path covered; consider a canceled-context subtest.

Add a subtest that passes a canceled ctx and asserts no RPC/storage calls are made, to verify early-exit behavior.
I can draft the subtest if you’d like.
88-88: Nice: verifies cleanup when lastPublishedBlock > 0. Add an explicit assertion on calls.

Optionally assert the exact invocation count on DeleteStagingDataOlderThan (e.g., AssertNumberOfCalls or typed expecter Times(1)) for stronger guarantees.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
internal/orchestrator/committer_test.go (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/orchestrator/committer_test.go (2)
internal/rpc/rpc.go (1)
- BlocksPerRequestConfig (35-40)
internal/orchestrator/committer.go (1)
- NewCommitter (41-78)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test
🔇 Additional comments (2)
internal/orchestrator/committer_test.go (2)
4-4: Import of context is appropriate.

Needed for passing ctx into cleanupProcessedStagingBlocks.

10-10: Import of rpc package is correct.

Used for rpc.BlocksPerRequestConfig in mocks.
Summary by CodeRabbit
New Features
Tests
Chores