Skip to content

Commit 33d2e40

Browse files
authored
Fix flaky kafka metrics test (#6511)
1 parent dd75281 commit 33d2e40

File tree

2 files changed

+12
-36
lines changed

2 files changed

+12
-36
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -273,18 +273,8 @@ void observeMetrics() {
273273
"kafka.consumer.successful_reauthentication_total",
274274
"kafka.consumer.time_between_poll_avg",
275275
"kafka.consumer.time_between_poll_max",
276-
"kafka.consumer.incoming_byte_rate",
277-
"kafka.consumer.incoming_byte_total",
278-
"kafka.consumer.outgoing_byte_rate",
279-
"kafka.consumer.outgoing_byte_total",
280276
"kafka.consumer.request_latency_avg",
281277
"kafka.consumer.request_latency_max",
282-
"kafka.consumer.request_rate",
283-
"kafka.consumer.request_size_avg",
284-
"kafka.consumer.request_size_max",
285-
"kafka.consumer.request_total",
286-
"kafka.consumer.response_rate",
287-
"kafka.consumer.response_total",
288278
"kafka.producer.batch_size_avg",
289279
"kafka.producer.batch_size_max",
290280
"kafka.producer.batch_split_rate",
@@ -350,27 +340,9 @@ void observeMetrics() {
350340
"kafka.producer.successful_reauthentication_rate",
351341
"kafka.producer.successful_reauthentication_total",
352342
"kafka.producer.waiting_threads",
353-
"kafka.producer.incoming_byte_rate",
354-
"kafka.producer.incoming_byte_total",
355-
"kafka.producer.outgoing_byte_rate",
356-
"kafka.producer.outgoing_byte_total",
357-
"kafka.producer.request_latency_avg",
358-
"kafka.producer.request_latency_max",
359-
"kafka.producer.request_rate",
360-
"kafka.producer.request_size_avg",
361-
"kafka.producer.request_size_max",
362-
"kafka.producer.request_total",
363-
"kafka.producer.response_rate",
364-
"kafka.producer.response_total",
365343
"kafka.producer.byte_rate",
366344
"kafka.producer.byte_total",
367-
"kafka.producer.compression_rate",
368-
"kafka.producer.record_error_rate",
369-
"kafka.producer.record_error_total",
370-
"kafka.producer.record_retry_rate",
371-
"kafka.producer.record_retry_total",
372-
"kafka.producer.record_send_rate",
373-
"kafka.producer.record_send_total"));
345+
"kafka.producer.compression_rate"));
374346

375347
List<MetricData> metrics = testing.metrics();
376348
Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());

testing-common/src/main/java/io/opentelemetry/instrumentation/testing/LibraryTestRunner.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
1818
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
1919
import io.opentelemetry.sdk.metrics.data.MetricData;
20+
import io.opentelemetry.sdk.metrics.export.MetricReader;
2021
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
2122
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter;
2223
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
@@ -40,6 +41,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
4041
private static final OpenTelemetrySdk openTelemetry;
4142
private static final InMemorySpanExporter testSpanExporter;
4243
private static final InMemoryMetricExporter testMetricExporter;
44+
private static final MetricReader metricReader;
4345
private static boolean forceFlushCalled;
4446

4547
static {
@@ -48,6 +50,13 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
4850
testSpanExporter = InMemorySpanExporter.create();
4951
testMetricExporter = InMemoryMetricExporter.create(AggregationTemporality.DELTA);
5052

53+
metricReader =
54+
PeriodicMetricReader.builder(testMetricExporter)
55+
// Set really long interval. We'll call forceFlush when we need the metrics
56+
// instead of collecting them periodically.
57+
.setInterval(Duration.ofNanos(Long.MAX_VALUE))
58+
.build();
59+
5160
openTelemetry =
5261
OpenTelemetrySdk.builder()
5362
.setTracerProvider(
@@ -56,13 +65,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
5665
.addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create()))
5766
.addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter))
5867
.build())
59-
.setMeterProvider(
60-
SdkMeterProvider.builder()
61-
.registerMetricReader(
62-
PeriodicMetricReader.builder(testMetricExporter)
63-
.setInterval(Duration.ofMillis(100))
64-
.build())
65-
.build())
68+
.setMeterProvider(SdkMeterProvider.builder().registerMetricReader(metricReader).build())
6669
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
6770
.buildAndRegisterGlobal();
6871
}
@@ -114,6 +117,7 @@ public List<SpanData> getExportedSpans() {
114117

115118
@Override
116119
public List<MetricData> getExportedMetrics() {
120+
metricReader.forceFlush().join(10, TimeUnit.SECONDS);
117121
return testMetricExporter.getFinishedMetricItems();
118122
}
119123

0 commit comments

Comments
 (0)