Attach xrepl_origin_id to every DML CDC event #30493
kgalieva wants to merge 4 commits into yugabyte:master
@kgalieva thanks for putting this PR out.
You're right that in PG logical replication, the full transaction is buffered and decoded as a unit: the output plugin sees all changes together with the COMMIT metadata, so COMMIT-level origin_id is sufficient. But CDCSDK consumers receive events as a stream of protobuf records via GetChanges RPCs, and they process RowMessage records one at a time as they arrive. When the consumer's goal is to filter or route events by origin (e.g., discard events from a specific replication origin, or route events to different downstream systems based on origin), it has to hold every DML record in memory until the COMMIT record arrives to learn the origin_id, then go back and process or discard them. With origin_id on every DML record, the consumer can make routing/filtering decisions immediately, per record. For large transactions this avoids buffering entirely. This is a small change: the origin_id is already extracted from the WAL at every point where DML records are created; it just wasn't being set on the RowMessage.
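To make the consumer-side argument concrete, here is a minimal sketch of per-record filtering. It does not use the real CDCSDK types: CdcEvent, EventOp, and ForwardUnlessOrigin are hypothetical names invented for illustration, and the only assumption is that each DML record exposes its own xrepl_origin_id.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a CDC record; not the real RowMessage protobuf.
enum class EventOp { kBegin, kInsert, kUpdate, kDelete, kCommit };

struct CdcEvent {
  EventOp op;
  uint32_t xrepl_origin_id;  // 0 means a local write (no replication origin)
};

// Returns the DML events to forward downstream, dropping anything written
// under blocked_origin. Each decision looks only at the current record, so
// nothing has to be held back until the COMMIT record arrives.
// BEGIN/COMMIT markers are not forwarded in this sketch.
std::vector<CdcEvent> ForwardUnlessOrigin(const std::vector<CdcEvent>& stream,
                                          uint32_t blocked_origin) {
  std::vector<CdcEvent> forwarded;
  for (const CdcEvent& event : stream) {
    const bool is_dml = event.op == EventOp::kInsert ||
                        event.op == EventOp::kUpdate ||
                        event.op == EventOp::kDelete;
    if (is_dml && event.xrepl_origin_id != blocked_origin) {
      forwarded.push_back(event);
    }
  }
  return forwarded;
}
```

Without the per-record field, the same loop would have to park every DML event in a side buffer keyed by transaction and replay it once the COMMIT revealed the origin.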
shishir2001-yb
left a comment
Looks good to me. cc: @Sumukh-Phalgaonkar can you take a look too.
@Sumukh-Phalgaonkar have you had a chance to review this yet?
TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_populate_end_markers_transactions) = true;
This is not needed since the flag is true by default.
  ASSERT_EQ(tablets.size(), 1);
  auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());

  // Consume the initial schema record.
What do you mean by "initial schema record"? Shouldn't the GetChanges response below contain the change records corresponding to the insert below?
Maybe you can assert that these records don't contain any origin id.
  ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 100)", kTableName));
  ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()"));
  change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint));
  LOG(INFO) << "Single-shard INSERT: " << change_resp.ShortDebugString();
We generally do not keep any INFO level logs in the tests
With this change, the origin_id would be populated in the DMLs as well as the COMMIT messages. For the sake of completeness, I think we should add it in the BEGIN message as well.
Good suggestion. For multi-shard transactions this was straightforward — |
ASSERT_EQ(record.row_message().xrepl_origin_id(), expected_origin_id)
    << "Wrong xrepl_origin_id on op=" << RowMessage::Op_Name(op);
} else {
| // origin_id 0 means local — field should be absent or zero. |
Please replace the "—" with "-"
Error (TXT5) Bad Charset
Source code should contain only ASCII bytes with ordinal decimal values
between 32 and 126 inclusive, plus linefeed. Do not use UTF-8 or other
multibyte charsets.
13015 ASSERT_EQ(record.row_message().xrepl_origin_id(), expected_origin_id)
13016 << "Wrong xrepl_origin_id on op=" << RowMessage::Op_Name(op);
13017 } else {
>>> 13018 // origin_id 0 means local — field should be absent or zero.
13019 ASSERT_TRUE(!record.row_message().has_xrepl_origin_id() ||
13020 record.row_message().xrepl_origin_id() == 0)
13021 << "Expected no xrepl_origin_id on op=" << RowMessage::Op_Name(op);
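For anyone who wants to catch this before the linter does, the TXT5 rule quoted above (bytes 32 to 126 plus linefeed) can be approximated with a small local check. This is only a sketch of the rule as stated in the error text, not the linter's actual implementation; FirstBadCharsetByte is a hypothetical helper name.

```cpp
#include <cstddef>
#include <string>

// Returns the offset of the first byte outside the allowed set (printable
// ASCII 32..126 plus '\n'), or std::string::npos if the text is clean.
// Multibyte UTF-8 sequences such as an em dash fail on their first byte.
std::size_t FirstBadCharsetByte(const std::string& text) {
  for (std::size_t i = 0; i < text.size(); ++i) {
    const unsigned char c = static_cast<unsigned char>(text[i]);
    if (c != '\n' && (c < 32 || c > 126)) {
      return i;
    }
  }
  return std::string::npos;
}
```

Note that under this reading of the rule a tab character is rejected as well, since only linefeed is exempted.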
  ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1));
  cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();

| // --- Local (no origin) path — verify origin_id is 0/absent --- |
Same as above, replace it with "-"
I have triggered the unit test suite for this and will update with the results.
Summary
Attach xrepl_origin_id to every DML CDC event (INSERT/UPDATE/DELETE), not just COMMIT records.
Motivation
pg_replication_origin sets an integer origin_id per session. Today, this value only appears on COMMIT RowMessage records in CDC output. Individual DML events carry xrepl_origin_id = 0. This forces CDC consumers to buffer all DML events, wait for the COMMIT to learn the origin, then retroactively attribute it, which is impractical when consuming from independent per-tablet streams.
With this change, each DML record carries the origin_id immediately, enabling CDC consumers to route, filter, or tag events on the fly without buffering.
Changes
The xrepl_origin_id is already extracted from the WAL at every point where DML records are created; it just wasn't being set on the RowMessage. This PR adds that.
src/yb/cdc/cdcsdk_producer.cc
Single-shard path (PopulateCDCSDKWriteRecord): After extracting xrepl_origin_id from msg->write(), set it on the DML row_message immediately.
Multi-shard path (PopulateCDCSDKIntentRecord): Added xrepl_origin_id parameter to the function signature. The value was already available in the caller (ProcessIntents) but wasn't passed through. Now each DML row_message created from intents carries the origin_id.
src/yb/cdc/cdc_service.proto
Updated comment on xrepl_origin_id field from "Only set for COMMIT Ops" to "Set on DML and COMMIT Ops". No wire format change.
Test plan
TestOriginId passes unchanged (the get_xrepl_origin_id lambda finds the first non-zero origin_id, which is now a DML record instead of COMMIT, with the same value)
TestOriginIdOnDMLRecords verifies:
[...] xrepl_origin_id
[...] xrepl_origin_id
[...] xrepl_origin_id (backwards compat)
[...] xrepl_origin_id per transaction
Note
Medium Risk
Changes CDC output semantics by adding an additional field on high-volume DML records, which could affect downstream consumers that assume it is only present on COMMIT, but the change is additive and tested.
Overview
CDCSDK RowMessage records for INSERT/UPDATE/DELETE now populate xrepl_origin_id immediately, for both the single-shard write path and the multi-shard intent path (by threading xrepl_origin_id into PopulateCDCSDKIntentRecord).
Adds an integration test TestOriginIdOnDMLRecords covering single-shard and multi-shard transactions and verifying local writes omit/zero the field; updates the proto comment to reflect the expanded semantics (no wire change).
Written by Cursor Bugbot for commit c63e4c2.
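The check the new test applies to each GetChanges response can be modelled roughly as follows. This is a sketch under assumptions, not the test's code: SketchRecord, SketchOp, and DmlOriginsMatch are hypothetical names, and has_origin / origin_id stand in for has_xrepl_origin_id() / xrepl_origin_id() on the real RowMessage.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a CDC record; not the real RowMessage protobuf.
enum class SketchOp { kBegin, kInsert, kUpdate, kDelete, kCommit };

struct SketchRecord {
  SketchOp op;
  bool has_origin;     // models has_xrepl_origin_id()
  uint32_t origin_id;  // models xrepl_origin_id()
};

// True if every DML record matches the expectation: a non-zero
// expected_origin must appear on each DML record, while expected_origin == 0
// (a local write) requires the field to be absent or zero.
// BEGIN/COMMIT records are skipped in this sketch.
bool DmlOriginsMatch(const std::vector<SketchRecord>& records,
                     uint32_t expected_origin) {
  for (const SketchRecord& r : records) {
    const bool is_dml = r.op == SketchOp::kInsert ||
                        r.op == SketchOp::kUpdate ||
                        r.op == SketchOp::kDelete;
    if (!is_dml) continue;
    if (expected_origin != 0) {
      if (!r.has_origin || r.origin_id != expected_origin) return false;
    } else if (r.has_origin && r.origin_id != 0) {
      return false;
    }
  }
  return true;
}
```

Treating "absent" and "zero" as equivalent for local writes is what keeps the change backwards compatible for consumers that never looked at the field on DML records.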