Add outbox-json protocol with per-row topic dispatch for Kafka sinks POC#4340

benmeadowcroft wants to merge 14 commits into pingcap:master
Conversation
Add protocol parse/string and sink.outbox validation. Mirror outbox config in API v2 conversions. Include protocol/config unit tests. Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
Add {column:<name>} parsing and row-aware topic substitution.
Add DDL fallback/default-topic behavior for row-dependent rules.
Verify selector/topic/outbox required columns with tests.
Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
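The `{column:<name>}` parsing and row-aware substitution described above can be sketched as follows. This is a minimal illustration of the technique, not the actual TiCDC implementation: the helper names, the fallback-to-default-topic behavior for missing columns, and the `map[string]string` row shape are all assumptions for the example.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// placeholderRE matches {column:<name>} tokens inside a topic expression.
var placeholderRE = regexp.MustCompile(`\{column:([^}]+)\}`)

// HasColumnPlaceholders reports whether the expression depends on row data,
// which decides between table-static and per-row topic resolution.
func HasColumnPlaceholders(expr string) bool {
	return placeholderRE.MatchString(expr)
}

// ResolveTopic substitutes each {column:<name>} token with the row's value
// for that column. If any referenced column is missing or empty, the
// default topic is used instead (illustrative fallback policy).
func ResolveTopic(expr, defaultTopic string, row map[string]string) string {
	missing := false
	topic := placeholderRE.ReplaceAllStringFunc(expr, func(tok string) string {
		name := strings.TrimSuffix(strings.TrimPrefix(tok, "{column:"), "}")
		v, ok := row[name]
		if !ok || v == "" {
			missing = true
		}
		return v
	})
	if missing {
		return defaultTopic
	}
	return topic
}

func main() {
	row := map[string]string{"aggregate_type": "order"}
	fmt.Println(ResolveTopic("outbox-{column:aggregate_type}", "outbox-default", row))
	fmt.Println(ResolveTopic("outbox-{column:missing}", "outbox-default", row))
}
```

Checking for placeholders once per rule is what lets the sink keep a table-static fast path when no `{column:...}` token is present.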
Resolve row topics after outbox eligibility filtering. Track runtime discovered topics and include them in checkpoint fanout. Add sink tests for checkpoint topic union behavior. Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
Implement outbox encoder with one-row-per-message semantics. Emit Id header first, then configured header mappings. Wire message headers through Kafka producer and add tests. Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
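The "Id header first, then configured header mappings" ordering can be sketched like this. The `Header` struct stands in for a Kafka record header (e.g. sarama's `RecordHeader`), and the mapping shape and function names are assumptions for the example, not the PR's actual types.

```go
package main

import "fmt"

// Header is a minimal stand-in for a Kafka record header.
type Header struct {
	Key   string
	Value []byte
}

// BuildHeaders emits the outbox Id header first, followed by the configured
// header-column mappings in order. headerCols pairs a Kafka header name with
// the outbox column that supplies its value.
func BuildHeaders(id string, headerCols [][2]string, row map[string]string) []Header {
	headers := []Header{{Key: "Id", Value: []byte(id)}}
	for _, hc := range headerCols {
		headerName, column := hc[0], hc[1]
		if v, ok := row[column]; ok {
			headers = append(headers, Header{Key: headerName, Value: []byte(v)})
		}
	}
	return headers
}

func main() {
	row := map[string]string{"trace_id": "abc123"}
	for _, h := range BuildHeaders("42", [][2]string{{"X-Trace-Id", "trace_id"}}, row) {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```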
…alues, improved logs Tracks use of column placeholders in expression once to avoid repeated expression evaluation Replaced uses of `panic` with `log.Panic` and zap fields to align better with rest of the project. Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
…h Kafka The tests validate the basic protocol support, column topic expression handling, and kafka header support. Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
The first draft of this codec includes fallback handling for DDL and Checkpoint events. We will instead explicitly ignore those operations in the current outbox codec. Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED.
Summary of Changes

This pull request enhances TiCDC by introducing first-class support for the transactional outbox pattern. It allows flexible configuration of Kafka message content, including keys, values, and headers, directly from database table columns. A major addition is per-row topic dispatch, enabling dynamic routing of messages to different Kafka topics based on data within each row. These changes let users implement more sophisticated event-driven architectures with TiCDC, while validation and checkpointing mechanisms ensure reliability.
No actionable comments were generated in the recent review.
📝 Walkthrough

Adds end-to-end support for an outbox JSON sink: new API/internal OutboxConfig and conversions, ProtocolOutboxJSON, an outbox JSON encoder (INSERT-only, with headers), tokenized topic expressions with column placeholders and row-based dispatch, selector/router validations, producer header propagation, discovered row topics with unified checkpointing, and integration tests.
Sequence Diagram(s)

sequenceDiagram
participant App as Application
participant API as API Layer (api/v2)
participant Config as Config Layer (pkg/config)
participant Router as EventRouter
participant Encoder as Outbox Encoder
participant MQ as Message Queue
App->>API: Create ReplicaConfig with `Sink.OutboxConfig`
API->>Config: ToInternalReplicaConfig() (propagate OutboxConfig)
Config->>Config: Validate OutboxConfig & protocol
Config->>Router: Initialize router (topic expressions)
Note over App,Router: DML processing
App->>Router: Emit RowEvent (INSERT)
Router->>Router: Extract column values for topic placeholders
Router->>Encoder: Pass RowEvent with resolved topic
Encoder->>Encoder: Extract ID/Key/Value and header columns
Encoder->>MQ: Build Message with Headers and emit
MQ->>MQ: Convert headers to broker format & publish
sequenceDiagram
participant ColumnSel as ColumnSelector
participant Router as EventRouter
participant Config as SinkConfig
ColumnSel->>Router: GetTopicDispatchColumns(schema, table)
ColumnSel->>Config: OutboxRequiredColumns()
ColumnSel->>ColumnSel: required := topicCols ∪ outboxCols
ColumnSel->>ColumnSel: Verify retained columns include required
alt missing required column
ColumnSel-->>ColumnSel: Fail VerifyTables (ErrColumnSelectorFailed)
else
ColumnSel-->>ColumnSel: Accept selection
end
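To make the configuration flow above concrete, here is a hedged sketch of what a changefeed configuration using the new protocol might look like. The TOML key names (`id-column`, `key-column`, `value-column`), the table matcher, and the topic expression are all illustrative guesses based on the `OutboxConfig` fields and `{column:<name>}` syntax discussed in this PR, not the final API:

```toml
[sink]
protocol = "outbox-json"
# Route rows to a topic derived from a column value; {column:<name>} is the
# placeholder syntax introduced by this PR. Matcher and expression are
# illustrative.
dispatchers = [
    { matcher = ["app.outbox"], topic = "events-{column:aggregate_type}" },
]

[sink.outbox]
# Column names are illustrative; these keys correspond to the
# IDColumn/KeyColumn/ValueColumn fields mentioned in the review comments.
id-column = "id"
key-column = "aggregate_id"
value-column = "payload"
```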
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Code Review
This pull request introduces significant new functionality to support the transactional outbox pattern and per-row topic dispatch for Kafka sinks. The changes are extensive, including a major refactoring of topic expression parsing to support column-based placeholders, updates to the Kafka and Pulsar sinks to handle dynamic topics, and the addition of a new outbox-json protocol. The implementation is robust and well-supported by new unit and integration tests. My review includes one suggestion to refactor duplicated code to improve long-term maintainability.
Note: Security Review did not run due to the size of the PR.
…ional comments Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
… address review comments Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
Actionable comments posted: 5
🧹 Nitpick comments (2)
downstreamadapter/sink/kafka/sink.go (1)

260-267: Add a static-topic fast path in `calculateKeyPartitions`.
`GetTopicForRowChange` + `GetPartitionNum` now run per row even when the matched rule has no `{column:...}` placeholders and the topic is table-static. That adds avoidable hot-path overhead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `downstreamadapter/sink/kafka/sink.go` around lines 260-267, the hot-path calls to s.comp.eventRouter.GetTopicForRowChange and s.comp.topicManager.GetPartitionNum should be avoided inside calculateKeyPartitions for rules that are table-static and have no {column:...} placeholders; detect that condition (use the matched rule metadata or pattern indicating "table-static" and absence of column placeholders), compute topic and partitionNum once outside the per-row loop, reuse those values for all rows, and fall back to the existing per-row logic only when placeholders exist.

downstreamadapter/sink/pulsar/sink.go (1)

354-361: Add a static-topic fast path in `calculateKeyPartitions`.
For non-placeholder topic rules, topic/partition lookup is invariant per table. Doing this per row adds avoidable hot-path overhead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `downstreamadapter/sink/pulsar/sink.go` around lines 354-361, add a static-topic fast path by detecting non-placeholder topic rules (rules that do not depend on row data) and caching the resolved topic and partition count per table (e.g. a map keyed by table ID/name, guarded by a sync.RWMutex or sync.Map). On first encounter of a table with a non-placeholder rule, call GetTopicForRowChange and GetPartitionNum once, store the result, and return the cached partitionNum without per-row lookups; still fall back to per-row logic for row-dependent rules, and invalidate the cache if routing rules can change at runtime.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/kafka/sink.go`:
- Around line 557-599: discoveredRowTopics is only ever appended to (via
addDiscoveredRowTopic) causing unbounded growth and expensive checkpoint fanout;
change the representation from map[string]struct{} to map[string]time.Time (or
similar last-seen token) and add eviction: update addDiscoveredRowTopic to
record a timestamp when a topic is seen, implement a
pruneStaleDiscoveredRowTopics (or prune inside snapshotDiscoveredRowTopics) that
removes entries older than a configured retention window or when the map exceeds
a max capacity, and call that pruning under the same discoveredRowTopicsMu so
snapshotDiscoveredRowTopics returns a bounded set; ensure functions referenced
are addDiscoveredRowTopic, snapshotDiscoveredRowTopics and any new prune helper
and keep mutex usage consistent.
In `@downstreamadapter/sink/pulsar/sink.go`:
- Around line 535-579: The discoveredRowTopics map (used by
addDiscoveredRowTopic, snapshotDiscoveredRowTopics and read by
getCheckpointTopics) is unbounded and will grow without eviction; change the
implementation to cap growth by storing metadata (e.g., lastSeen timestamp or
insertion order) instead of a plain map and evict stale/least-recent topics on
add or during snapshot; for example maintain a map[string]time.Time or a small
LRU structure and in addDiscoveredRowTopic update lastSeen and, if the size
exceeds a configured max (or topics older than a configured TTL), remove entries
before returning so snapshotDiscoveredRowTopics only returns currently valid
topics and getCheckpointTopics stays bounded.
In `@pkg/config/sink.go`:
- Around line 1106-1114: The validate() logic in pkg/config/sink.go currently
trims only for emptiness checks but does not persist normalized values, causing
fields like IDColumn/KeyColumn/ValueColumn to retain surrounding whitespace;
update validate() to call strings.TrimSpace and assign the trimmed results back
to o.IDColumn, o.KeyColumn and o.ValueColumn before performing the empty checks
(and do the same for the duplicate block around the 1117-1124 section), so
stored config uses trimmed column names during offset resolution/encoding.
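A minimal sketch of the suggested normalization: assign the trimmed values back onto the config inside `validate()` so later offset resolution and encoding see canonical column names. The field names follow the review comment; which fields are required is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// OutboxConfig mirrors the fields named in the review comment; other fields
// omitted for brevity.
type OutboxConfig struct {
	IDColumn    string
	KeyColumn   string
	ValueColumn string
}

// validate persists the trimmed values back onto the struct before checking
// for emptiness, so stored config never retains surrounding whitespace.
func (o *OutboxConfig) validate() error {
	o.IDColumn = strings.TrimSpace(o.IDColumn)
	o.KeyColumn = strings.TrimSpace(o.KeyColumn)
	o.ValueColumn = strings.TrimSpace(o.ValueColumn)
	// Which columns are mandatory is assumed for this sketch.
	if o.IDColumn == "" || o.ValueColumn == "" {
		return errors.New("outbox: id-column and value-column must be non-empty")
	}
	return nil
}

func main() {
	c := &OutboxConfig{IDColumn: " id ", ValueColumn: "payload"}
	fmt.Println(c.validate(), c.IDColumn)
}
```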
In `@pkg/sink/codec/outbox/encoder.go`:
- Around line 136-137: Clean() is currently a no-op but must clear the encoder's
buffered state: update Encoder.Clean to reset e.messages (e.g. set e.messages =
nil or e.messages = e.messages[:0] to release/clear buffered entries) and also
reset any related counters or length fields on the Encoder instance if present
so no stale messages remain (refer to the Encoder type and its e.messages field
and any associated buffer length fields).
- Around line 70-72: The guard that checks if !event.IsInsert() currently
returns early without invoking the post-flush callback, which can break txn
post-flush callback chaining; modify the branch in the function containing the
event.IsInsert() check so that it calls the provided callback (the txn
post-flush callback passed into this encoder method, e.g.,
cb/onFlush/postFlushCallback) before returning, passing the same nil error (or
appropriate context) as other success paths, then return; keep the rest of the
method semantics unchanged.
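Both encoder findings can be sketched together: `Clean()` must drop buffered messages rather than no-op, and the non-INSERT early return must still fire the post-flush callback so transaction callback chaining is preserved. All types here are illustrative stand-ins, not the actual encoder.

```go
package main

import "fmt"

type message struct{ callback func() }

type encoder struct{ messages []*message }

// Clean drops buffered messages instead of being a no-op, so no stale
// messages survive between uses of the encoder.
func (e *encoder) Clean() {
	e.messages = e.messages[:0]
}

type rowEvent struct {
	insert   bool
	callback func() // post-flush callback supplied by the caller
}

func (r rowEvent) IsInsert() bool { return r.insert }

// AppendRowChangedEvent skips non-INSERT events but still invokes the
// post-flush callback, keeping txn callback chaining intact.
func (e *encoder) AppendRowChangedEvent(ev rowEvent) {
	if !ev.IsInsert() {
		if ev.callback != nil {
			ev.callback()
		}
		return
	}
	e.messages = append(e.messages, &message{callback: ev.callback})
}

func main() {
	fired := 0
	e := &encoder{}
	e.AppendRowChangedEvent(rowEvent{insert: false, callback: func() { fired++ }})
	e.AppendRowChangedEvent(rowEvent{insert: true})
	e.Clean()
	fmt.Println(fired, len(e.messages)) // callback fired once; buffer emptied
}
```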
ℹ️ Review info

📒 Files selected for processing (10)

- downstreamadapter/sink/columnselector/column_selector.go
- downstreamadapter/sink/eventrouter/event_router.go
- downstreamadapter/sink/eventrouter/topic/expression.go
- downstreamadapter/sink/eventrouter/topic/topic.go
- downstreamadapter/sink/kafka/sink.go
- downstreamadapter/sink/pulsar/sink.go
- pkg/config/sink.go
- pkg/config/sink_test.go
- pkg/sink/codec/outbox/encoder.go
- pkg/sink/kafka/sarama_async_producer.go

🚧 Files skipped from review as they are similar to previous changes (1)

- pkg/config/sink_test.go
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
…ic rules

For table-static topic rules (no column placeholders), GetTopicForRowChange and GetPartitionNum were called on every row of a DMLEvent even though they produce the same result for the whole event. Add EventRouter.GetTopicForTable, which resolves the topic from schema and table alone when no column placeholders are present. In calculateKeyPartitions for both the Kafka and Pulsar sinks, call GetTopicForTable once per event before the row loop and defer GetPartitionNum to the first row that will actually be emitted, falling back to the existing per-row path only for column-placeholder rules. Skip-only events (e.g. outbox-json update/delete transactions) therefore never trigger a partition lookup.

Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
discoveredRowTopics only ever grew: every unique dynamic topic appended an entry that was never removed. For column-placeholder routing rules this causes unbounded memory growth and an ever-expanding set of checkpoint messages sent on every 5-second tick.
Fix by replacing map[string]struct{} with map[string]time.Time (last-seen timestamp). snapshotDiscoveredRowTopics now prunes entries older than discoveredTopicRetention (30 min) inline under the existing mutex, requiring no new goroutine. Topics that become active again are immediately re-added by addDiscoveredRowTopic, so eviction is self-healing.
Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
Signed-off-by: Ben Meadowcroft <ben.meadowcroft@pingcap.com>
@coderabbitai review

✅ Actions performed

Review triggered.
Can I get some guidance on the Docstring coverage check? I believe all the public functions I added are documented. I can add comments to other functions if necessary, but I haven't done that since I didn't want to introduce unnecessary churn on items that weren't touched by this PR. If additional comments are wanted I can certainly add them, but I wanted to check first on what the expectations are.
What problem does this PR solve?
Issue Number: ref #4325
What is changed and how it works?
To support the transactional outbox pattern, this PR adds support for defining `outbox` table(s) whose columns define the payload, keys, and headers of Kafka messages to send downstream when a new row is inserted into the `outbox` table.

Applications using the transactional outbox pattern write domain events to an outbox table in the same database transaction as the business change. TiCDC can then relay those events to a message broker using the application-provided payload (this PR supports a JSON encoding specifically).

The other major change in this PR is per-row topic dispatch. This enables a single `outbox` table to have messages dispatched to different topics.

Tests
Questions
Will it cause performance regression or break compatibility?
The per-row topic dispatch is one potential area of concern. Previously the topic expressions could be calculated per table, now they may be done per row (to allow per-row topic dispatch). The PR does attempt to avoid this additional cost in the case that no column placeholders are included in the topic dispatch expression.
In terms of compatibility, the new configuration does add to the API surface area. I don't believe it breaks compatibility, but I will defer to reviewers' insights on this.
Do you need to update user documentation, design documentation or monitoring documentation?
Yes, user documentation would need to be updated to include the additional configuration related to the new `outbox-json` protocol.

Release note
Summary by CodeRabbit
New Features
Bug Fixes / Validation
Tests