From 6f727242484f073e35bb839b0e19ff6b8cf12839 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 17 Jul 2025 11:01:52 +0530 Subject: [PATCH 1/6] feat(CassandraClient): Adjust `selectFrom` logic in `prepareSelect` to use primary key columns when the projection is empty. Improve Javadoc comments. (cherry picked from commit 52814ae951b20d4c67fbc8bfc8e7a8c49cf061e1) --- .../main/java/com/datastax/oss/cdc/CassandraClient.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java b/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java index 8cfa33e4..c67b466a 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java +++ b/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java @@ -42,6 +42,7 @@ import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.api.core.servererrors.UnavailableException; import com.datastax.oss.driver.api.querybuilder.select.Select; +import com.datastax.oss.driver.api.querybuilder.select.SelectFrom; import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder; @@ -155,8 +156,8 @@ public CqlIdentifier[] buildPrimaryKeyClause(TableMetadata tableMetadata) { /** * Build a SELECT prepared statement for the first pkLength primary key columns.
- * @param keyspaceName - * @param tableName + * @param keyspaceName keyspace name + * @param tableName table name * @param projection columns * @param pk primary key columns * @param pkLength primary key length @@ -166,7 +167,8 @@ public PreparedStatement prepareSelect(String keyspaceName, String tableName, CqlIdentifier[] projection, CqlIdentifier[] pk, int pkLength) { - Select query = selectFrom(keyspaceName, tableName).columns(projection); + Select query = selectFrom(keyspaceName, tableName) + .columns(projection.length != 0 ? projection : pk); for (int i = 0; i < pkLength; i++) query = query.whereColumn(pk[i]).isEqualTo(bindMarker()); query.limit(1); From 72d1b529c206c08f48b7f50d47ea653c324f567b Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 17 Jul 2025 11:02:16 +0530 Subject: [PATCH 2/6] feat(CassandraSource): Add primary key only table check to optimize value converter logic and refine Javadoc comments (cherry picked from commit 2315d813e3c82b99b96ba198795dce89df1983fc) --- .../oss/pulsar/source/CassandraSource.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 21716a59..733c047b 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -70,15 +70,7 @@ import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import 
java.util.concurrent.CompletableFuture; @@ -346,16 +338,28 @@ void initCassandraClient() throws InvocationTargetException, NoSuchMethodExcepti setValueConverterAndQuery(tuple._1, tuple._2); } + /** + * Check if the table has only primary key columns. + * @param tableMetadata the table metadata + * @return true if the table has only primary key columns, false otherwise + */ + private boolean isPrimaryKeyOnlyTable(TableMetadata tableMetadata) { + // if the table has no columns other than the primary key, we can skip the value converter + return tableMetadata.getColumns().size() == tableMetadata.getPrimaryKey().size() && + new HashSet<>(tableMetadata.getPrimaryKey()).containsAll(tableMetadata.getColumns().values()); + } + synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata tableMetadata) { try { + boolean isPrimaryKeyOnlyTable = isPrimaryKeyOnlyTable(tableMetadata); List columns = tableMetadata.getColumns().values().stream() // include primary keys in the json only output format options // TODO: PERF: Infuse the key values instead of reading from DB https://github.com/datastax/cdc-apache-cassandra/issues/84 - .filter(c -> config.isJsonOnlyOutputFormat() ? 
true : !tableMetadata.getPrimaryKey().contains(c)) + .filter(c -> config.isJsonOnlyOutputFormat() || isPrimaryKeyOnlyTable || !tableMetadata.getPrimaryKey().contains(c)) .filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches()) .collect(Collectors.toList()); List staticColumns = tableMetadata.getColumns().values().stream() - .filter(c -> c.isStatic()) + .filter(ColumnMetadata::isStatic) .filter(c -> !tableMetadata.getPrimaryKey().contains(c)) .filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches()) .collect(Collectors.toList()); @@ -379,9 +383,9 @@ synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata /** * Build the CQL prepared statement for the specified where clause length. - * NOTE: The prepared statement cannot be build from the schema listener thread to avoid a possible deadlock. + * NOTE: The prepared statement cannot be built from the schema listener thread to avoid a possible deadlock. 
* - * @param valueConverterAndQuery + * @param valueConverterAndQuery the converter and query parameters * @param whereClauseLength the number of columns in the where clause * @return preparedStatement */ @@ -392,7 +396,8 @@ synchronized PreparedStatement getSelectStatement(ConverterAndQuery valueConvert valueConverterAndQuery.tableName, valueConverterAndQuery.getProjectionClause(whereClauseLength), valueConverterAndQuery.primaryKeyClause, - k)); + k + )); } Class getKeyConverterClass() { From 2124aa570cbdeb618f6840ca8c358be7a52c2c75 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 17 Jul 2025 11:02:51 +0530 Subject: [PATCH 3/6] feat(CassandraSourceTests): Add a unit test for a primary key only table scenario (cherry picked from commit 235812846290da45aa0c1f2ac1e33a08024d78f7) --- .../source/PulsarCassandraSourceTests.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 286e0ce7..ad115c77 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -180,6 +180,11 @@ public void testClusteringKey() throws InterruptedException, IOException { public void testCompoundPk() throws InterruptedException, IOException { testCompoundPk("ks1"); } + + @Test + public void testOnlyPk() throws InterruptedException, IOException { + testOnlyPk("ks1"); + } @Test public void testSchema() throws InterruptedException, IOException { @@ -536,6 +541,67 @@ public void testCompoundPk(String ksName) throws InterruptedException, IOExcepti } } + public void testOnlyPk(String ksName) throws InterruptedException, IOException { + try { + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + 
ksName + + " WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};"); + cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table6 (a text, b int, PRIMARY KEY(a,b)) WITH cdc=true"); + cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('1',1)"); + cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('2',2)"); + cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('3',3)"); + } + deployConnector(ksName, "table6"); + try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) { + try (Consumer consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME()) + .topic(String.format(Locale.ROOT, "data-%s.table6", ksName)) + .subscriptionName("sub1") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionMode(SubscriptionMode.Durable) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message msg; + int receivedCount = 1; + while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null && + receivedCount < 5) { + GenericRecord record = msg.getValue(); + assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + // assert key fields + assertEquals(Integer.toString(receivedCount) , getAndAssertKeyFieldAsString(key, "a")); + assertEquals(receivedCount, getAndAssertKeyFieldAsInt(key, "b")); + // assert value fields + assertEquals(Integer.toString(receivedCount), value.getField("a")); + assertEquals(receivedCount, value.getField("b")); + consumer.acknowledge(msg); + receivedCount++; + } + + // delete a row + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("DELETE FROM " + ksName + ".table6 WHERE a = '1' AND b = 1"); + } + while ((msg = consumer.receive(30, TimeUnit.SECONDS)) != null && + receivedCount < 6) { + GenericRecord record = msg.getValue(); + 
assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + assertEquals("1", getAndAssertKeyFieldAsString(key,"a")); + assertEquals(1, getAndAssertKeyFieldAsInt(key, "b")); + assertNullValue(value); + consumer.acknowledge(msg); + receivedCount++; + } + } + } + } finally { + dumpFunctionLogs("cassandra-source-" + ksName + "-table6"); + undeployConnector(ksName, "table6"); + } + } + // docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table3/cassandra-source-ks1-table3-0.log public void testSchema(String ksName) throws InterruptedException, IOException { try { From 180e361f285ff7b5917143a7bb3c3d03e76c785e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 17 Jul 2025 11:24:50 +0530 Subject: [PATCH 4/6] feat(CassandraClient, CassandraSource): Add comments to clarify primary key handling and projection logic (cherry picked from commit 9b56d889e5a6d304309cc0f3e248e245df640b8f) --- .../src/main/java/com/datastax/oss/cdc/CassandraClient.java | 1 + .../java/com/datastax/oss/pulsar/source/CassandraSource.java | 1 + 2 files changed, 2 insertions(+) diff --git a/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java b/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java index c67b466a..7617f109 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java +++ b/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java @@ -167,6 +167,7 @@ public PreparedStatement prepareSelect(String keyspaceName, String tableName, CqlIdentifier[] projection, CqlIdentifier[] pk, int pkLength) { + // select columns according to projection array length Select query = selectFrom(keyspaceName, tableName) .columns(projection.length != 0 ? 
projection : pk); for (int i = 0; i < pkLength; i++) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 733c047b..75b2ecd1 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -355,6 +355,7 @@ synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata List columns = tableMetadata.getColumns().values().stream() // include primary keys in the json only output format options // TODO: PERF: Infuse the key values instead of reading from DB https://github.com/datastax/cdc-apache-cassandra/issues/84 + // If primary key only table, then add all the columns into the value schema. .filter(c -> config.isJsonOnlyOutputFormat() || isPrimaryKeyOnlyTable || !tableMetadata.getPrimaryKey().contains(c)) .filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches()) .collect(Collectors.toList()); From 45f816ca3b9998a3ab4ce93d1041559a8ec019d7 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Mon, 28 Jul 2025 09:56:33 +0530 Subject: [PATCH 5/6] ci: extend test job timeout to 360 minutes (cherry picked from commit 4a94cdbe20326d2488f00087185e7cd5b780cf90) --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 48b95fca..af4fa504 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: needs: build name: Test runs-on: ubuntu-latest - timeout-minutes: 120 + timeout-minutes: 360 strategy: fail-fast: false matrix: From 287c855d8929780d0500712137fce65961ddc55e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Mon, 28 Jul 2025 18:07:56 +0530 Subject: [PATCH 6/6] refactor(ChaosNetworkContainer): update tc-image to use ghcr.io/alexei-led/pumba-debian-nettools 
(cherry picked from commit 354338a6a7e39a0d2b4d06ea239932e5034927ca) --- .../java/com/datastax/testcontainers/ChaosNetworkContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java index fda04663..a08c7249 100644 --- a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java +++ b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java @@ -34,7 +34,7 @@ public class ChaosNetworkContainer> ext public ChaosNetworkContainer(String targetContainer, String pause) { super(PUMBA_IMAGE); - setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer); + setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer); addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE); setWaitStrategy(Wait.forLogMessage(".*tc container created.*", 1)); withLogConsumer(o -> {