Skip to content

Commit 934a585

Browse files
Fix/primary key only table (#199)
* feat(CassandraClient): Adjust `selectFrom` logic in `buildSelectStatement` to use primary key columns when the projection is empty. Improve Javadoc comments. (cherry picked from commit 52814ae) * feat(CassandraSource): Add primary key only table check to optimize value converter logic and refine Javadoc comments (cherry picked from commit 2315d81) * feat(CassandraSourceTests): Add a unit test for a primary key only table scenario (cherry picked from commit 2358128) * feat(CassandraClient, CassandraSource): Add comments to clarify primary key handling and projection logic (cherry picked from commit 9b56d88) * ci: extend test job timeout to 360 minutes (cherry picked from commit 4a94cdb) * refactor(ChaosNetworkContainer): update tc-image to use ghcr.io/alexei-led/pumba-debian-nettools (cherry picked from commit 354338a) --------- Co-authored-by: Arkadip <[email protected]>
1 parent aa12550 commit 934a585

File tree

3 files changed

+92
-17
lines changed

3 files changed

+92
-17
lines changed

connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
4343
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
4444
import com.datastax.oss.driver.api.querybuilder.select.Select;
45+
import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
4546
import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
4647
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
4748
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
@@ -155,8 +156,8 @@ public CqlIdentifier[] buildPrimaryKeyClause(TableMetadata tableMetadata) {
155156

156157
/**
157158
* Build a SELECT prepared statement for the first <i>pkLength</i> primary key columns.
158-
* @param keyspaceName
159-
* @param tableName
159+
* @param keyspaceName keyspace name
160+
* @param tableName table name
160161
* @param projection columns
161162
* @param pk primary key columns
162163
* @param pkLength primary key length
@@ -166,7 +167,9 @@ public PreparedStatement prepareSelect(String keyspaceName, String tableName,
166167
CqlIdentifier[] projection,
167168
CqlIdentifier[] pk,
168169
int pkLength) {
169-
Select query = selectFrom(keyspaceName, tableName).columns(projection);
170+
// select columns according to projection array length
171+
Select query = selectFrom(keyspaceName, tableName)
172+
.columns(projection.length != 0 ? projection : pk);
170173
for (int i = 0; i < pkLength; i++)
171174
query = query.whereColumn(pk[i]).isEqualTo(bindMarker());
172175
query.limit(1);

connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,7 @@
7070
import java.lang.reflect.InvocationTargetException;
7171
import java.nio.charset.StandardCharsets;
7272
import java.time.Duration;
73-
import java.util.ArrayList;
74-
import java.util.Arrays;
75-
import java.util.Collections;
76-
import java.util.List;
77-
import java.util.Locale;
78-
import java.util.Map;
79-
import java.util.Objects;
80-
import java.util.Optional;
81-
import java.util.UUID;
73+
import java.util.*;
8274
import java.util.concurrent.ArrayBlockingQueue;
8375
import java.util.concurrent.Callable;
8476
import java.util.concurrent.CompletableFuture;
@@ -346,16 +338,29 @@ void initCassandraClient() throws InvocationTargetException, NoSuchMethodExcepti
346338
setValueConverterAndQuery(tuple._1, tuple._2);
347339
}
348340

341+
/**
342+
* Check if the table has only primary key columns.
343+
* @param tableMetadata the table metadata
344+
* @return true if the table has only primary key columns, false otherwise
345+
*/
346+
private boolean isPrimaryKeyOnlyTable(TableMetadata tableMetadata) {
347+
// if the table has no columns other than the primary key, we can skip the value converter
348+
return tableMetadata.getColumns().size() == tableMetadata.getPrimaryKey().size() &&
349+
new HashSet<>(tableMetadata.getPrimaryKey()).containsAll(tableMetadata.getColumns().values());
350+
}
351+
349352
synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata tableMetadata) {
350353
try {
354+
boolean isPrimaryKeyOnlyTable = isPrimaryKeyOnlyTable(tableMetadata);
351355
List<ColumnMetadata> columns = tableMetadata.getColumns().values().stream()
352356
// include primary keys in the json only output format options
353357
// TODO: PERF: Infuse the key values instead of reading from DB https://github.com/datastax/cdc-apache-cassandra/issues/84
354-
.filter(c -> config.isJsonOnlyOutputFormat() ? true : !tableMetadata.getPrimaryKey().contains(c))
358+
// If primary key only table, then add all the columns into the value schema.
359+
.filter(c -> config.isJsonOnlyOutputFormat() || isPrimaryKeyOnlyTable || !tableMetadata.getPrimaryKey().contains(c))
355360
.filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches())
356361
.collect(Collectors.toList());
357362
List<ColumnMetadata> staticColumns = tableMetadata.getColumns().values().stream()
358-
.filter(c -> c.isStatic())
363+
.filter(ColumnMetadata::isStatic)
359364
.filter(c -> !tableMetadata.getPrimaryKey().contains(c))
360365
.filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches())
361366
.collect(Collectors.toList());
@@ -379,9 +384,9 @@ synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata
379384

380385
/**
381386
* Build the CQL prepared statement for the specified where clause length.
382-
* NOTE: The prepared statement cannot be build from the schema listener thread to avoid a possible deadlock.
387+
* NOTE: The prepared statement cannot be built from the schema listener thread to avoid a possible deadlock.
383388
*
384-
* @param valueConverterAndQuery
389+
* @param valueConverterAndQuery the converter and query parameters
385390
* @param whereClauseLength the number of columns in the where clause
386391
* @return preparedStatement
387392
*/
@@ -392,7 +397,8 @@ synchronized PreparedStatement getSelectStatement(ConverterAndQuery valueConvert
392397
valueConverterAndQuery.tableName,
393398
valueConverterAndQuery.getProjectionClause(whereClauseLength),
394399
valueConverterAndQuery.primaryKeyClause,
395-
k));
400+
k
401+
));
396402
}
397403

