Skip to content

Commit 4871810

Browse files
authored
Fix Kafka integration tests to not reuse message ids. (#291)
* Fix Kafka integration tests to not reuse message ids. * Fix checkstyle.
1 parent d97819f commit 4871810

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

src/test/java/com/arpnetworking/metrics/mad/sources/integration/KafkaSourceIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.util.concurrent.ExecutionException;
6565
import java.util.concurrent.TimeUnit;
6666
import java.util.concurrent.TimeoutException;
67+
import java.util.concurrent.atomic.AtomicInteger;
6768
import java.util.stream.Collectors;
6869

6970
/**
@@ -80,6 +81,7 @@ public class KafkaSourceIT {
8081
private static final Duration POLL_DURATION = Duration.ofSeconds(1);
8182
private static final int TIMEOUT = 10000;
8283
private static final int NUM_RECORDS = 500;
84+
private static final AtomicInteger RECORD_ID = new AtomicInteger(0);
8385

8486
private Map<String, Object> _consumerProps;
8587
private KafkaConsumer<Integer, String> _consumer;
@@ -268,7 +270,8 @@ private static void createTopic(final String topicName) throws TimeoutException
268270
private static List<ProducerRecord<Integer, String>> createProducerRecords(final String topic, final int num) {
269271
final List<ProducerRecord<Integer, String>> records = new ArrayList<>();
270272
for (int i = 0; i < num; i++) {
271-
records.add(new ProducerRecord<>(topic, i, "value" + i));
273+
final int id = RECORD_ID.getAndIncrement();
274+
records.add(new ProducerRecord<>(topic, id, "value" + id));
272275
}
273276
return records;
274277
}

0 commit comments

Comments
 (0)