Skip to content

Commit 59a7f9b

Browse files
authored
avoid calling missing method on kafka-clients 4.x (#4136)
1 parent 0f837ce commit 59a7f9b

File tree

6 files changed

+52
-16
lines changed

6 files changed

+52
-16
lines changed

CHANGELOG.next-release.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ This file contains all changes which are not released yet.
1010
# Fixes
1111
<!--FIXES-START-->
1212
* Prevent potential memory pressure by limiting OpenTelemetry metrics bridge attribute cache sizes - [#4123](https://github.com/elastic/apm-agent-java/pull/4123)
13+
* Fix `NoSuchMethodError` for Kafka 4136clients - [#4136](https://github.com/elastic/apm-agent-java/pull/4136)
1314
<!--FIXES-END-->
1415
# Features and enhancements
1516
<!--ENHANCEMENTS-START-->

apm-agent-plugins/apm-kafka-plugin/apm-kafka-headers-plugin/src/main/java/co/elastic/apm/agent/kafka/KafkaProducerHeadersInstrumentation.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737

3838
import javax.annotation.Nullable;
3939

40+
import java.lang.invoke.MethodHandle;
41+
import java.lang.invoke.MethodHandles;
42+
import java.lang.invoke.MethodType;
43+
4044
import static net.bytebuddy.implementation.bytecode.assign.Assigner.Typing.DYNAMIC;
4145
import static net.bytebuddy.matcher.ElementMatchers.named;
4246
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
@@ -63,7 +67,8 @@ public String getAdviceClassName() {
6367
public static class KafkaProducerHeadersAdvice {
6468
private static final KafkaInstrumentationHelper helper = KafkaInstrumentationHelper.get();
6569
private static final KafkaInstrumentationHeadersHelper headersHelper = KafkaInstrumentationHeadersHelper.get();
66-
private static boolean headersSupported = true;
70+
@Nullable
71+
private static Boolean headersSupported = null;
6772

6873
@Nullable
6974
@Advice.AssignReturned.ToArguments(@ToArgument(value = 1, index = 1, typing = DYNAMIC))
@@ -73,9 +78,24 @@ public static Object[] beforeSend(@Advice.FieldValue("apiVersions") final ApiVer
7378
@Nullable @Advice.Argument(value = 1) Callback callback) {
7479
Span<?> span = helper.onSendStart(record);
7580

76-
// Avoid adding headers to records sent to a version older than 0.11.0 - see specifications in
77-
// https://kafka.apache.org/0110/documentation.html#messageformat
78-
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 && headersSupported) {
81+
if(headersSupported == null) {
82+
try {
83+
// using method handle because method is gone in 4.0.0
84+
MethodHandle maxUsableProduceMagic = MethodHandles.lookup()
85+
.findVirtual(ApiVersions.class, "maxUsableProduceMagic", MethodType.methodType(byte.class));
86+
87+
// Avoid adding headers to records sent to a version older than 0.11.0 - see specifications in
88+
// https://kafka.apache.org/0110/documentation.html#messageformat
89+
byte result = (byte)maxUsableProduceMagic.invoke(apiVersions);
90+
headersSupported = result >= RecordBatch.MAGIC_VALUE_V2;
91+
92+
} catch (Throwable e) {
93+
// method not present, assume 4.0.0 or later
94+
headersSupported = true;
95+
}
96+
}
97+
98+
if (headersSupported) {
7999
try {
80100
headersHelper.setOutgoingTraceContextHeaders(tracer.currentContext(), record);
81101
} catch (final IllegalStateException e) {

apm-agent-plugins/apm-kafka-plugin/apm-kafka-headers-plugin/src/test/java/co/elastic/apm/agent/kafka/KafkaIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
import javax.annotation.Nullable;
6161
import java.nio.charset.StandardCharsets;
62+
import java.time.Duration;
6263
import java.util.ArrayList;
6364
import java.util.Arrays;
6465
import java.util.Collections;
@@ -384,8 +385,7 @@ private List<ConsumerRecord<String, String>> awaitReplyRecords(long timeoutMs, i
384385
long start = System.currentTimeMillis();
385386
long pollTime = 100;
386387
while (System.currentTimeMillis() + pollTime - start < timeoutMs) {
387-
//noinspection deprecation - this poll overload is deprecated in newer clients, but enables testing of old ones
388-
ConsumerRecords<String, String> records = replyConsumer.poll(pollTime);
388+
ConsumerRecords<String, String> records = replyConsumer.poll(Duration.ofMillis(pollTime));
389389
if (!records.isEmpty()) {
390390
records.forEach(replies::add);
391391
}
@@ -537,8 +537,7 @@ public void run() {
537537
kafkaConsumer.subscribe(Collections.singletonList(REQUEST_TOPIC));
538538
while (running) {
539539
try {
540-
//noinspection deprecation - this poll overload is deprecated in newer clients, but enables testing of old ones
541-
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
540+
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
542541
if (records != null && !records.isEmpty()) {
543542
// Can't use switch because we run this test in a dedicated class loader, where the anonymous
544543
// class created by the enum switch cannot be loaded

apm-agent-plugins/apm-kafka-plugin/apm-kafka-headers-plugin/src/test/java/co/elastic/apm/agent/kafka/KafkaIT_RealReporter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.testcontainers.utility.DockerImageName;
5656

5757
import java.nio.charset.StandardCharsets;
58+
import java.time.Duration;
5859
import java.util.ArrayList;
5960
import java.util.Collections;
6061
import java.util.Iterator;
@@ -301,8 +302,7 @@ private List<ConsumerRecord<String, String>> awaitReplyRecords(long timeoutMs, i
301302
long start = System.currentTimeMillis();
302303
long pollTime = 100;
303304
while (System.currentTimeMillis() + pollTime - start < timeoutMs) {
304-
//noinspection deprecation - this poll overload is deprecated in newer clients, but enables testing of old ones
305-
ConsumerRecords<String, String> records = replyConsumer.poll(pollTime);
305+
ConsumerRecords<String, String> records = replyConsumer.poll(Duration.ofMillis(pollTime));
306306
if (!records.isEmpty()) {
307307
records.forEach(replies::add);
308308
}
@@ -350,8 +350,7 @@ public void run() {
350350
kafkaConsumer.subscribe(Collections.singletonList(REQUEST_TOPIC));
351351
while (running) {
352352
try {
353-
//noinspection deprecation - this poll overload is deprecated in newer clients, but enables testing of old ones
354-
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
353+
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
355354
if (records != null && !records.isEmpty()) {
356355
// Can't use switch because we run this test in a dedicated class loader, where the anonymous
357356
// class created by the enum switch cannot be loaded

apm-agent-plugins/apm-kafka-plugin/apm-kafka-headers-plugin/src/test/java/co/elastic/apm/agent/kafka/KafkaLegacyBrokerIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
5050

5151
import javax.annotation.Nullable;
52+
import java.time.Duration;
5253
import java.util.Collections;
5354
import java.util.Iterator;
5455
import java.util.List;
@@ -275,8 +276,7 @@ private void sendTwoRecordsAndConsumeReplies() {
275276
int expectedSpans = (testScenario == TestScenario.NO_CONTEXT_PROPAGATION) ? 2 : 4;
276277
await().atMost(500, MILLISECONDS).until(() -> reporter.getSpans().size() == expectedSpans);
277278
}
278-
//noinspection deprecation - this poll overload is deprecated in newer clients, but enables testing of old ones
279-
ConsumerRecords<String, String> replies = replyConsumer.poll(2000);
279+
ConsumerRecords<String, String> replies = replyConsumer.poll(Duration.ofMillis(2000));
280280
assertThat(callback).isNotEmpty();
281281
assertThat(replies.count()).isEqualTo(2);
282282
Iterator<ConsumerRecord<String, String>> iterator = replies.iterator();
@@ -395,8 +395,7 @@ public void run() {
395395
kafkaConsumer.subscribe(Collections.singletonList(REQUEST_TOPIC));
396396
while (running) {
397397
try {
398-
//noinspection deprecation - this poll overload is deprecated in newer clients, but enables testing of old ones
399-
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
398+
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
400399
if (records != null && !records.isEmpty()) {
401400
// Can't use switch because we run this test in a dedicated class loader, where the anonymous
402401
// class created by the enum switch cannot be loaded

apm-agent-plugins/apm-opentelemetry/apm-opentelemetry-metrics-bridge-parent/apm-opentelemetry-metrics-bridge-v1_14/src/test/java/co/elastic/apm/agent/opentelemetry/metrics/bridge/BridgeFactoryV1_14Test.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package co.elastic.apm.agent.opentelemetry.metrics.bridge;
220

321
import io.opentelemetry.api.common.AttributeKey;

0 commit comments

Comments
 (0)