diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java index a8616d2d4bb..0e56bd9d758 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java @@ -25,12 +25,14 @@ public class ApacheKafkaContainerCluster implements Startable { private final Collection brokers; public ApacheKafkaContainerCluster(String version, int brokersNum, int internalTopicsRf) { - if (brokersNum < 0) { + if (brokersNum <= 0) { throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); } - if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) { + if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) { throw new IllegalArgumentException( - "internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0" + "internalTopicsRf '" + + internalTopicsRf + + "' must be less than or equal to brokersNum and greater than 0" ); } @@ -99,6 +101,6 @@ public void start() { @Override public void stop() { - this.brokers.stream().parallel().forEach(GenericContainer::stop); + this.brokers.parallelStream().forEach(GenericContainer::stop); } } diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java index 222050c76c0..f09c3072ea3 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java @@ -25,12 +25,14 @@ public class ConfluentKafkaContainerCluster implements Startable { private final Collection brokers; public ConfluentKafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) { - if (brokersNum < 0) { + if (brokersNum <= 0) { throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); } - if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) { + if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) { throw new IllegalArgumentException( - "internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0" + "internalTopicsRf '" + + internalTopicsRf + + "' must be less than or equal to brokersNum and greater than 0" ); } @@ -100,6 +102,6 @@ public void start() { @Override public void stop() { - this.brokers.stream().parallel().forEach(GenericContainer::stop); + this.brokers.parallelStream().forEach(GenericContainer::stop); } } diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java index 403d087af3e..aa22cceadb0 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java @@ -31,12 +31,14 @@ public class KafkaContainerCluster implements Startable { private final Collection brokers; public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) { - if (brokersNum < 0) { + if (brokersNum <= 0) { throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); } - if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) { + if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) { throw new IllegalArgumentException( - "internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0" + "internalTopicsRf '" + + internalTopicsRf + + "' must be less than or equal to brokersNum and greater than 0" ); } diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java index f42e3c5b1ba..264f751ecc6 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java @@ -25,12 +25,14 @@ public class KafkaContainerKraftCluster implements Startable { private final Collection brokers; public KafkaContainerKraftCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) { - if (brokersNum < 0) { + if (brokersNum <= 0) { throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); } - if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) { + if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) { throw new IllegalArgumentException( - "internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0" + "internalTopicsRf '" + + internalTopicsRf + + "' must be less than or equal to brokersNum and greater than 0" ); } @@ -101,6 +103,6 @@ public void start() { @Override public void stop() { - this.brokers.stream().parallel().forEach(GenericContainer::stop); + this.brokers.parallelStream().forEach(GenericContainer::stop); } }