2525import org .apache .kafka .clients .producer .ProducerConfig ;
2626import org .apache .kafka .clients .producer .ProducerRecord ;
2727import org .apache .kafka .clients .producer .RecordMetadata ;
28+ import org .apache .kafka .common .IsolationLevel ;
2829import org .apache .kafka .common .PartitionInfo ;
2930import org .apache .kafka .common .TopicPartition ;
3031import org .apache .kafka .common .errors .TimeoutException ;
32+ import org .apache .kafka .common .serialization .ByteArrayDeserializer ;
3133import org .apache .kafka .common .serialization .ByteArraySerializer ;
3234import org .apache .kafka .common .serialization .Deserializer ;
3335import org .apache .kafka .common .serialization .Serde ;
@@ -92,6 +94,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
9294 }
9395
9496 private static final int MAX_RECORD_EMPTY_RETRIES = 30 ;
97+ private static final long MAX_IDLE_TIME_MS = 600000L ;
9598
9699 private static class ValueList {
97100 public final String key ;
@@ -372,33 +375,52 @@ public Number deserialize(final String topic, final byte[] data) {
372375 }
373376 }
374377
375- public static VerificationResult verify (final String kafka ,
376- final Map <String , Set <Integer >> inputs ,
377- final int maxRecordsPerKey ,
378- final boolean eosEnabled ) {
379- final Properties props = new Properties ();
380- props .put (ConsumerConfig .CLIENT_ID_CONFIG , "verifier" );
381- props .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , kafka );
382- props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
383- props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , NumberDeserializer .class );
384- props .put (ConsumerConfig .ISOLATION_LEVEL_CONFIG , "read_committed" );
378+ private static class PollResult {
379+ final Map <String , Map <String , LinkedList <ConsumerRecord <String , Number >>>> events ;
380+ final int recordsProcessed ;
381+ final VerificationResult verificationResult ;
382+
383+ PollResult (final Map <String , Map <String , LinkedList <ConsumerRecord <String , Number >>>> events ,
384+ final int recordsProcessed ,
385+ final VerificationResult verificationResult ) {
386+ this .events = events ;
387+ this .recordsProcessed = recordsProcessed ;
388+ this .verificationResult = verificationResult ;
389+ }
390+ }
385391
386- final KafkaConsumer <String , Number > consumer = new KafkaConsumer <>(props );
387- final List <TopicPartition > partitions = getAllPartitions (consumer , NUMERIC_VALUE_TOPICS );
388- consumer .assign (partitions );
389- consumer .seekToBeginning (partitions );
392+ private static VerificationResult preVerifyTransactions (final String kafka , final boolean eosEnabled ) {
393+ if (!eosEnabled ) {
394+ return new VerificationResult (true , "EOS is disabled; skipping transaction verification" );
395+ }
396+
397+ final VerificationResult txnResult = verifyAllTransactionFinished (kafka );
398+ if (!txnResult .passed ()) {
399+ System .err .println ("Transaction verification failed: " + txnResult .result ());
400+ System .out .println ("FAILED" );
401+ }
402+ return txnResult ;
403+ }
390404
405+ private static PollResult pollAndCollect (
406+ final KafkaConsumer <String , Number > consumer ,
407+ final Map <String , Set <Integer >> inputs ,
408+ final int maxRecordsPerKey ,
409+ final boolean eosEnabled ) {
391410 final int recordsGenerated = inputs .size () * maxRecordsPerKey ;
411+ final Map <String , Map <String , LinkedList <ConsumerRecord <String , Number >>>> events = new HashMap <>();
412+ VerificationResult verificationResult = new VerificationResult (false , "no results yet" );
413+ final long start = System .currentTimeMillis ();
392414 int recordsProcessed = 0 ;
415+
416+ final List <TopicPartition > partitions = getAllPartitions (consumer , NUMERIC_VALUE_TOPICS );
417+ consumer .assign (partitions );
418+ consumer .seekToBeginning (partitions );
393419 final Map <String , AtomicInteger > processed =
394420 Stream .of (NUMERIC_VALUE_TOPICS )
395421 .collect (Collectors .toMap (t -> t , t -> new AtomicInteger (0 )));
396422
397- final Map <String , Map <String , LinkedList <ConsumerRecord <String , Number >>>> events = new HashMap <>();
398-
399- VerificationResult verificationResult = new VerificationResult (false , "no results yet" );
400423 int retry = 0 ;
401- final long start = System .currentTimeMillis ();
402424 while (System .currentTimeMillis () - start < TimeUnit .MINUTES .toMillis (6 )) {
403425 final ConsumerRecords <String , Number > records = consumer .poll (Duration .ofSeconds (5 ));
404426 if (records .isEmpty () && recordsProcessed >= recordsGenerated ) {
@@ -436,23 +458,32 @@ public static VerificationResult verify(final String kafka,
436458 System .out .println (processed );
437459 }
438460 }
439- consumer .close ();
440461
441- final long finished = System .currentTimeMillis () - start ;
462+ return new PollResult (events , recordsProcessed , verificationResult );
463+ }
464+
465+ private static VerificationResult reportAndFinalize (
466+ final Map <String , Set <Integer >> inputs ,
467+ final int maxRecordsPerKey ,
468+ final long startTime ,
469+ final boolean eosEnabled ,
470+ final PollResult pollResult ) {
471+ final int recordsGenerated = inputs .size () * maxRecordsPerKey ;
472+ final long finished = System .currentTimeMillis () - startTime ;
442473 System .out .println ("Verification time=" + finished );
443474 System .out .println ("-------------------" );
444475 System .out .println ("Result Verification" );
445476 System .out .println ("-------------------" );
446477 System .out .println ("recordGenerated=" + recordsGenerated );
447- System .out .println ("recordProcessed=" + recordsProcessed );
478+ System .out .println ("recordProcessed=" + pollResult . recordsProcessed );
448479
449- if (recordsProcessed > recordsGenerated ) {
480+ if (pollResult . recordsProcessed > recordsGenerated ) {
450481 System .out .println ("PROCESSED-MORE-THAN-GENERATED" );
451- } else if (recordsProcessed < recordsGenerated ) {
482+ } else if (pollResult . recordsProcessed < recordsGenerated ) {
452483 System .out .println ("PROCESSED-LESS-THAN-GENERATED" );
453484 }
454485
455- final Map <String , Set <Number >> received = parseRecordsForEchoTopic (events );
486+ final Map <String , Set <Number >> received = parseRecordsForEchoTopic (pollResult . events );
456487
457488 boolean success = inputs .equals (received );
458489
@@ -466,9 +497,10 @@ public static VerificationResult verify(final String kafka,
466497 System .out .println ("missedRecords=" + missedCount );
467498 }
468499
500+ VerificationResult verificationResult = pollResult .verificationResult ;
469501 // give it one more try if it's not already passing.
470502 if (!verificationResult .passed ()) {
471- verificationResult = verifyAll (inputs , events , true , eosEnabled );
503+ verificationResult = verifyAll (inputs , pollResult . events , true , eosEnabled );
472504 }
473505 success &= verificationResult .passed ();
474506
@@ -478,6 +510,29 @@ public static VerificationResult verify(final String kafka,
478510 return verificationResult ;
479511 }
480512
513+ public static VerificationResult verify (final String kafka ,
514+ final Map <String , Set <Integer >> inputs ,
515+ final int maxRecordsPerKey ,
516+ final boolean eosEnabled ) {
517+ final VerificationResult txnResult = preVerifyTransactions (kafka , eosEnabled );
518+ if (!txnResult .passed ()) {
519+ return txnResult ;
520+ }
521+
522+ final Properties props = new Properties ();
523+ props .put (ConsumerConfig .CLIENT_ID_CONFIG , "verifier" );
524+ props .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , kafka );
525+ props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
526+ props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , NumberDeserializer .class );
527+ props .put (ConsumerConfig .ISOLATION_LEVEL_CONFIG , "read_committed" );
528+
529+ final long start = System .currentTimeMillis ();
530+ try (final KafkaConsumer <String , Number > consumer = new KafkaConsumer <>(props )) {
531+ final PollResult pollResult = pollAndCollect (consumer , inputs , maxRecordsPerKey , eosEnabled );
532+ return reportAndFinalize (inputs , maxRecordsPerKey , start , eosEnabled , pollResult );
533+ }
534+ }
535+
481536 private static Map <String , Set <Number >> parseRecordsForEchoTopic (
482537 final Map <String , Map <String , LinkedList <ConsumerRecord <String , Number >>>> events ) {
483538 return events .containsKey ("echo" ) ?
@@ -491,24 +546,6 @@ private static Map<String, Set<Number>> parseRecordsForEchoTopic(
491546 .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue )) : Collections .emptyMap ();
492547 }
493548
494- public static class VerificationResult {
495- private final boolean passed ;
496- private final String result ;
497-
498- VerificationResult (final boolean passed , final String result ) {
499- this .passed = passed ;
500- this .result = result ;
501- }
502-
503- public boolean passed () {
504- return passed ;
505- }
506-
507- public String result () {
508- return result ;
509- }
510- }
511-
512549 private static VerificationResult verifyAll (final Map <String , Set <Integer >> inputs ,
513550 final Map <String , Map <String , LinkedList <ConsumerRecord <String , Number >>>> events ,
514551 final boolean printResults ,
@@ -732,4 +769,65 @@ private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> c
732769 return partitions ;
733770 }
734771
772+ private static Properties createConsumerPropsWithByteDeserializer (final String kafka , final String clientId ) {
773+ final Properties props = new Properties ();
774+ props .put (ConsumerConfig .CLIENT_ID_CONFIG , clientId );
775+ props .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , kafka );
776+ props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , ByteArrayDeserializer .class );
777+ props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , ByteArrayDeserializer .class );
778+ return props ;
779+ }
780+
781+ private static VerificationResult verifyAllTransactionFinished (final String kafka ) {
782+ final Properties txnProps = createConsumerPropsWithByteDeserializer (kafka , "verifier" );
783+ txnProps .put (ConsumerConfig .ISOLATION_LEVEL_CONFIG , IsolationLevel .READ_COMMITTED .toString ());
784+ try (final KafkaConsumer <byte [], byte []> consumer = new KafkaConsumer <>(txnProps )) {
785+ // Get all output topics except "data" (which is the input topic)
786+ final String [] outputTopics = Arrays .stream (NUMERIC_VALUE_TOPICS )
787+ .filter (topic -> !topic .equals ("data" ))
788+ .toArray (String []::new );
789+
790+ final List <TopicPartition > partitions = getAllPartitions (consumer , outputTopics );
791+ consumer .assign (partitions );
792+ consumer .seekToEnd (partitions );
793+ for (final TopicPartition tp : partitions ) {
794+ System .out .println (tp + " at position " + consumer .position (tp ));
795+ }
796+ final Properties consumerProps = createConsumerPropsWithByteDeserializer (kafka , "consumer-uncommitted" );
797+
798+ final long maxWaitTime = System .currentTimeMillis () + MAX_IDLE_TIME_MS ;
799+ try (final KafkaConsumer <byte [], byte []> consumerUncommitted = new KafkaConsumer <>(consumerProps )) {
800+ while (!partitions .isEmpty () && System .currentTimeMillis () < maxWaitTime ) {
801+ consumer .seekToEnd (partitions );
802+ final Map <TopicPartition , Long > topicEndOffsets = consumerUncommitted .endOffsets (partitions );
803+
804+ final java .util .Iterator <TopicPartition > iterator = partitions .iterator ();
805+ while (iterator .hasNext ()) {
806+ final TopicPartition topicPartition = iterator .next ();
807+ final long position = consumer .position (topicPartition );
808+
809+ if (position == topicEndOffsets .get (topicPartition )) {
810+ iterator .remove ();
811+ System .out .println ("Removing " + topicPartition + " at position " + position );
812+ } else if (position > topicEndOffsets .get (topicPartition )) {
813+ return new VerificationResult (false , "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets .get (topicPartition ));
814+ } else {
815+ System .out .println ("Retry " + topicPartition + " at position " + position );
816+ }
817+ }
818+ sleep (1000L );
819+ }
820+ }
821+
822+ if (!partitions .isEmpty ()) {
823+ return new VerificationResult (false , "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L ) + " sec." );
824+ }
825+ return new VerificationResult (true , "All transactions finished successfully" );
826+ } catch (final Exception e ) {
827+ e .printStackTrace (System .err );
828+ System .out .println ("FAILED" );
829+ return new VerificationResult (false , "Transaction verification failed: " + e .getMessage ());
830+ }
831+ }
832+
735833}
0 commit comments