diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java
new file mode 100644
index 00000000000..a8616d2d4bb
--- /dev/null
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java
@@ -0,0 +1,104 @@
+package com.example.kafkacluster;
+
+import org.apache.kafka.common.Uuid;
+import org.awaitility.Awaitility;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ApacheKafkaContainerCluster implements Startable {
+
+    private final int brokersNum;
+
+    private final Network network;
+
+    private final Collection<KafkaContainer> brokers;
+
+    public ApacheKafkaContainerCluster(String version, int brokersNum, int internalTopicsRf) {
+        if (brokersNum <= 0) {
+            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
+        }
+        if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
+            throw new IllegalArgumentException(
+                "internalTopicsRf '" + internalTopicsRf + "' must be greater than 0 and not greater than brokersNum"
+            );
+        }
+
+        this.brokersNum = brokersNum;
+        this.network = Network.newNetwork();
+
+        String controllerQuorumVoters = IntStream
+            .range(0, brokersNum)
+            .mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
+            .collect(Collectors.joining(","));
+
+        String clusterId = Uuid.randomUuid().toString();
+
+        this.brokers =
+            IntStream
+                .range(0, brokersNum)
+                .mapToObj(brokerNum -> {
+                    return new KafkaContainer(DockerImageName.parse("apache/kafka").withTag(version))
+                        .withNetwork(this.network)
+                        .withNetworkAliases("broker-" + brokerNum)
+                        .withEnv("CLUSTER_ID", clusterId)
+                        .withEnv("KAFKA_BROKER_ID", brokerNum + "")
+                        .withEnv("KAFKA_NODE_ID", brokerNum + "")
+                        .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters)
+                        .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
+                        .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+                        .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
+                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
+                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
+                        .withStartupTimeout(Duration.ofMinutes(1));
+                })
+                .collect(Collectors.toList());
+    }
+
+    public Collection<KafkaContainer> getBrokers() {
+        return this.brokers;
+    }
+
+    public String getBootstrapServers() {
+        return brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
+    }
+
+    @Override
+    public void start() {
+        // Start all brokers at once; each one is a controller quorum voter, so the quorum cannot form until all are up
+        brokers.parallelStream().forEach(GenericContainer::start);
+
+        Awaitility
+            .await()
+            .atMost(Duration.ofSeconds(30))
+            .untilAsserted(() -> {
+                Container.ExecResult result =
+                    this.brokers.stream()
+                        .findFirst()
+                        .get()
+                        .execInContainer(
+                            "sh",
+                            "-c",
+                            "/opt/kafka/bin/kafka-log-dirs.sh --bootstrap-server localhost:9093 --describe | grep -o '\"broker\"' | wc -l"
+                        );
+                String brokers = result.getStdout().replace("\n", "");
+
+                assertThat(brokers).asInt().isEqualTo(this.brokersNum);
+            });
+    }
+
+    @Override
+    public void stop() {
+        this.brokers.stream().parallel().forEach(GenericContainer::stop);
+    }
+}
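
Reviewer note: the readiness probe above works because kafka-log-dirs.sh prints a JSON document containing one "broker" entry per live broker, so grep -o '"broker"' | wc -l yields the number of brokers that have registered. For orientation, a minimal usage sketch of the class (the version string and sizes are illustrative; the test below exercises the same flow end to end):

    // Sketch: a 3-broker KRaft cluster whose internal topics use replication factor 2.
    try (ApacheKafkaContainerCluster cluster = new ApacheKafkaContainerCluster("3.8.0", 3, 2)) {
        cluster.start(); // returns once all three brokers appear in the kafka-log-dirs output
        String bootstrapServers = cluster.getBootstrapServers(); // comma-separated host:port pairs
        // ... point any Kafka client at bootstrapServers ...
    }
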
diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerClusterTest.java
new file mode 100644
index 00000000000..38ac274706b
--- /dev/null
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerClusterTest.java
@@ -0,0 +1,94 @@
+package com.example.kafkacluster;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.tuple;
+
+class ApacheKafkaContainerClusterTest {
+
+    @Test
+    void testKafkaContainerCluster() throws Exception {
+        try (ApacheKafkaContainerCluster cluster = new ApacheKafkaContainerCluster("3.8.0", 3, 2)) {
+            cluster.start();
+            String bootstrapServers = cluster.getBootstrapServers();
+
+            assertThat(cluster.getBrokers()).hasSize(3);
+
+            testKafkaFunctionality(bootstrapServers, 3, 2);
+        }
+    }
+
+    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
+        try (
+            AdminClient adminClient = AdminClient.create(
+                ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+            );
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                ImmutableMap.of(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    bootstrapServers,
+                    ProducerConfig.CLIENT_ID_CONFIG,
+                    UUID.randomUUID().toString()
+                ),
+                new StringSerializer(),
+                new StringSerializer()
+            );
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+                ImmutableMap.of(
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    bootstrapServers,
+                    ConsumerConfig.GROUP_ID_CONFIG,
+                    "tc-" + UUID.randomUUID(),
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                    "earliest"
+                ),
+                new StringDeserializer(),
+                new StringDeserializer()
+            )
+        ) {
+            String topicName = "messages";
+
+            Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
+            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
+            consumer.subscribe(Collections.singletonList(topicName));
+
+            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
+
+            Awaitility
+                .await()
+                .atMost(Duration.ofSeconds(10))
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+
+                    assertThat(records)
+                        .hasSize(1)
+                        .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
+                        .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
+                });
+
+            consumer.unsubscribe();
+        }
+    }
+}
diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java
new file mode 100644
index 00000000000..222050c76c0
--- /dev/null
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java
@@ -0,0 +1,105 @@
+package com.example.kafkacluster;
+
+import org.apache.kafka.common.Uuid;
+import org.awaitility.Awaitility;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConfluentKafkaContainerCluster implements Startable {
+
+    private final int brokersNum;
+
+    private final Network network;
+
+    private final Collection<ConfluentKafkaContainer> brokers;
+
+    public ConfluentKafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
+        if (brokersNum <= 0) {
+            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
+        }
+        if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
+            throw new IllegalArgumentException(
+                "internalTopicsRf '" + internalTopicsRf + "' must be greater than 0 and not greater than brokersNum"
+            );
+        }
+
+        this.brokersNum = brokersNum;
+        this.network = Network.newNetwork();
+
+        String controllerQuorumVoters = IntStream
+            .range(0, brokersNum)
+            .mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
+            .collect(Collectors.joining(","));
+
+        String clusterId = Uuid.randomUuid().toString();
+
+        this.brokers =
+            IntStream
+                .range(0, brokersNum)
+                .mapToObj(brokerNum -> {
+                    return new ConfluentKafkaContainer(
+                        DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)
+                    )
+                        .withNetwork(this.network)
+                        .withNetworkAliases("broker-" + brokerNum)
+                        .withEnv("CLUSTER_ID", clusterId)
+                        .withEnv("KAFKA_BROKER_ID", brokerNum + "")
+                        .withEnv("KAFKA_NODE_ID", brokerNum + "")
+                        .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters)
+                        .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
+                        .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
+                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
+                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
+                        .withStartupTimeout(Duration.ofMinutes(1));
+                })
+                .collect(Collectors.toList());
+    }
+
+    public Collection<ConfluentKafkaContainer> getBrokers() {
+        return this.brokers;
+    }
+
+    public String getBootstrapServers() {
+        return brokers.stream().map(ConfluentKafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
+    }
+
+    @Override
+    public void start() {
+        // Start all brokers at once; each one is a controller quorum voter, so the quorum cannot form until all are up
+        brokers.parallelStream().forEach(GenericContainer::start);
+
+        Awaitility
+            .await()
+            .atMost(Duration.ofSeconds(30))
+            .untilAsserted(() -> {
+                Container.ExecResult result =
+                    this.brokers.stream()
+                        .findFirst()
+                        .get()
+                        .execInContainer(
+                            "sh",
+                            "-c",
+                            "kafka-metadata-shell --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log ls /brokers | wc -l"
+                        );
+                String brokers = result.getStdout().replace("\n", "");
+
+                assertThat(brokers).asInt().isEqualTo(this.brokersNum);
+            });
+    }
+
+    @Override
+    public void stop() {
+        this.brokers.stream().parallel().forEach(GenericContainer::stop);
+    }
+}
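
Reviewer note: unlike the Apache variant, this cluster's readiness probe execs kafka-metadata-shell against the on-disk KRaft metadata log and counts the entries under /brokers. An equivalent host-side check is sketched below under the assumption that the mapped ports are already reachable; this is not what the class above does, and liveBrokers is a hypothetical helper name (imports as in the test that follows):

    // Counts live brokers from the host via the Admin API instead of exec'ing into a container.
    static int liveBrokers(ConfluentKafkaContainerCluster cluster) throws Exception {
        try (
            AdminClient admin = AdminClient.create(
                ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers())
            )
        ) {
            return admin.describeCluster().nodes().get(30, TimeUnit.SECONDS).size();
        }
    }
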
diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerClusterTest.java
new file mode 100644
index 00000000000..3bb38cb7152
--- /dev/null
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerClusterTest.java
@@ -0,0 +1,94 @@
+package com.example.kafkacluster;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.tuple;
+
+class ConfluentKafkaContainerClusterTest {
+
+    @Test
+    void testKafkaContainerCluster() throws Exception {
+        try (ConfluentKafkaContainerCluster cluster = new ConfluentKafkaContainerCluster("7.4.0", 3, 2)) {
+            cluster.start();
+            String bootstrapServers = cluster.getBootstrapServers();
+
+            assertThat(cluster.getBrokers()).hasSize(3);
+
+            testKafkaFunctionality(bootstrapServers, 3, 2);
+        }
+    }
+
+    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
+        try (
+            AdminClient adminClient = AdminClient.create(
+                ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+            );
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                ImmutableMap.of(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    bootstrapServers,
+                    ProducerConfig.CLIENT_ID_CONFIG,
+                    UUID.randomUUID().toString()
+                ),
+                new StringSerializer(),
+                new StringSerializer()
+            );
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+                ImmutableMap.of(
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    bootstrapServers,
+                    ConsumerConfig.GROUP_ID_CONFIG,
+                    "tc-" + UUID.randomUUID(),
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                    "earliest"
+                ),
+                new StringDeserializer(),
+                new StringDeserializer()
+            )
+        ) {
+            String topicName = "messages";
+
+            Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
+            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
+            consumer.subscribe(Collections.singletonList(topicName));
+
+            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
+
+            Awaitility
+                .await()
+                .atMost(Duration.ofSeconds(10))
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+
+                    assertThat(records)
+                        .hasSize(1)
+                        .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
+                        .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
+                });
+
+            consumer.unsubscribe();
+        }
+    }
+}
diff --git a/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java
index 8c2cce855bc..42942cdd3a5 100644
--- a/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java
+++ b/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java
@@ -44,9 +44,6 @@ public ConfluentKafkaContainer(DockerImageName dockerImageName) {
     @Override
     protected void configure() {
         KafkaHelper.resolveListeners(this, this.listeners);
-
-        String controllerQuorumVoters = String.format("%s@localhost:9094", getEnvMap().get("KAFKA_NODE_ID"));
-        withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
     }
 
     @Override
diff --git a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java
index 63e3a0d35cf..e946e0a8992 100644
--- a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java
+++ b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java
@@ -2,7 +2,6 @@
 
 import com.github.dockerjava.api.command.InspectContainerResponse;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.utility.DockerImageName;
 
@@ -44,16 +43,13 @@ public KafkaContainer(DockerImageName dockerImageName) {
         withExposedPorts(KAFKA_PORT);
         withEnv(KafkaHelper.envVars());
 
-        withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
-        waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
+        withCommand(KafkaHelper.COMMAND);
+        waitingFor(KafkaHelper.WAIT_STRATEGY);
     }
 
     @Override
     protected void configure() {
         KafkaHelper.resolveListeners(this, this.listeners);
-
-        String controllerQuorumVoters = String.format("%s@localhost:9094", getEnvMap().get("KAFKA_NODE_ID"));
-        withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
     }
 
     @Override
diff --git a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
index 00b10a4d519..61e790d474f 100644
--- a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
+++ b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
@@ -50,6 +50,10 @@ static Map<String, String> envVars() {
         envVars.put("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
         envVars.put("KAFKA_NODE_ID", "1");
+
+        String controllerQuorumVoters = String.format("%s@localhost:9094", envVars.get("KAFKA_NODE_ID"));
+        envVars.put("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
+
         envVars.put("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
         envVars.put("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
         envVars.put("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
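
Reviewer note on the module changes: the single-node default KAFKA_CONTROLLER_QUORUM_VOTERS is now derived once in KafkaHelper.envVars() (which also sets KAFKA_NODE_ID=1), and KafkaContainer reuses the shared KafkaHelper.COMMAND and KafkaHelper.WAIT_STRATEGY constants instead of inlining the startup-script command and log-message wait. Because configure() runs during start(), the removed code re-applied the single-node quorum and would overwrite a caller-supplied value; with the default moved to construction time, it can be replaced via withEnv(), which is exactly what the cluster examples above rely on. A sketch of the override (broker aliases and the image tag are illustrative):

    // Default from KafkaHelper.envVars(): KAFKA_CONTROLLER_QUORUM_VOTERS = "1@localhost:9094".
    // A multi-node cluster replaces both the node id and the voter list per broker:
    new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0"))
        .withEnv("KAFKA_NODE_ID", "0")
        .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "0@broker-0:9094,1@broker-1:9094,2@broker-2:9094");
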