Skip to content

Commit 500bd70

Browse files
lucasbrumjsax
authored and committed
KAFKA-19429: Deflake streams_smoke_test, again (#20070)
It looks like we are checking for properties that are not guaranteed under at_least_once, for example, exact counting (not allowing for overcounting). This change relaxes the validation constraint: The TAGG topic effectively contains count-by-count results. So, for example, if we have the input without duplication, 0 -> 1,2,3, we will get 3 -> 1 in TAGG, since 1 key had 3 values. With duplication, 0 -> 1,1,2,3, we will get 4 -> 1 in TAGG, since 1 key had 4 values. This makes the result difficult to compare. Since we also run the smoke test with exactly_once, I propose to disable validation of TAGG under ALOS. Similarly, the topic AVG may overcount or undercount. The test case is extremely similar to DIF, both performing a join on two streams, the only difference being the mathematical operation performed, so we can also disable this validation under ALOS with minimal loss of coverage. Finally, the change fixes a bug that would throw an NPE when validation of a windowed stream failed. Reviewers: Kirk True <[email protected]>, Matthias J. Sax <[email protected]>
1 parent abeebb3 commit 500bd70

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,11 @@ private static VerificationResult verifyAll(final Map<String, Set<Integer>> inpu
522522
}
523523
boolean pass;
524524
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
525-
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), validationPredicate, printResults);
525+
pass = true;
526+
if (eosEnabled) {
527+
// TAGG is computing "Count-by-count", which may produce keys that are not in the input data in ALOS, so we skip validation in this case.
528+
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
529+
}
526530
pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
527531
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
528532
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
@@ -534,7 +538,10 @@ private static VerificationResult verifyAll(final Map<String, Set<Integer>> inpu
534538
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults, eosEnabled);
535539
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled);
536540
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate, printResults, eosEnabled);
537-
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled);
541+
if (eosEnabled) {
542+
// Average can overcount and undercount in ALOS, so we skip validation in that case.
543+
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, Object::equals, printResults, eosEnabled);
544+
}
538545
}
539546
return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
540547
}
@@ -580,7 +587,7 @@ private static boolean verify(final PrintStream resultStream,
580587
if (printResults) {
581588
resultStream.printf("\t inputEvents=%n%s%n\t" +
582589
"echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
583-
indent("\t\t", observedInputEvents.get(key)),
590+
indent("\t\t", observedInputEvents.getOrDefault(key, new LinkedList<>())),
584591
indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
585592
indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
586593
indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
@@ -662,7 +669,6 @@ private static Double getAvg(final String key) {
662669
private static boolean verifyTAgg(final PrintStream resultStream,
663670
final Map<String, Set<Integer>> allData,
664671
final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
665-
final BiPredicate<Number, Number> validationPredicate,
666672
final boolean printResults) {
667673
resultStream.println("verifying topic tagg");
668674
if (taggEvents == null) {
@@ -694,7 +700,7 @@ private static boolean verifyTAgg(final PrintStream resultStream,
694700
expectedCount = 0L;
695701
}
696702

697-
if (!validationPredicate.test(expectedCount, entry.getValue().getLast().value())) {
703+
if (entry.getValue().getLast().value().longValue() != expectedCount) {
698704
resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
699705

700706
if (printResults)

0 commit comments

Comments
 (0)