Skip to content

Commit 8bdc738

Browse files
[#28744] CDC: Stream large transactions for unqualified tables correctly
Summary: In the CDC producer, for colocated / co-tables, any intent corresponding to an unqualified table is skipped while populating records. However, we do not move the reverse index key and write id forward for such intents. If a large transaction (one requiring 2 or more GetChanges calls to ship completely) consisting entirely of records for an unqualified table is encountered, the virtual WAL gets stuck, because we do not populate the reverse index key and write id in the GetChanges response. Every time the virtual WAL calls GetChanges, we read the same set of intents and send back an empty response. This bug also affects the gRPC model of CDC, where the connector task would get stuck (analogous to the virtual WAL) because the reverse index key and write id do not move forward. To fix this, we move the reverse index key and write id forward while skipping intents for unqualified colocated / co-tables. This way, we ensure that successive GetChanges calls from the virtual WAL read different sets of intents, eventually sending a record belonging to a qualified table or a safepoint record. Jira: DB-18452 Test Plan: ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestLargeTxnOnCatalogTablet Reviewers: skumar, asrinivasan Reviewed By: asrinivasan Subscribers: ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D47417
1 parent a4df8de commit 8bdc738

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

src/yb/cdc/cdcsdk_producer.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,8 @@ Status PopulateCDCSDKIntentRecord(
10501050
}
10511051
table_id = table_info->table_id;
10521052
if (!IsColocatedTableQualifiedForStreaming(table_id, metadata)) {
1053+
*write_id = intent.write_id;
1054+
*reverse_index_key = intent.reverse_index_key;
10531055
continue;
10541056
}
10551057

src/yb/integration-tests/cdcsdk_ysql-test.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12351,5 +12351,73 @@ TEST_F(CDCSDKYsqlTest, TestConcurrentStreamCreationOnCatalogTables) {
1235112351
tablet_peer->get_cdc_min_replicated_index());
1235212352
}
1235312353

12354+
TEST_F(CDCSDKYsqlTest, TestLargeTxnOnCatalogTablet) {
12355+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
12356+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_table_support) = false;
12357+
ANNOTATE_UNPROTECTED_WRITE(
12358+
FLAGS_ysql_yb_enable_implicit_dynamic_tables_logical_replication) = true;
12359+
12360+
// Reduce the number of intents fetched in a batch for CDC to simulate a large txn.
12361+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_max_stream_intent_records) = 2;
12362+
12363+
ASSERT_OK(SetUpWithParams(1 /* rf */, 3 /* num_masters*/));
12364+
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());
12365+
12366+
// Create a table after slot creation. This will result in a multi-shard txn on sys catalog
12367+
// tablet.
12368+
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
12369+
ASSERT_OK(conn.Execute("CREATE TABLE test_1 (a int primary key, b text)"));
12370+
12371+
// Call GetChanges on master and assert that we are getting a checkpoint record with valid
12372+
// write_id and reverse_index_key.
12373+
auto change_resp = ASSERT_RESULT(GetChangesFromMaster(stream_id));
12374+
12375+
ASSERT_FALSE(change_resp.has_error());
12376+
ASSERT_FALSE(change_resp.cdc_sdk_checkpoint().key().empty());
12377+
ASSERT_NE(change_resp.cdc_sdk_checkpoint().write_id(), 0);
12378+
}
12379+
12380+
TEST_F(CDCSDKYsqlTest, TestLargeTxnOnUnqualifiedColocatedTable) {
12381+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
12382+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
12383+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_max_stream_intent_records) = 2;
12384+
12385+
ASSERT_OK(SetUpWithParams(1 /* rf */, 1 /* num_masters */, true /* colocated */));
12386+
12387+
const vector<string> table_list_suffix = {"_1", "_2"};
12388+
const int kNumTables = 2;
12389+
vector<YBTableName> table(kNumTables);
12390+
int idx = 0;
12391+
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);
12392+
12393+
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
12394+
12395+
// Create 2 colocated tables. The test_table_1 will not have a primary key and as a result it will
12396+
// not be included in the stream metadata, hence making it an unqualified table.
12397+
for (idx = 0; idx < kNumTables; idx++) {
12398+
ASSERT_OK(conn.ExecuteFormat(
12399+
"CREATE TABLE $0$1(id1 int $2);", kTableName, table_list_suffix[idx],
12400+
idx ? "primary key" : ""));
12401+
table[idx] = ASSERT_RESULT(GetTable(
12402+
&test_cluster_, kNamespaceName, Format("$0$1", kTableName, table_list_suffix[idx])));
12403+
ASSERT_OK(test_client()->GetTablets(
12404+
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
12405+
}
12406+
12407+
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());
12408+
12409+
// Perform a large txn on the unqualified colocated table. Note that we have reduced the
12410+
// value of FLAGS_cdc_max_stream_intent_records for this txn to be considered a "large txn".
12411+
ASSERT_OK(
12412+
conn.ExecuteFormat("INSERT INTO $0 values (generate_series(1,100))", table[0].table_name()));
12413+
12414+
// Call GetChanges on the colocated tablet and assert that we receive a non-empty key and
12415+
// write-id.
12416+
auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets[0]));
12417+
ASSERT_FALSE(change_resp.has_error());
12418+
ASSERT_FALSE(change_resp.cdc_sdk_checkpoint().key().empty());
12419+
ASSERT_NE(change_resp.cdc_sdk_checkpoint().write_id(), 0);
12420+
}
12421+
1235412422
} // namespace cdc
1235512423
} // namespace yb

0 commit comments

Comments
 (0)