Skip to content

Commit 65b4f3b

Browse files
committed
Kafka broker, producer and consumer YAMLs and tests added
1 parent 6069c1d commit 65b4f3b

File tree

9 files changed

+962
-5
lines changed

9 files changed

+962
-5
lines changed

jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Duration;
2323
import java.util.ArrayList;
2424
import java.util.Collection;
25+
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@ public abstract class TargetSystemIntegrationTest {
5657

5758
private static Network network;
5859
private static OtlpGrpcServer otlpServer;
60+
private Collection<GenericContainer<?>> prerequisiteContainers;
5961
private GenericContainer<?> target;
6062
private JmxScraperContainer scraper;
6163

@@ -86,12 +88,23 @@ static void afterAll() {
8688

8789
@AfterEach
8890
void afterEach() {
91+
if (scraper != null && scraper.isRunning()) {
92+
scraper.stop();
93+
}
94+
8995
if (target != null && target.isRunning()) {
9096
target.stop();
9197
}
92-
if (scraper != null && scraper.isRunning()) {
93-
scraper.stop();
98+
99+
if (prerequisiteContainers != null) {
100+
prerequisiteContainers.forEach(
101+
container -> {
102+
if (container.isRunning()) {
103+
container.stop();
104+
}
105+
});
94106
}
107+
95108
if (otlpServer != null) {
96109
otlpServer.reset();
97110
}
@@ -103,14 +116,31 @@ protected String scraperBaseImage() {
103116

104117
@Test
105118
void endToEndTest(@TempDir Path tmpDir) {
119+
startContainers(tmpDir);
120+
verifyMetrics();
121+
}
122+
123+
protected void startContainers(Path tmpDir) {
124+
prerequisiteContainers = createPrerequisiteContainers();
106125

107126
target =
108127
createTargetContainer(JMX_PORT)
109128
.withLogConsumer(new Slf4jLogConsumer(targetSystemLogger))
110129
.withNetwork(network)
111130
.withNetworkAliases(TARGET_SYSTEM_NETWORK_ALIAS);
131+
132+
// If there are any containers that must be started before the target, initialize them.
133+
// Then make the target depend on them, so it is started after its dependencies.
134+
for (GenericContainer<?> container : prerequisiteContainers) {
135+
container.withNetwork(network);
136+
target.dependsOn(container);
137+
}
138+
139+
// The target container must be running before the scraper container is customized.
140+
// This is necessary to allow interactions with the container, such as copying files.
112141
target.start();
113142

143+
// Create and initialize scraper container
114144
scraper =
115145
new JmxScraperContainer(otlpEndpoint, scraperBaseImage())
116146
.withLogConsumer(new Slf4jLogConsumer(jmxScraperLogger))
@@ -119,14 +149,13 @@ void endToEndTest(@TempDir Path tmpDir) {
119149

120150
scraper = customizeScraperContainer(scraper, target, tmpDir);
121151
scraper.start();
122-
123-
verifyMetrics();
124152
}
125153

126154
protected void verifyMetrics() {
127155
MetricsVerifier metricsVerifier = createMetricsVerifier();
128156
await()
129157
.atMost(Duration.ofSeconds(60))
158+
.pollInterval(Duration.ofSeconds(1))
130159
.untilAsserted(
131160
() -> {
132161
List<ExportMetricsServiceRequest> receivedMetrics = otlpServer.getMetrics();
@@ -158,6 +187,10 @@ protected JmxScraperContainer customizeScraperContainer(
158187
return scraper;
159188
}
160189

190+
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
191+
return Collections.emptyList();
192+
}
193+
161194
private static class OtlpGrpcServer extends ServerExtension {
162195

163196
private final BlockingQueue<ExportMetricsServiceRequest> metricRequests =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;

import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaConsumerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer;

import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier;
import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;

/** End-to-end test scraping JMX metrics from a Kafka console consumer. */
public class KafkaConsumerIntegrationTest extends TargetSystemIntegrationTest {

  @Override
  protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
    // The consumer under test needs a full pipeline in place:
    // zookeeper -> kafka broker -> producer generating traffic.
    GenericContainer<?> zookeeperContainer =
        createZookeeperContainer()
            .withNetworkAliases("zookeeper")
            .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper")));

    GenericContainer<?> kafkaBroker =
        createKafkaContainer()
            .withNetworkAliases("kafka")
            .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka")))
            .dependsOn(zookeeperContainer);

    GenericContainer<?> producer =
        createKafkaProducerContainer()
            .withNetworkAliases("kafka-producer")
            .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-producer")))
            .dependsOn(kafkaBroker);

    return Arrays.asList(zookeeperContainer, kafkaBroker, producer);
  }

  @Override
  protected GenericContainer<?> createTargetContainer(int jmxPort) {
    // The consumer container is the scrape target; it only needs its JMX port reachable.
    return createKafkaConsumerContainer()
        .withEnv("JMX_PORT", Integer.toString(jmxPort))
        .withExposedPorts(jmxPort)
        .waitingFor(Wait.forListeningPorts(jmxPort));
  }

  @Override
  protected JmxScraperContainer customizeScraperContainer(
      JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
    return scraper.withTargetSystem("kafka-consumer");
  }

  @Override
  protected MetricsVerifier createMetricsVerifier() {
    return MetricsVerifier.create()
        .add(
            "kafka.consumer.fetch-rate",
            m ->
                m.hasDescription("The number of fetch requests for all topics per second")
                    .hasUnit("{request}")
                    .isGauge()
                    // changed to follow semconv
                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
        .add(
            "kafka.consumer.records-lag-max",
            m ->
                m.hasDescription("Number of messages the consumer lags behind the producer")
                    .hasUnit("{message}")
                    .isGauge()
                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
        .add(
            "kafka.consumer.total.bytes-consumed-rate",
            m ->
                m.hasDescription("The average number of bytes consumed for all topics per second")
                    .hasUnit("By")
                    .isGauge()
                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
        .add(
            "kafka.consumer.total.fetch-size-avg",
            m ->
                m.hasDescription("The average number of bytes fetched per request for all topics")
                    .hasUnit("By")
                    .isGauge()
                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
        .add(
            "kafka.consumer.total.records-consumed-rate",
            m ->
                m.hasDescription("The average number of records consumed for all topics per second")
                    .hasUnit("{record}")
                    .isGauge()
                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
        // The remaining metrics are reported per topic; the producer only writes to
        // test-topic-1, so that is the single topic attribute value expected here.
        .add(
            "kafka.consumer.bytes-consumed-rate",
            m ->
                m.hasDescription("The average number of bytes consumed per second")
                    .hasUnit("By")
                    .isGauge()
                    .hasDataPointsWithAttributes(
                        attributeGroup(
                            attributeWithAnyValue("client.id"),
                            attribute("topic", "test-topic-1"))))
        .add(
            "kafka.consumer.fetch-size-avg",
            m ->
                m.hasDescription("The average number of bytes fetched per request")
                    .hasUnit("By")
                    .isGauge()
                    .hasDataPointsWithAttributes(
                        attributeGroup(
                            attributeWithAnyValue("client.id"),
                            attribute("topic", "test-topic-1"))))
        .add(
            "kafka.consumer.records-consumed-rate",
            m ->
                m.hasDescription("The average number of records consumed per second")
                    .hasUnit("{record}")
                    .isGauge()
                    .hasDataPointsWithAttributes(
                        attributeGroup(
                            attributeWithAnyValue("client.id"),
                            attribute("topic", "test-topic-1"))));
  }
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;

import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

/**
 * Factory for the containers used by the Kafka integration tests: a Zookeeper node, a Kafka
 * broker, and console producer/consumer containers that generate traffic against the broker. All
 * containers are expected to run on a shared Docker network where the broker is reachable under
 * the {@code kafka} alias and Zookeeper under the {@code zookeeper} alias.
 */
public class KafkaContainerFactory {
  private static final int KAFKA_PORT = 9092;
  // Broker address as seen from other containers on the shared Docker network.
  private static final String KAFKA_BROKER = "kafka:" + KAFKA_PORT;
  private static final String KAFKA_DOCKER_IMAGE = "bitnami/kafka:2.8.1";
  private static final int ZOOKEEPER_PORT = 2181;

  private KafkaContainerFactory() {}

  /**
   * Creates the Zookeeper container the Kafka broker registers with.
   *
   * @return configured, not-yet-started Zookeeper container
   */
  public static GenericContainer<?> createZookeeperContainer() {
    return new GenericContainer<>("zookeeper:3.5")
        // Expose the client port so the listening-port wait strategy has a port to check;
        // without any exposed port Wait.forListeningPort() has nothing to wait on.
        .withExposedPorts(ZOOKEEPER_PORT)
        // Set the wait strategy BEFORE the startup timeout: withStartupTimeout() configures the
        // currently installed wait strategy, so calling waitingFor() afterwards would replace the
        // strategy and silently discard the 2-minute timeout.
        .waitingFor(Wait.forListeningPort())
        .withStartupTimeout(Duration.ofMinutes(2));
  }

  /**
   * Creates the Kafka broker container. It connects to Zookeeper at {@code zookeeper:2181}, so a
   * Zookeeper container with that network alias must be started first.
   *
   * @return configured, not-yet-started Kafka broker container
   */
  public static GenericContainer<?> createKafkaContainer() {
    return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
        .withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
        .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") // Removed in 3.5.1
        .withExposedPorts(KAFKA_PORT)
        // Wait for the broker's own "started" log line rather than a listening port: the port
        // can be open before broker initialization has completed.
        .waitingFor(
            Wait.forLogMessage(".*KafkaServer.*started \\(kafka.server.KafkaServer\\).*", 1))
        // Keep the timeout after waitingFor() so it applies to the log-message strategy.
        .withStartupTimeout(Duration.ofMinutes(2));
  }

  /**
   * Creates a container that continuously produces numbered messages to {@code test-topic-1}
   * (one message every 250 ms) with the console producer shipped in the Kafka image.
   *
   * @return configured, not-yet-started producer container
   */
  public static GenericContainer<?> createKafkaProducerContainer() {
    return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
        .withCommand(
            "sh",
            "-c",
            "echo 'Sending messages to test-topic-1'; "
                + "i=1; while true; do echo \"Message $i\"; sleep .25; i=$((i+1)); done | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server "
                + KAFKA_BROKER
                + " --topic test-topic-1;")
        // The producer opens no listening port; the image's startup banner in the log is the
        // only readily available signal that the container is up.
        .waitingFor(Wait.forLogMessage(".*Welcome to the Bitnami kafka container.*", 1))
        .withStartupTimeout(Duration.ofMinutes(2));
  }

  /**
   * Creates a container running the console consumer against all {@code test-topic-*} topics.
   * This is the target system of the consumer integration test; the consumer process exits after
   * reading 100 messages.
   *
   * @return configured, not-yet-started consumer container
   */
  public static GenericContainer<?> createKafkaConsumerContainer() {
    return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
        .withCommand(
            "kafka-console-consumer.sh",
            "--bootstrap-server",
            KAFKA_BROKER,
            "--whitelist",
            "test-topic-.*",
            "--max-messages",
            "100")
        // NOTE(review): the console consumer does not listen on any port and none are exposed
        // here, so this wait strategy has nothing to check — consider waiting on a log message
        // instead. Left unchanged pending verification. (The integration test overrides the
        // wait strategy with the JMX port anyway.)
        .waitingFor(Wait.forListeningPort())
        .withStartupTimeout(Duration.ofMinutes(2));
  }
}

0 commit comments

Comments
 (0)