From c63e4c20db34f97b9dd6f3927e107654f1de6c0e Mon Sep 17 00:00:00 2001 From: Kate Galieva Date: Wed, 25 Feb 2026 23:59:33 -0500 Subject: [PATCH 1/4] Attach `xrepl_origin_id` to every DML CDC event --- src/yb/cdc/cdc_service.proto | 2 +- src/yb/cdc/cdcsdk_producer.cc | 17 +++- src/yb/integration-tests/cdcsdk_ysql-test.cc | 93 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 5 deletions(-) diff --git a/src/yb/cdc/cdc_service.proto b/src/yb/cdc/cdc_service.proto index e30786c62330..85fbf3b215c2 100644 --- a/src/yb/cdc/cdc_service.proto +++ b/src/yb/cdc/cdc_service.proto @@ -558,7 +558,7 @@ message RowMessage { // for other ops such as DDL/BEGIN/COMMIT/SAFEPOINT. Not populated in the Snapshot phase. optional bytes primary_key = 18; - // Replication origin id associated with the transaction. Only set for COMMIT Ops. + // Replication origin id associated with the transaction. Set on DML and COMMIT Ops. optional uint32 xrepl_origin_id = 19; } diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index 112ee55cbc14..48ba83656351 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -959,6 +959,7 @@ Result GetTableInfoForSysCatalogTable( Status PopulateCDCSDKIntentRecord( const OpId& op_id, const TransactionId& transaction_id, + uint32_t xrepl_origin_id, const std::vector& intents, const StreamMetadata& metadata, const std::shared_ptr& tablet_peer, @@ -1148,6 +1149,10 @@ Status PopulateCDCSDKIntentRecord( row_message->set_commit_time(commit_time.ToPB()); row_message->set_record_time(intent.intent_ht.hybrid_time().ToUint64()); + if (xrepl_origin_id) { + row_message->set_xrepl_origin_id(xrepl_origin_id); + } + if (IsOldRowNeededOnDelete(record_type) && (row_message->op() == RowMessage_Op_DELETE)) { auto read_time = FLAGS_cdc_enable_intra_transactional_before_image @@ -1506,6 +1511,10 @@ Status PopulateCDCSDKWriteRecord( xrepl_origin_id = msg->write().xrepl_origin_id(); } + if (xrepl_origin_id) { + row_message->set_xrepl_origin_id(xrepl_origin_id); + } + // Check whether operation is WRITE or DELETE. if (value_type == dockv::ValueEntryType::kTombstone && decoded_key.num_subkeys() == 0) { SetOperation(row_message, OpType::DELETE, schema); @@ -1868,10 +1877,10 @@ Status ProcessIntents( // Need to populate the CDCSDKRecords if (!keyValueIntents->empty()) { RETURN_NOT_OK(PopulateCDCSDKIntentRecord( - op_id, transaction_id, *keyValueIntents, metadata, tablet_peer, enum_oid_label_map, - composite_atts_map, request_source, cached_schema_details, schema_packing_storages, resp, - consumption, &write_id, &reverse_index_key, commit_time, client, end_of_transaction, - throughput_metrics)); + op_id, transaction_id, xrepl_origin_id, *keyValueIntents, metadata, tablet_peer, + enum_oid_label_map, composite_atts_map, request_source, cached_schema_details, + schema_packing_storages, resp, consumption, &write_id, &reverse_index_key, commit_time, + client, end_of_transaction, throughput_metrics)); } if (end_of_transaction) { diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index ca37ebb72b44..8aac8cc08fc2 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -12987,6 +12987,99 @@ TEST_F(CDCSDKYsqlTest, TestOriginId) { cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); } +TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_populate_end_markers_transactions) = true; + ASSERT_OK(SetUpWithParams(1 /* rf */, 1 /* num_masters*/)); + const auto kOrigin1 = "origin1"; + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_create('$0');", kOrigin1)); + + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); + ASSERT_EQ(tablets.size(), 1); + auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); + + // Consume the initial schema record. + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (0, 0)", kTableName)); + auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + auto cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); + + // Helper: check that all DML records (INSERT/UPDATE/DELETE) in the response carry the expected + // xrepl_origin_id, and that COMMIT records also carry it (backwards compat). + auto verify_origin_id_on_all_records = + [](const GetChangesResponsePB& resp, uint32_t expected_origin_id) { + for (const auto& record : resp.cdc_sdk_proto_records()) { + auto op = record.row_message().op(); + if (op == RowMessage::INSERT || op == RowMessage::UPDATE || + op == RowMessage::DELETE || op == RowMessage::COMMIT) { + if (expected_origin_id != 0) { + ASSERT_TRUE(record.row_message().has_xrepl_origin_id()) + << "Expected xrepl_origin_id on op=" << RowMessage::Op_Name(op); + ASSERT_EQ(record.row_message().xrepl_origin_id(), expected_origin_id) + << "Wrong xrepl_origin_id on op=" << RowMessage::Op_Name(op); + } else { + // origin_id 0 means local — field should be absent or zero. + ASSERT_TRUE(!record.row_message().has_xrepl_origin_id() || + record.row_message().xrepl_origin_id() == 0) + << "Expected no xrepl_origin_id on op=" << RowMessage::Op_Name(op); + } + } + } + }; + + // --- Single-shard (autocommit) path --- + // INSERT with origin. + ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1)); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 100)", kTableName)); + ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); + LOG(INFO) << "Single-shard INSERT: " << change_resp.ShortDebugString(); + ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); + cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); + + // UPDATE with origin. + ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1)); + ASSERT_OK(conn.ExecuteFormat( + "UPDATE $0 SET $1 = 200 WHERE $2 = 1", kTableName, kValueColumnName, kKeyColumnName)); + ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); + LOG(INFO) << "Single-shard UPDATE: " << change_resp.ShortDebugString(); + ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); + cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); + + // DELETE with origin. + ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1)); + ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = 1", kTableName, kKeyColumnName)); + ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); + LOG(INFO) << "Single-shard DELETE: " << change_resp.ShortDebugString(); + ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); + cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); + + // --- Multi-shard (explicit transaction) path --- + ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1)); + ASSERT_OK(conn.Execute("BEGIN")); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (2, 200)", kTableName)); + ASSERT_OK(conn.ExecuteFormat( + "UPDATE $0 SET $1 = 300 WHERE $2 = 2", kTableName, kValueColumnName, kKeyColumnName)); + ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = 0", kTableName, kKeyColumnName)); + ASSERT_OK(conn.Execute("COMMIT")); + ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); + LOG(INFO) << "Multi-shard txn: " << change_resp.ShortDebugString(); + ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); + cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); + + // --- Local (no origin) path — verify origin_id is 0/absent --- + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (3, 300)", kTableName)); + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); + LOG(INFO) << "Local INSERT: " << change_resp.ShortDebugString(); + ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 0)); +} + TEST_F(CDCSDKYsqlTest, TestUPAMNotStuckWithIndexInColocatedTablet) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; From 70e9b8752848699f30ced9826e3a44bdd2f6f3e9 Mon Sep 17 00:00:00 2001 From: Kate Galieva Date: Mon, 23 Mar 2026 14:17:50 -0400 Subject: [PATCH 2/4] Address review comments --- src/yb/integration-tests/cdcsdk_ysql-test.cc | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 8aac8cc08fc2..44384ea03bd5 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -12988,7 +12988,6 @@ TEST_F(CDCSDKYsqlTest, TestOriginId) { } TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { - ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_populate_end_markers_transactions) = true; ASSERT_OK(SetUpWithParams(1 /* rf */, 1 /* num_masters*/)); const auto kOrigin1 = "origin1"; @@ -13002,11 +13001,6 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { ASSERT_EQ(tablets.size(), 1); auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); - // Consume the initial schema record. - ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (0, 0)", kTableName)); - auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); - auto cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); - // Helper: check that all DML records (INSERT/UPDATE/DELETE) in the response carry the expected // xrepl_origin_id, and that COMMIT records also carry it (backwards compat). auto verify_origin_id_on_all_records = @@ -13030,13 +13024,19 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { } }; + // Insert a row without any replication origin and consume the records. + // These records should not carry any origin id. + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (0, 0)", kTableName)); + auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 0)); + auto cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); + // --- Single-shard (autocommit) path --- // INSERT with origin. ASSERT_OK(conn.FetchFormat("SELECT pg_replication_origin_session_setup('$0');", kOrigin1)); ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 100)", kTableName)); ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); - LOG(INFO) << "Single-shard INSERT: " << change_resp.ShortDebugString(); ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); @@ -13046,7 +13046,6 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { "UPDATE $0 SET $1 = 200 WHERE $2 = 1", kTableName, kValueColumnName, kKeyColumnName)); ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); - LOG(INFO) << "Single-shard UPDATE: " << change_resp.ShortDebugString(); ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); @@ -13055,7 +13054,6 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = 1", kTableName, kKeyColumnName)); ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); - LOG(INFO) << "Single-shard DELETE: " << change_resp.ShortDebugString(); ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); @@ -13069,14 +13067,12 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { ASSERT_OK(conn.Execute("COMMIT")); ASSERT_OK(conn.Fetch("SELECT pg_replication_origin_session_reset()")); change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); - LOG(INFO) << "Multi-shard txn: " << change_resp.ShortDebugString(); ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); // --- Local (no origin) path — verify origin_id is 0/absent --- ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (3, 300)", kTableName)); change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); - LOG(INFO) << "Local INSERT: " << change_resp.ShortDebugString(); ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 0)); } From c6e3b48047e875ac7d4726f4b2405034e81e74c6 Mon Sep 17 00:00:00 2001 From: Kate Galieva Date: Mon, 23 Mar 2026 15:13:15 -0400 Subject: [PATCH 3/4] Set origin in BEGIN message --- src/yb/cdc/cdc_service.proto | 2 +- src/yb/cdc/cdcsdk_producer.cc | 28 +++++++++++++------- src/yb/integration-tests/cdcsdk_ysql-test.cc | 6 ++--- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/yb/cdc/cdc_service.proto b/src/yb/cdc/cdc_service.proto index 85fbf3b215c2..bb8735997823 100644 --- a/src/yb/cdc/cdc_service.proto +++ b/src/yb/cdc/cdc_service.proto @@ -558,7 +558,7 @@ message RowMessage { // for other ops such as DDL/BEGIN/COMMIT/SAFEPOINT. Not populated in the Snapshot phase. optional bytes primary_key = 18; - // Replication origin id associated with the transaction. Set on DML and COMMIT Ops. + // Replication origin id associated with the transaction. Set on BEGIN, DML and COMMIT Ops. optional uint32 xrepl_origin_id = 19; } diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index 48ba83656351..2b7855a8c6c1 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -1323,13 +1323,16 @@ Status PopulateCDCSDKIntentRecord( } void FillBeginRecordForSingleShardTransaction( - const uint64_t& commit_timestamp, GetChangesResponsePB* resp, + const uint64_t& commit_timestamp, uint32_t xrepl_origin_id, GetChangesResponsePB* resp, CDCThroughputMetrics* throughput_metrics) { CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records(); RowMessage* row_message = proto_record->mutable_row_message(); row_message->set_op(RowMessage_Op_BEGIN); row_message->set_commit_time(commit_timestamp); + if (xrepl_origin_id) { + row_message->set_xrepl_origin_id(xrepl_origin_id); + } // No need to add record_time to the Begin record since it does not have any intent associated // with it. @@ -1371,8 +1374,14 @@ Status PopulateCDCSDKWriteRecord( GetChangesResponsePB* resp, client::YBClient* client, CDCThroughputMetrics* throughput_metrics) { + uint32_t xrepl_origin_id = 0; + if (msg->write().has_xrepl_origin_id()) { + xrepl_origin_id = msg->write().xrepl_origin_id(); + } + if (FLAGS_cdc_populate_end_markers_transactions) { - FillBeginRecordForSingleShardTransaction(msg->hybrid_time(), resp, throughput_metrics); + FillBeginRecordForSingleShardTransaction( + msg->hybrid_time(), xrepl_origin_id, resp, throughput_metrics); } auto tablet_ptr = VERIFY_RESULT(tablet_peer->shared_tablet()); @@ -1400,7 +1409,6 @@ Status PopulateCDCSDKWriteRecord( auto table_name = tablet_ptr->metadata()->table_name(); auto table_id = tablet_ptr->metadata()->table_id(); SchemaPackingStorage* schema_packing_storage = &schema_packing_storages->at(table_id); - uint32_t xrepl_origin_id = 0; // TODO: This function and PopulateCDCSDKIntentRecord have a lot of code in common. They should // be refactored to use some common row-column iterator. @@ -1506,11 +1514,6 @@ Status PopulateCDCSDKWriteRecord( SetCDCSDKOpId(msg->id().term(), msg->id().index(), record_batch_idx, "", cdc_sdk_op_id_pb); is_packed_row_record = false; - // Populate PostgreSQL replication origin id if available. - if (msg->write().has_xrepl_origin_id()) { - xrepl_origin_id = msg->write().xrepl_origin_id(); - } - if (xrepl_origin_id) { row_message->set_xrepl_origin_id(xrepl_origin_id); } @@ -1783,13 +1786,17 @@ void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint) void FillBeginRecord( const TransactionId& transaction_id, const uint64_t& commit_timestamp, - GetChangesResponsePB* resp, CDCThroughputMetrics* throughput_metrics) { + uint32_t xrepl_origin_id, GetChangesResponsePB* resp, + CDCThroughputMetrics* throughput_metrics) { CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records(); RowMessage* row_message = proto_record->mutable_row_message(); row_message->set_op(RowMessage_Op_BEGIN); row_message->set_transaction_id(transaction_id.ToString()); row_message->set_commit_time(commit_timestamp); + if (xrepl_origin_id) { + row_message->set_xrepl_origin_id(xrepl_origin_id); + } // No need to add record_time to the Begin record since it does not have any intent associated // with it. @@ -1835,7 +1842,8 @@ Status ProcessIntents( auto tablet = VERIFY_RESULT(tablet_peer->shared_tablet()); if (stream_state->key.empty() && stream_state->write_id == 0 && FLAGS_cdc_populate_end_markers_transactions) { - FillBeginRecord(transaction_id, commit_time.ToUint64(), resp, throughput_metrics); + FillBeginRecord( + transaction_id, commit_time.ToUint64(), xrepl_origin_id, resp, throughput_metrics); TEST_SYNC_POINT("AddBeginRecord::End"); } diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 44384ea03bd5..50df39bfc34b 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -13001,13 +13001,13 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { ASSERT_EQ(tablets.size(), 1); auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); - // Helper: check that all DML records (INSERT/UPDATE/DELETE) in the response carry the expected - // xrepl_origin_id, and that COMMIT records also carry it (backwards compat). + // Helper: check that all BEGIN, DML (INSERT/UPDATE/DELETE), and COMMIT records in the response + // carry the expected xrepl_origin_id. auto verify_origin_id_on_all_records = [](const GetChangesResponsePB& resp, uint32_t expected_origin_id) { for (const auto& record : resp.cdc_sdk_proto_records()) { auto op = record.row_message().op(); - if (op == RowMessage::INSERT || op == RowMessage::UPDATE || + if (op == RowMessage::BEGIN || op == RowMessage::INSERT || op == RowMessage::UPDATE || op == RowMessage::DELETE || op == RowMessage::COMMIT) { if (expected_origin_id != 0) { ASSERT_TRUE(record.row_message().has_xrepl_origin_id()) From fdadfc62a29ed45c9d4fd3213698878c1674b131 Mon Sep 17 00:00:00 2001 From: Kate Galieva Date: Tue, 24 Mar 2026 14:41:59 -0400 Subject: [PATCH 4/4] Fix dashes --- src/yb/integration-tests/cdcsdk_ysql-test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 50df39bfc34b..7c485931d368 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -13015,7 +13015,7 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { ASSERT_EQ(record.row_message().xrepl_origin_id(), expected_origin_id) << "Wrong xrepl_origin_id on op=" << RowMessage::Op_Name(op); } else { - // origin_id 0 means local — field should be absent or zero. + // origin_id 0 means local - field should be absent or zero. ASSERT_TRUE(!record.row_message().has_xrepl_origin_id() || record.row_message().xrepl_origin_id() == 0) << "Expected no xrepl_origin_id on op=" << RowMessage::Op_Name(op); @@ -13070,7 +13070,7 @@ TEST_F(CDCSDKYsqlTest, TestOriginIdOnDMLRecords) { ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 1)); cdc_sdk_checkpoint = change_resp.cdc_sdk_checkpoint(); - // --- Local (no origin) path — verify origin_id is 0/absent --- + // --- Local (no origin) path - verify origin_id is 0/absent --- ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (3, 300)", kTableName)); change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &cdc_sdk_checkpoint)); ASSERT_NO_FATAL_FAILURE(verify_origin_id_on_all_records(change_resp, 0));