Skip to content

Commit 61a604a

Browse files
committed
[BACKPORT 2024.2][#28166] CDC: Add a batch index to every single shard record
Summary: **Backport description:** No merge conflicts were encountered. After generating unique record IDs, for comparing 2 records, we have checks to compare them using commit_time, record_time, write_id, etc to ascertain which record comes before in a sorted order. The check is performed in the method `CDCSDKUniqueRecordID::GreaterThanDistributedLSN`. Now when the GUC `yb_disable_transactional_writes` is set, we have multiple records inserted in a single `WRITE_OP` batch for a single shard transaction, there's a possibility that the records end up having the same `commit_time`, `record_time`, `write_id` and `table_id` so we fallback to comparing the primary keys. The core issue was that the VWAL expects each individual tablet to send records in order as per the order determined by `CDCSDKUniqueRecordID::GreaterThanDistributedLSN`. This was being violated and the situation leads us to a data loss scenario where we end up losing multiple records inserted in the same batch of a single shard transaction. The fix to this requires a mechanism to reliably have a fixed sorting order when all the other parameters end up with the same value. This PR adds the same mechanism by assigning a `write_id` to every single shard record based on the index of the wal records batch we are processing. By doing this, we'll be breaking the tie using `write_id` and we can reliably sort the records without filtering them. Jira: DB-17813 Original commit: c397d2f / D45857 Test Plan: The tests to reproduce the error and validate the fix has been added as a part of the logical replication connector's test suite in the following PR: yugabyte/debezium#182 Additionally, even though the issue will not be applicable to the gRPC connector, we are also adding a test prudently to gRPC connector as well: yugabyte/debezium-connector-yugabytedb#379 Reviewers: asrinivasan, sumukh.phalgaonkar, skumar Reviewed By: sumukh.phalgaonkar Subscribers: ycdcxcluster Differential Revision: https://phorge.dev.yugabyte.com/D46189
1 parent 900fd1a commit 61a604a

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

src/yb/cdc/cdcsdk_producer.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,7 +1326,9 @@ Status PopulateCDCSDKWriteRecord(
13261326
SchemaPackingStorage* schema_packing_storage = &schema_packing_storages->at(table_id);
13271327
// TODO: This function and PopulateCDCSDKIntentRecord have a lot of code in common. They should
13281328
// be refactored to use some common row-column iterator.
1329-
for (auto it = batch.write_pairs().cbegin(); it != batch.write_pairs().cend(); ++it) {
1329+
int record_batch_idx = 0;
1330+
for (auto it = batch.write_pairs().cbegin(); it != batch.write_pairs().cend();
1331+
++it, ++record_batch_idx) {
13301332
const yb::docdb::LWKeyValuePairPB& write_pair = *it;
13311333
Slice key = write_pair.key();
13321334
const auto key_size =
@@ -1442,7 +1444,7 @@ Status PopulateCDCSDKWriteRecord(
14421444
row_message->set_table_id(table_id);
14431445
row_message->set_primary_key(primary_key.ToBuffer());
14441446
CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
1445-
SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb);
1447+
SetCDCSDKOpId(msg->id().term(), msg->id().index(), record_batch_idx, "", cdc_sdk_op_id_pb);
14461448
is_packed_row_record = false;
14471449

14481450
// Check whether operation is WRITE or DELETE.

0 commit comments

Comments
 (0)