Skip to content

Commit 9ff34de

Browse files
authored
Fix jmx-metrics kafka test (#1822)
1 parent 8a09f8b commit 9ff34de

File tree

3 files changed

+39
-24
lines changed

3 files changed

+39
-24
lines changed

jmx-metrics/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ testing {
5050
implementation("com.linecorp.armeria:armeria-junit5")
5151
implementation("io.opentelemetry.proto:opentelemetry-proto:1.5.0-alpha")
5252
implementation("org.testcontainers:junit-jupiter")
53+
implementation("org.slf4j:slf4j-simple")
5354
}
5455
}
5556
}

jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/AbstractIntegrationTest.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.function.Consumer;
3333
import java.util.stream.Collectors;
3434
import org.assertj.core.api.MapAssert;
35+
import org.awaitility.core.ConditionTimeoutException;
3536
import org.junit.jupiter.api.AfterAll;
3637
import org.junit.jupiter.api.BeforeAll;
3738
import org.junit.jupiter.api.BeforeEach;
@@ -134,28 +135,41 @@ protected static GenericContainer<?> cassandraContainer() {
134135
}
135136

136137
protected final void waitAndAssertMetrics(Iterable<Consumer<Metric>> assertions) {
137-
await()
138-
.atMost(Duration.ofSeconds(30))
139-
.untilAsserted(
140-
() -> {
141-
List<Metric> metrics =
142-
otlpServer.getMetrics().stream()
143-
.map(ExportMetricsServiceRequest::getResourceMetricsList)
144-
.flatMap(rm -> rm.stream().map(ResourceMetrics::getScopeMetricsList))
145-
.flatMap(Collection::stream)
146-
.filter(
147-
sm ->
148-
sm.getScope().getName().equals("io.opentelemetry.contrib.jmxmetrics")
149-
&& sm.getScope().getVersion().equals(expectedMeterVersion()))
150-
.flatMap(sm -> sm.getMetricsList().stream())
151-
.collect(Collectors.toList());
152-
153-
assertThat(metrics).isNotEmpty();
154-
155-
for (Consumer<Metric> assertion : assertions) {
156-
assertThat(metrics).anySatisfy(assertion);
157-
}
158-
});
138+
waitAndAssertMetrics(
139+
() -> {
140+
List<Metric> metrics =
141+
otlpServer.getMetrics().stream()
142+
.map(ExportMetricsServiceRequest::getResourceMetricsList)
143+
.flatMap(rm -> rm.stream().map(ResourceMetrics::getScopeMetricsList))
144+
.flatMap(Collection::stream)
145+
.filter(
146+
sm ->
147+
sm.getScope().getName().equals("io.opentelemetry.contrib.jmxmetrics")
148+
&& sm.getScope().getVersion().equals(expectedMeterVersion()))
149+
.flatMap(sm -> sm.getMetricsList().stream())
150+
.collect(Collectors.toList());
151+
152+
assertThat(metrics).isNotEmpty();
153+
154+
for (Consumer<Metric> assertion : assertions) {
155+
assertThat(metrics).anySatisfy(assertion);
156+
}
157+
});
158+
}
159+
160+
private static void waitAndAssertMetrics(Runnable assertion) {
161+
try {
162+
await().atMost(Duration.ofSeconds(30)).untilAsserted(assertion::run);
163+
} catch (Throwable t) {
164+
if (t instanceof ConditionTimeoutException) {
165+
// Don't throw this failure since the stack is the awaitility thread, causing confusion.
166+
// Instead, just assert one more time on the test thread, which will fail with a better
167+
// stack trace - that is on the same thread as the test.
168+
assertion.run();
169+
} else {
170+
throw t;
171+
}
172+
}
159173
}
160174

161175
@SafeVarargs

jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Set<Startable> getDependencies() {
8080
};
8181

8282
protected GenericContainer<?> kafkaProducerContainer() {
83-
return new GenericContainer<>("bitnami/kafka:latest")
83+
return new GenericContainer<>("bitnami/kafka:2.8.1")
8484
.withNetwork(Network.SHARED)
8585
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
8686
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes")
@@ -207,7 +207,7 @@ static class KafkaConsumerIntegrationTest extends KafkaIntegrationTest {
207207

208208
@Container
209209
GenericContainer<?> consumer =
210-
new GenericContainer<>("bitnami/kafka:latest")
210+
new GenericContainer<>("bitnami/kafka:2.8.1")
211211
.withNetwork(Network.SHARED)
212212
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
213213
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes")

0 commit comments

Comments
 (0)