@@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@ public abstract class TargetSystemIntegrationTest {

private static Network network;
private static OtlpGrpcServer otlpServer;
private Collection<GenericContainer<?>> prerequisiteContainers;
private GenericContainer<?> target;
private JmxScraperContainer scraper;

@@ -86,12 +88,23 @@ static void afterAll() {

@AfterEach
void afterEach() {
if (scraper != null && scraper.isRunning()) {
[for reviewer] I rearranged the container shutdown sequence to be the reverse of the startup order, to avoid unnecessary errors in the logs

scraper.stop();
}

if (target != null && target.isRunning()) {
target.stop();
}

if (prerequisiteContainers != null) {
prerequisiteContainers.forEach(
container -> {
if (container.isRunning()) {
container.stop();
}
});
}

if (otlpServer != null) {
otlpServer.reset();
}
@@ -103,14 +116,31 @@ protected String scraperBaseImage() {

@Test
void endToEndTest(@TempDir Path tmpDir) {
startContainers(tmpDir);
verifyMetrics();
}

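// Start prerequisite containers (if any) first, then the target, then the scraper.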
protected void startContainers(Path tmpDir) {
prerequisiteContainers = createPrerequisiteContainers();

target =
createTargetContainer(JMX_PORT)
.withLogConsumer(new Slf4jLogConsumer(targetSystemLogger))
.withNetwork(network)
.withNetworkAliases(TARGET_SYSTEM_NETWORK_ALIAS);

// If any prerequisite containers are defined, attach them to the network and make the
// target depend on them, so the target starts only after its dependencies are up.
for (GenericContainer<?> container : prerequisiteContainers) {
container.withNetwork(network);
target.dependsOn(container);
}

// The target container must be running before the scraper container is customized,
// to allow interactions with the container, such as copying files into it.
target.start();

// Create and initialize scraper container
scraper =
new JmxScraperContainer(otlpEndpoint, scraperBaseImage())
.withLogConsumer(new Slf4jLogConsumer(jmxScraperLogger))
@@ -119,14 +149,13 @@ void endToEndTest(@TempDir Path tmpDir) {

scraper = customizeScraperContainer(scraper, target, tmpDir);
scraper.start();
}

protected void verifyMetrics() {
MetricsVerifier metricsVerifier = createMetricsVerifier();
await()
.atMost(Duration.ofSeconds(60))
.pollInterval(Duration.ofSeconds(1))
[for reviewer] Just to decrease CPU usage a bit when not all metrics are available immediately

.untilAsserted(
() -> {
List<ExportMetricsServiceRequest> receivedMetrics = otlpServer.getMetrics();
@@ -158,6 +187,10 @@ protected JmxScraperContainer customizeScraperContainer(
return scraper;
}

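/**
 * Containers that must be started before the target container. The default is an empty list;
 * target systems with external dependencies override this hook (see KafkaConsumerIntegrationTest).
 */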
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
return Collections.emptyList();
}

private static class OtlpGrpcServer extends ServerExtension {

private final BlockingQueue<ExportMetricsServiceRequest> metricRequests =
@@ -0,0 +1,146 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;

import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaConsumerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer;

import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier;
import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;

public class KafkaConsumerIntegrationTest extends TargetSystemIntegrationTest {

@Override
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
GenericContainer<?> zookeeper =
createZookeeperContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper")))
.withNetworkAliases("zookeeper");

GenericContainer<?> kafka =
createKafkaContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka")))
.withNetworkAliases("kafka")
.dependsOn(zookeeper);

GenericContainer<?> kafkaProducer =
createKafkaProducerContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-producer")))
.withNetworkAliases("kafka-producer")
.dependsOn(kafka);

return Arrays.asList(zookeeper, kafka, kafkaProducer);
}

@Override
protected GenericContainer<?> createTargetContainer(int jmxPort) {
return createKafkaConsumerContainer()
.withEnv("JMX_PORT", Integer.toString(jmxPort))
.withExposedPorts(jmxPort)
.waitingFor(Wait.forListeningPorts(jmxPort));
}

@Override
protected JmxScraperContainer customizeScraperContainer(
JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
return scraper.withTargetSystem("kafka-consumer");
}

@Override
protected MetricsVerifier createMetricsVerifier() {
return MetricsVerifier.create()
.add(
"kafka.consumer.fetch-rate",
metric ->
metric
.hasDescription("The number of fetch requests for all topics per second")
.hasUnit("{request}")
.isGauge()
.hasDataPointsWithOneAttribute(
attributeWithAnyValue("client.id"))) // changed to follow semconv
.add(
"kafka.consumer.records-lag-max",
metric ->
metric
.hasDescription("Number of messages the consumer lags behind the producer")
.hasUnit("{message}")
.isGauge()
.hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
.add(
"kafka.consumer.total.bytes-consumed-rate",
metric ->
metric
.hasDescription(
"The average number of bytes consumed for all topics per second")
.hasUnit("By")
[for reviewer] Changed from 'by' to 'By' to follow semconv

.isGauge()
.hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
.add(
"kafka.consumer.total.fetch-size-avg",
metric ->
metric
.hasDescription(
"The average number of bytes fetched per request for all topics")
.hasUnit("By")
.isGauge()
.hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
.add(
"kafka.consumer.total.records-consumed-rate",
metric ->
metric
.hasDescription(
"The average number of records consumed for all topics per second")
.hasUnit("{record}")
[for reviewer] Multiple units in this file have been changed from "1" to the annotated form (e.g. "{record}") to follow semconv

.isGauge()
.hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
.add(
"kafka.consumer.bytes-consumed-rate",
metric ->
metric
.hasDescription("The average number of bytes consumed per second")
.hasUnit("By")
.isGauge()
.hasDataPointsWithAttributes(
attributeGroup(
attributeWithAnyValue("client.id"),
attribute("topic", "test-topic-1"))))
.add(
"kafka.consumer.fetch-size-avg",
metric ->
metric
.hasDescription("The average number of bytes fetched per request")
.hasUnit("By")
.isGauge()
.hasDataPointsWithAttributes(
attributeGroup(
attributeWithAnyValue("client.id"),
attribute("topic", "test-topic-1"))))
.add(
"kafka.consumer.records-consumed-rate",
metric ->
metric
.hasDescription("The average number of records consumed per second")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithAttributes(
attributeGroup(
attributeWithAnyValue("client.id"),
attribute("topic", "test-topic-1"))));
}
}
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;

import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

public class KafkaContainerFactory {
private static final int KAFKA_PORT = 9092;
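// Resolved through the "kafka" network alias assigned to the broker container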
private static final String KAFKA_BROKER = "kafka:" + KAFKA_PORT;
private static final String KAFKA_DOCKER_IMAGE = "bitnami/kafka:2.8.1";

private KafkaContainerFactory() {}

public static GenericContainer<?> createZookeeperContainer() {
return new GenericContainer<>("zookeeper:3.5")
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor(Wait.forListeningPort());
}

public static GenericContainer<?> createKafkaContainer() {
return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") // Removed in 3.5.1
.withStartupTimeout(Duration.ofMinutes(2))
.withExposedPorts(KAFKA_PORT)
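// The port may be open before the broker is fully started, so wait for the startup log message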
.waitingFor(
Wait.forLogMessage(".*KafkaServer.*started \\(kafka.server.KafkaServer\\).*", 1));
}

public static GenericContainer<?> createKafkaProducerContainer() {
return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
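// Produce one message every 250 ms; the topic is created automatically on first write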
.withCommand(
"sh",
"-c",
"echo 'Sending messages to test-topic-1'; "
+ "i=1; while true; do echo \"Message $i\"; sleep .25; i=$((i+1)); done | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server "
[for reviewer] The topic is created automatically when messages are sent to it, so there is no need to create it manually

+ KAFKA_BROKER
+ " --topic test-topic-1;")
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor(Wait.forLogMessage(".*Welcome to the Bitnami kafka container.*", 1));
}

public static GenericContainer<?> createKafkaConsumerContainer() {
return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
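// Consume up to 100 messages from every topic matching test-topic-.*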
.withCommand(
"kafka-console-consumer.sh",
"--bootstrap-server",
KAFKA_BROKER,
"--whitelist",
"test-topic-.*",
"--max-messages",
"100")
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor(Wait.forListeningPort());
}
}