Skip to content

Commit 69e3a2a

Browse files
committed
Set origin in BEGIN message
1 parent 70e9b87 commit 69e3a2a

File tree

3 files changed

+21
-14
lines changed

3 files changed

+21
-14
lines changed

src/yb/cdc/cdc_service.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ message RowMessage {
558558
// for other ops such as DDL/BEGIN/COMMIT/SAFEPOINT. Not populated in the Snapshot phase.
559559
optional bytes primary_key = 18;
560560

561-
// Replication origin id associated with the transaction. Set on DML and COMMIT Ops.
561+
// Replication origin id associated with the transaction. Set on BEGIN, DML and COMMIT Ops.
562562
optional uint32 xrepl_origin_id = 19;
563563
}
564564

src/yb/cdc/cdcsdk_producer.cc

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,13 +1323,16 @@ Status PopulateCDCSDKIntentRecord(
13231323
}
13241324

13251325
void FillBeginRecordForSingleShardTransaction(
1326-
const uint64_t& commit_timestamp, GetChangesResponsePB* resp,
1326+
const uint64_t& commit_timestamp, uint32_t xrepl_origin_id, GetChangesResponsePB* resp,
13271327
CDCThroughputMetrics* throughput_metrics) {
13281328
CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
13291329
RowMessage* row_message = proto_record->mutable_row_message();
13301330

13311331
row_message->set_op(RowMessage_Op_BEGIN);
13321332
row_message->set_commit_time(commit_timestamp);
1333+
if (xrepl_origin_id) {
1334+
row_message->set_xrepl_origin_id(xrepl_origin_id);
1335+
}
13331336
// No need to add record_time to the Begin record since it does not have any intent associated
13341337
// with it.
13351338

@@ -1371,8 +1374,14 @@ Status PopulateCDCSDKWriteRecord(
13711374
GetChangesResponsePB* resp,
13721375
client::YBClient* client,
13731376
CDCThroughputMetrics* throughput_metrics) {
1377+
uint32_t xrepl_origin_id = 0;
1378+
if (msg->write().has_xrepl_origin_id()) {
1379+
xrepl_origin_id = msg->write().xrepl_origin_id();
1380+
}
1381+
13741382
if (FLAGS_cdc_populate_end_markers_transactions) {
1375-
FillBeginRecordForSingleShardTransaction(msg->hybrid_time(), resp, throughput_metrics);
1383+
FillBeginRecordForSingleShardTransaction(
1384+
msg->hybrid_time(), xrepl_origin_id, resp, throughput_metrics);
13761385
}
13771386

13781387
auto tablet_ptr = VERIFY_RESULT(tablet_peer->shared_tablet());
@@ -1400,7 +1409,6 @@ Status PopulateCDCSDKWriteRecord(
14001409
auto table_name = tablet_ptr->metadata()->table_name();
14011410
auto table_id = tablet_ptr->metadata()->table_id();
14021411
SchemaPackingStorage* schema_packing_storage = &schema_packing_storages->at(table_id);
1403-
uint32_t xrepl_origin_id = 0;
14041412

14051413
// TODO: This function and PopulateCDCSDKIntentRecord have a lot of code in common. They should
14061414
// be refactored to use some common row-column iterator.
@@ -1506,11 +1514,6 @@ Status PopulateCDCSDKWriteRecord(
15061514
SetCDCSDKOpId(msg->id().term(), msg->id().index(), record_batch_idx, "", cdc_sdk_op_id_pb);
15071515
is_packed_row_record = false;
15081516

1509-
// Populate PostgreSQL replication origin id if available.
1510-
if (msg->write().has_xrepl_origin_id()) {
1511-
xrepl_origin_id = msg->write().xrepl_origin_id();
1512-
}
1513-
15141517
if (xrepl_origin_id) {
15151518
row_message->set_xrepl_origin_id(xrepl_origin_id);
15161519
}
@@ -1783,13 +1786,17 @@ void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint)
17831786

17841787
void FillBeginRecord(
17851788
const TransactionId& transaction_id, const uint64_t& commit_timestamp,
1786-
GetChangesResponsePB* resp, CDCThroughputMetrics* throughput_metrics) {
1789+
uint32_t xrepl_origin_id, GetChangesResponsePB* resp,
1790+
CDCThroughputMetrics* throughput_metrics) {
17871791
CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
17881792
RowMessage* row_message = proto_record->mutable_row_message();
17891793

17901794
row_message->set_op(RowMessage_Op_BEGIN);
17911795
row_message->set_transaction_id(transaction_id.ToString());
17921796
row_message->set_commit_time(commit_timestamp);
1797+
if (xrepl_origin_id) {
1798+
row_message->set_xrepl_origin_id(xrepl_origin_id);
1799+
}
17931800
// No need to add record_time to the Begin record since it does not have any intent associated
17941801
// with it.
17951802

@@ -1835,7 +1842,7 @@ Status ProcessIntents(
18351842
auto tablet = VERIFY_RESULT(tablet_peer->shared_tablet());
18361843
if (stream_state->key.empty() && stream_state->write_id == 0 &&
18371844
FLAGS_cdc_populate_end_markers_transactions) {
1838-
FillBeginRecord(transaction_id, commit_time.ToUint64(), resp, throughput_metrics);
1845+
FillBeginRecord(transaction_id, commit_time.ToUint64(), xrepl_origin_id, resp, throughput_metrics);
18391846
TEST_SYNC_POINT("AddBeginRecord::End");
18401847
}
18411848

src/yb/integration-tests/cdcsdk_ysql-test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13001,13 +13001,13 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) {
1300113001
ASSERT_EQ(tablets.size(), 1);
1300213002
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());
1300313003

13004-
// Helper: check that all DML records (INSERT/UPDATE/DELETE) in the response carry the expected
13005-
// xrepl_origin_id, and that COMMIT records also carry it (backwards compat).
13004+
// Helper: check that all BEGIN, DML (INSERT/UPDATE/DELETE), and COMMIT records in the response
13005+
// carry the expected xrepl_origin_id.
1300613006
auto verify_origin_id_on_all_records =
1300713007
[](const GetChangesResponsePB& resp, uint32_t expected_origin_id) {
1300813008
for (const auto& record : resp.cdc_sdk_proto_records()) {
1300913009
auto op = record.row_message().op();
13010-
if (op == RowMessage::INSERT || op == RowMessage::UPDATE ||
13010+
if (op == RowMessage::BEGIN || op == RowMessage::INSERT || op == RowMessage::UPDATE ||
1301113011
op == RowMessage::DELETE || op == RowMessage::COMMIT) {
1301213012
if (expected_origin_id != 0) {
1301313013
ASSERT_TRUE(record.row_message().has_xrepl_origin_id())

0 commit comments

Comments
 (0)