Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/yb/cdc/cdc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ message RowMessage {
// for other ops such as DDL/BEGIN/COMMIT/SAFEPOINT. Not populated in the Snapshot phase.
optional bytes primary_key = 18;

// Replication origin id associated with the transaction. Only set for COMMIT Ops.
// Replication origin id associated with the transaction. Set on BEGIN, DML and COMMIT Ops.
optional uint32 xrepl_origin_id = 19;
}

Expand Down
41 changes: 29 additions & 12 deletions src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ Result<tablet::TableInfoPtr> GetTableInfoForSysCatalogTable(
Status PopulateCDCSDKIntentRecord(
const OpId& op_id,
const TransactionId& transaction_id,
uint32_t xrepl_origin_id,
const std::vector<docdb::IntentKeyValueForCDC>& intents,
const StreamMetadata& metadata,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
Expand Down Expand Up @@ -1148,6 +1149,10 @@ Status PopulateCDCSDKIntentRecord(
row_message->set_commit_time(commit_time.ToPB());
row_message->set_record_time(intent.intent_ht.hybrid_time().ToUint64());

if (xrepl_origin_id) {
row_message->set_xrepl_origin_id(xrepl_origin_id);
}

if (IsOldRowNeededOnDelete(record_type) &&
(row_message->op() == RowMessage_Op_DELETE)) {
auto read_time = FLAGS_cdc_enable_intra_transactional_before_image
Expand Down Expand Up @@ -1318,13 +1323,16 @@ Status PopulateCDCSDKIntentRecord(
}

void FillBeginRecordForSingleShardTransaction(
const uint64_t& commit_timestamp, GetChangesResponsePB* resp,
const uint64_t& commit_timestamp, uint32_t xrepl_origin_id, GetChangesResponsePB* resp,
CDCThroughputMetrics* throughput_metrics) {
CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
RowMessage* row_message = proto_record->mutable_row_message();

row_message->set_op(RowMessage_Op_BEGIN);
row_message->set_commit_time(commit_timestamp);
if (xrepl_origin_id) {
row_message->set_xrepl_origin_id(xrepl_origin_id);
}
// No need to add record_time to the Begin record since it does not have any intent associated
// with it.

Expand Down Expand Up @@ -1366,8 +1374,14 @@ Status PopulateCDCSDKWriteRecord(
GetChangesResponsePB* resp,
client::YBClient* client,
CDCThroughputMetrics* throughput_metrics) {
uint32_t xrepl_origin_id = 0;
if (msg->write().has_xrepl_origin_id()) {
xrepl_origin_id = msg->write().xrepl_origin_id();
}

if (FLAGS_cdc_populate_end_markers_transactions) {
FillBeginRecordForSingleShardTransaction(msg->hybrid_time(), resp, throughput_metrics);
FillBeginRecordForSingleShardTransaction(
msg->hybrid_time(), xrepl_origin_id, resp, throughput_metrics);
}

auto tablet_ptr = VERIFY_RESULT(tablet_peer->shared_tablet());
Expand Down Expand Up @@ -1395,7 +1409,6 @@ Status PopulateCDCSDKWriteRecord(
auto table_name = tablet_ptr->metadata()->table_name();
auto table_id = tablet_ptr->metadata()->table_id();
SchemaPackingStorage* schema_packing_storage = &schema_packing_storages->at(table_id);
uint32_t xrepl_origin_id = 0;

// TODO: This function and PopulateCDCSDKIntentRecord have a lot of code in common. They should
// be refactored to use some common row-column iterator.
Expand Down Expand Up @@ -1501,9 +1514,8 @@ Status PopulateCDCSDKWriteRecord(
SetCDCSDKOpId(msg->id().term(), msg->id().index(), record_batch_idx, "", cdc_sdk_op_id_pb);
is_packed_row_record = false;

// Populate PostgreSQL replication origin id if available.
if (msg->write().has_xrepl_origin_id()) {
xrepl_origin_id = msg->write().xrepl_origin_id();
if (xrepl_origin_id) {
row_message->set_xrepl_origin_id(xrepl_origin_id);
}

// Check whether operation is WRITE or DELETE.
Expand Down Expand Up @@ -1774,13 +1786,17 @@ void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint)

void FillBeginRecord(
const TransactionId& transaction_id, const uint64_t& commit_timestamp,
GetChangesResponsePB* resp, CDCThroughputMetrics* throughput_metrics) {
uint32_t xrepl_origin_id, GetChangesResponsePB* resp,
CDCThroughputMetrics* throughput_metrics) {
CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
RowMessage* row_message = proto_record->mutable_row_message();

row_message->set_op(RowMessage_Op_BEGIN);
row_message->set_transaction_id(transaction_id.ToString());
row_message->set_commit_time(commit_timestamp);
if (xrepl_origin_id) {
row_message->set_xrepl_origin_id(xrepl_origin_id);
}
// No need to add record_time to the Begin record since it does not have any intent associated
// with it.

Expand Down Expand Up @@ -1826,7 +1842,8 @@ Status ProcessIntents(
auto tablet = VERIFY_RESULT(tablet_peer->shared_tablet());
if (stream_state->key.empty() && stream_state->write_id == 0 &&
FLAGS_cdc_populate_end_markers_transactions) {
FillBeginRecord(transaction_id, commit_time.ToUint64(), resp, throughput_metrics);
FillBeginRecord(
transaction_id, commit_time.ToUint64(), xrepl_origin_id, resp, throughput_metrics);
TEST_SYNC_POINT("AddBeginRecord::End");
}

Expand Down Expand Up @@ -1868,10 +1885,10 @@ Status ProcessIntents(
// Need to populate the CDCSDKRecords
if (!keyValueIntents->empty()) {
RETURN_NOT_OK(PopulateCDCSDKIntentRecord(
op_id, transaction_id, *keyValueIntents, metadata, tablet_peer, enum_oid_label_map,
composite_atts_map, request_source, cached_schema_details, schema_packing_storages, resp,
consumption, &write_id, &reverse_index_key, commit_time, client, end_of_transaction,
throughput_metrics));
op_id, transaction_id, xrepl_origin_id, *keyValueIntents, metadata, tablet_peer,
enum_oid_label_map, composite_atts_map, request_source, cached_schema_details,
schema_packing_storages, resp, consumption, &write_id, &reverse_index_key, commit_time,
client, end_of_transaction, throughput_metrics));
}

if (end_of_transaction) {
Expand Down
89 changes: 89 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12987,6 +12987,95 @@ TEST_F(CDCSDKYsqlTest, TestOriginId) {
cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();
}

TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_populate_end_markers_transactions) = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed since the flag is true by default.

ASSERT_OK(SetUpWithParams(1 /* rf */, 1 /* num_masters*/));
const auto kOrigin1 = "origin1";
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));

ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_create('$0');", kOrigin1));

auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());

// Helper: check that all BEGIN, DML (INSERT/UPDATE/DELETE), and COMMIT records in the response
// carry the expected xrepl_origin_id.
auto verify_origin_id_on_all_records =
[](const GetChangesResponsePB& resp, uint32_t expected_origin_id) {
for (const auto& record : resp.cdc_sdk_proto_records()) {
auto op = record.row_message().op();
if (op == RowMessage::BEGIN || op == RowMessage::INSERT || op == RowMessage::UPDATE ||
op == RowMessage::DELETE || op == RowMessage::COMMIT) {
if (expected_origin_id != 0) {
ASSERT_TRUE(record.row_message().has_xrepl_origin_id())
<< "Expected xrepl_origin_id on op=" << RowMessage::Op_Name(op);
ASSERT_EQ(record.row_message().xrepl_origin_id(), expected_origin_id)
<< "Wrong xrepl_origin_id on op=" << RowMessage::Op_Name(op);
} else {
// origin_id 0 means local - field should be absent or zero.
ASSERT_TRUE(!record.row_message().has_xrepl_origin_id() ||
record.row_message().xrepl_origin_id() == 0)
<< "Expected no xrepl_origin_id on op=" << RowMessage::Op_Name(op);
}
}
}
};

// Insert a row without any replication origin and consume the records.
// These records should not carry any origin id.
ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (0, 0)", kTableName));
auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets));
ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 0));
auto cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();

// --- Single-shard (autocommit) path ---
// INSERT with origin.
ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1));
ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 100)", kTableName));
ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()"));
change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint));
ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1));
cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();

// UPDATE with origin.
ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1));
ASSERT_OK(conn.ExecuteFormat(
"UPDATE $0 SET $1 = 200 WHERE $2 = 1", kTableName, kValueColumnName, kKeyColumnName));
ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()"));
change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint));
ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1));
cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();

// DELETE with origin.
ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1));
ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = 1", kTableName, kKeyColumnName));
ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()"));
change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint));
ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1));
cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();

// --- Multi-shard (explicit transaction) path ---
ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1));
ASSERT_OK(conn.Execute("BEGIN"));
ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (2, 200)", kTableName));
ASSERT_OK(conn.ExecuteFormat(
"UPDATE $0 SET $1 = 300 WHERE $2 = 2", kTableName, kValueColumnName, kKeyColumnName));
ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = 0", kTableName, kKeyColumnName));
ASSERT_OK(conn.Execute("COMMIT"));
ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()"));
change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint));
ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1));
cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint();

// --- Local (no origin) path - verify origin_id is 0/absent ---
ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (3, 300)", kTableName));
change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint));
ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 0));
}

TEST_F(CDCSDKYsqlTest, TestUPAMNotStuckWithIndexInColocatedTablet) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
Expand Down