@@ -180,6 +180,11 @@ public void testClusteringKey() throws InterruptedException, IOException {
180180 public void testCompoundPk () throws InterruptedException , IOException {
181181 testCompoundPk ("ks1" );
182182 }
183+
184+ @ Test
185+ public void testOnlyPk () throws InterruptedException , IOException {
186+ testOnlyPk ("ks1" );
187+ }
183188
184189 @ Test
185190 public void testSchema () throws InterruptedException , IOException {
@@ -536,6 +541,67 @@ public void testCompoundPk(String ksName) throws InterruptedException, IOExcepti
536541 }
537542 }
538543
544+ public void testOnlyPk (String ksName ) throws InterruptedException , IOException {
545+ try {
546+ try (CqlSession cqlSession = cassandraContainer1 .getCqlSession ()) {
547+ cqlSession .execute ("CREATE KEYSPACE IF NOT EXISTS " + ksName +
548+ " WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};" );
549+ cqlSession .execute ("CREATE TABLE IF NOT EXISTS " + ksName + ".table6 (a text, b int, PRIMARY KEY(a,b)) WITH cdc=true" );
550+ cqlSession .execute ("INSERT INTO " + ksName + ".table6 (a,b) VALUES('1',1)" );
551+ cqlSession .execute ("INSERT INTO " + ksName + ".table6 (a,b) VALUES('2',2)" );
552+ cqlSession .execute ("INSERT INTO " + ksName + ".table6 (a,b) VALUES('3',3)" );
553+ }
554+ deployConnector (ksName , "table6" );
555+ try (PulsarClient pulsarClient = PulsarClient .builder ().serviceUrl (pulsarContainer .getPulsarBrokerUrl ()).build ()) {
556+ try (Consumer <GenericRecord > consumer = pulsarClient .newConsumer (org .apache .pulsar .client .api .Schema .AUTO_CONSUME ())
557+ .topic (String .format (Locale .ROOT , "data-%s.table6" , ksName ))
558+ .subscriptionName ("sub1" )
559+ .subscriptionType (SubscriptionType .Key_Shared )
560+ .subscriptionMode (SubscriptionMode .Durable )
561+ .subscriptionInitialPosition (SubscriptionInitialPosition .Earliest )
562+ .subscribe ()) {
563+ Message <GenericRecord > msg ;
564+ int receivedCount = 1 ;
565+ while ((msg = consumer .receive (60 , TimeUnit .SECONDS )) != null &&
566+ receivedCount < 5 ) {
567+ GenericRecord record = msg .getValue ();
568+ assertEquals (this .schemaType , record .getSchemaType ());
569+ Object key = getKey (msg );
570+ GenericRecord value = getValue (record );
571+ // assert key fields
572+ assertEquals (Integer .toString (receivedCount ) , getAndAssertKeyFieldAsString (key , "a" ));
573+ assertEquals (receivedCount , getAndAssertKeyFieldAsInt (key , "b" ));
574+ // assert value fields
575+ assertEquals (Integer .toString (receivedCount ), value .getField ("a" ));
576+ assertEquals (receivedCount , value .getField ("b" ));
577+ consumer .acknowledge (msg );
578+ receivedCount ++;
579+ }
580+
581+ // delete a row
582+ try (CqlSession cqlSession = cassandraContainer1 .getCqlSession ()) {
583+ cqlSession .execute ("DELETE FROM " + ksName + ".table6 WHERE a = '1' AND b = 1" );
584+ }
585+ while ((msg = consumer .receive (30 , TimeUnit .SECONDS )) != null &&
586+ receivedCount < 6 ) {
587+ GenericRecord record = msg .getValue ();
588+ assertEquals (this .schemaType , record .getSchemaType ());
589+ Object key = getKey (msg );
590+ GenericRecord value = getValue (record );
591+ assertEquals ("1" , getAndAssertKeyFieldAsString (key ,"a" ));
592+ assertEquals (1 , getAndAssertKeyFieldAsInt (key , "b" ));
593+ assertNullValue (value );
594+ consumer .acknowledge (msg );
595+ receivedCount ++;
596+ }
597+ }
598+ }
599+ } finally {
600+ dumpFunctionLogs ("cassandra-source-" + ksName + "-table6" );
601+ undeployConnector (ksName , "table6" );
602+ }
603+ }
604+
539605 // docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table3/cassandra-source-ks1-table3-0.log
540606 public void testSchema (String ksName ) throws InterruptedException , IOException {
541607 try {
0 commit comments