|
18 | 18 | import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; |
19 | 19 | import io.opentelemetry.sdk.metrics.data.MetricData; |
20 | 20 | import io.opentelemetry.sdk.metrics.data.PointData; |
| 21 | +import java.lang.reflect.Field; |
| 22 | +import java.lang.reflect.Method; |
21 | 23 | import java.nio.charset.StandardCharsets; |
22 | 24 | import java.time.Duration; |
23 | 25 | import java.time.Instant; |
|
33 | 35 | import org.apache.kafka.clients.CommonClientConfigs; |
34 | 36 | import org.apache.kafka.clients.consumer.ConsumerConfig; |
35 | 37 | import org.apache.kafka.clients.consumer.KafkaConsumer; |
36 | | -import org.apache.kafka.clients.consumer.KafkaConsumerAccess; |
37 | 38 | import org.apache.kafka.clients.producer.KafkaProducer; |
38 | | -import org.apache.kafka.clients.producer.KafkaProducerAccess; |
39 | 39 | import org.apache.kafka.clients.producer.ProducerConfig; |
40 | 40 | import org.apache.kafka.clients.producer.ProducerRecord; |
41 | 41 | import org.apache.kafka.common.MetricName; |
42 | 42 | import org.apache.kafka.common.metrics.KafkaMetric; |
| 43 | +import org.apache.kafka.common.metrics.Metrics; |
43 | 44 | import org.apache.kafka.common.metrics.MetricsReporter; |
44 | 45 | import org.apache.kafka.common.serialization.ByteArrayDeserializer; |
45 | 46 | import org.apache.kafka.common.serialization.ByteArraySerializer; |
@@ -139,14 +140,36 @@ protected Map<String, Object> consumerConfig() { |
139 | 140 |
|
140 | 141 | @Test |
141 | 142 | void noDuplicateMetricsReporter() { |
142 | | - List<MetricsReporter> producerMetricsReporters = |
143 | | - KafkaProducerAccess.getMetricsReporters(producer); |
| 143 | + List<MetricsReporter> producerMetricsReporters = getMetricsReporters(producer); |
144 | 144 | assertThat(countOpenTelemetryMetricsReporters(producerMetricsReporters)).isEqualTo(1); |
145 | | - List<MetricsReporter> consumerMetricsReporters = |
146 | | - KafkaConsumerAccess.getMetricsReporters(consumer); |
| 145 | + List<MetricsReporter> consumerMetricsReporters = getMetricsReporters(consumer); |
147 | 146 | assertThat(countOpenTelemetryMetricsReporters(consumerMetricsReporters)).isEqualTo(1); |
148 | 147 | } |
149 | 148 |
|
| 149 | + private static List<MetricsReporter> getMetricsReporters(Object producerOrConsumer) { |
| 150 | + return getMetricsRegistry(producerOrConsumer).reporters(); |
| 151 | + } |
| 152 | + |
| 153 | + private static Metrics getMetricsRegistry(Object producerOrConsumer) { |
| 154 | + Class<?> clazz = producerOrConsumer.getClass(); |
| 155 | + try { |
| 156 | + Field field = clazz.getDeclaredField("metrics"); |
| 157 | + field.setAccessible(true); |
| 158 | + return (Metrics) field.get(producerOrConsumer); |
| 159 | + } catch (Exception ignored) { |
| 160 | + // Ignore |
| 161 | + } |
| 162 | + try { |
| 163 | + Method method = clazz.getDeclaredMethod("metricsRegistry"); |
| 164 | + method.setAccessible(true); |
| 165 | + return (Metrics) method.invoke(producerOrConsumer); |
| 166 | + } catch (Exception ignored) { |
| 167 | + // Ignore |
| 168 | + } |
| 169 | + throw new IllegalStateException( |
| 170 | + "Failed to get metrics registry from " + producerOrConsumer.getClass().getName()); |
| 171 | + } |
| 172 | + |
150 | 173 | private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> metricsReporters) { |
151 | 174 | return metricsReporters.stream() |
152 | 175 | .filter(reporter -> reporter.getClass().getName().endsWith("OpenTelemetryMetricsReporter")) |
@@ -176,8 +199,6 @@ void observeMetrics() { |
176 | 199 | "kafka.consumer.join_total", |
177 | 200 | "kafka.consumer.last_heartbeat_seconds_ago", |
178 | 201 | "kafka.consumer.last_rebalance_seconds_ago", |
179 | | - "kafka.consumer.partition_assigned_latency_avg", |
180 | | - "kafka.consumer.partition_assigned_latency_max", |
181 | 202 | "kafka.consumer.rebalance_latency_avg", |
182 | 203 | "kafka.consumer.rebalance_latency_max", |
183 | 204 | "kafka.consumer.rebalance_latency_total", |
|
0 commit comments