398404
Class<?> getKeyConverterClass() {

connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ public void testClusteringKey() throws InterruptedException, IOException {
175175
public void testCompoundPk() throws InterruptedException, IOException {
176176
testCompoundPk("ks1");
177177
}
178+
179+
@Test
180+
public void testOnlyPk() throws InterruptedException, IOException {
181+
testOnlyPk("ks1");
182+
}
178183

179184
@Test
180185
public void testSchema() throws InterruptedException, IOException {
@@ -536,6 +541,67 @@ public void testCompoundPk(String ksName) throws InterruptedException, IOExcepti
536541
}
537542
}
538543

544+
public void testOnlyPk(String ksName) throws InterruptedException, IOException {
545+
try {
546+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
547+
cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
548+
" WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
549+
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table6 (a text, b int, PRIMARY KEY(a,b)) WITH cdc=true");
550+
cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('1',1)");
551+
cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('2',2)");
552+
cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('3',3)");
553+
}
554+
deployConnector(ksName, "table6");
555+
try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
556+
try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
557+
.topic(String.format(Locale.ROOT, "data-%s.table6", ksName))
558+
.subscriptionName("sub1")
559+
.subscriptionType(SubscriptionType.Key_Shared)
560+
.subscriptionMode(SubscriptionMode.Durable)
561+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
562+
.subscribe()) {
563+
Message<GenericRecord> msg;
564+
int receivedCount = 1;
565+
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
566+
receivedCount < 5) {
567+
GenericRecord record = msg.getValue();
568+
assertEquals(this.schemaType, record.getSchemaType());
569+
Object key = getKey(msg);
570+
GenericRecord value = getValue(record);
571+
// assert key fields
572+
assertEquals(Integer.toString(receivedCount) , getAndAssertKeyFieldAsString(key, "a"));
573+
assertEquals(receivedCount, getAndAssertKeyFieldAsInt(key, "b"));
574+
// assert value fields
575+
assertEquals(Integer.toString(receivedCount), value.getField("a"));
576+
assertEquals(receivedCount, value.getField("b"));
577+
consumer.acknowledge(msg);
578+
receivedCount++;
579+
}
580+
581+
// delete a row
582+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
583+
cqlSession.execute("DELETE FROM " + ksName + ".table6 WHERE a = '1' AND b = 1");
584+
}
585+
while ((msg = consumer.receive(30, TimeUnit.SECONDS)) != null &&
586+
receivedCount < 6) {
587+
GenericRecord record = msg.getValue();
588+
assertEquals(this.schemaType, record.getSchemaType());
589+
Object key = getKey(msg);
590+
GenericRecord value = getValue(record);
591+
assertEquals("1", getAndAssertKeyFieldAsString(key,"a"));
592+
assertEquals(1, getAndAssertKeyFieldAsInt(key, "b"));
593+
assertNullValue(value);
594+
consumer.acknowledge(msg);
595+
receivedCount++;
596+
}
597+
}
598+
}
599+
} finally {
600+
dumpFunctionLogs("cassandra-source-" + ksName + "-table6");
601+
undeployConnector(ksName, "table6");
602+
}
603+
}
604+
539605
// docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table3/cassandra-source-ks1-table3-0.log
540606
public void testSchema(String ksName) throws InterruptedException, IOException {
541607
try {

0 commit comments

Comments
 (0)