Skip to content

Commit 744eb1e

Browse files
authored
Support Kafka 3.7+ (#707)
1 parent 2e08217 commit 744eb1e

File tree

9 files changed

+104
-6
lines changed

9 files changed

+104
-6
lines changed

.dlc.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
200,
3737
301,
3838
302,
39-
401
39+
401,
40+
403
4041
]
4142
}

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Release Notes.
2222
* Improve 4x performance of ContextManagerExtendService.createTraceContext()
2323
* Add a plugin that supports the Solon framework.
2424
* Fixed issues in the MySQL component where the executeBatch method could result in empty SQL statements .
25-
25+
* Support kafka-clients-3.7.x intercept
2626

2727
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/213?closed=1)
2828

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.kafka.define;
20+
21+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
22+
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
23+
24+
/**
25+
* For Kafka 3.7.x change
26+
*
27+
* <pre>
28+
* 1. The method named pollForFetchs was removed from KafkaConsumer to <code>AsyncKafkaConsumer</code> and <code>LegacyKafkaConsumer</code>
29+
* 2. Because of the enhance class was changed, so we should create new Instrumentation to intercept the method
30+
* </pre>
31+
*/
32+
public class Kafka37AsyncConsumerInstrumentation extends KafkaConsumerInstrumentation {
33+
34+
private static final String ENHANCE_CLASS_37_ASYNC = "org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer";
35+
36+
@Override
37+
protected ClassMatch enhanceClass() {
38+
return byName(ENHANCE_CLASS_37_ASYNC);
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.kafka.define;
20+
21+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
22+
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
23+
24+
/**
25+
* For Kafka 3.7.x change
26+
*
27+
* <pre>
28+
* 1. The method named pollForFetchs was removed from KafkaConsumer to <code>AsyncKafkaConsumer</code> and <code>LegacyKafkaConsumer</code>
29+
* 2. Because of the enhance class was changed, so we should create new Instrumentation to intercept the method
30+
* </pre>
31+
*/
32+
public class Kafka37LegacyConsumerInstrumentation extends KafkaConsumerInstrumentation {
33+
34+
private static final String ENHANCE_CLASS_37_LEGACY = "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer";
35+
36+
@Override
37+
protected ClassMatch enhanceClass() {
38+
return byName(ENHANCE_CLASS_37_LEGACY);
39+
}
40+
}

apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,6 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstr
1818
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
1919
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
2020
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
21-
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
21+
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
22+
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37AsyncConsumerInstrumentation
23+
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37LegacyConsumerInstrumentation

docs/en/setup/service-agent/java-agent/Plugin-list.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
- jetty-client-9.x
5555
- jetty-server-9.x
5656
- kafka-0.11.x/1.x/2.x
57+
- kafka-3.7.x
5758
- kotlin-coroutine
5859
- lettuce-5.x
5960
- light4j

docs/en/setup/service-agent/java-agent/Supported-list.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ metrics based on the tracing data.
7676
* MQ
7777
* [RocketMQ](https://github.com/apache/rocketmq) 3.x-> 5.x
7878
* [RocketMQ-gRPC](http://github.com/apache/rocketmq-clients) 5.x
79-
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
79+
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.7.1
8080
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
8181
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
8282
* [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x

test/plugin/scenarios/kafka-scenario/src/main/java/test/apache/skywalking/apm/testcase/kafka/controller/CaseController.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.Future;
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.function.Consumer;
27+
import java.util.Collection;
2728
import java.util.regex.Pattern;
2829
import java.util.List;
2930
import java.util.ArrayList;
@@ -32,10 +33,10 @@
3233
import okhttp3.OkHttpClient;
3334
import okhttp3.Request;
3435
import okhttp3.Response;
36+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
3537
import org.apache.kafka.clients.consumer.ConsumerRecord;
3638
import org.apache.kafka.clients.consumer.ConsumerRecords;
3739
import org.apache.kafka.clients.consumer.KafkaConsumer;
38-
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
3940
import org.apache.kafka.clients.producer.Callback;
4041
import org.apache.kafka.clients.producer.KafkaProducer;
4142
import org.apache.kafka.clients.producer.Producer;
@@ -270,7 +271,17 @@ public void run() {
270271
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
271272
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
272273
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
273-
consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
274+
consumer.subscribe(topicPattern, new ConsumerRebalanceListener() {
275+
@Override
276+
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
277+
278+
}
279+
280+
@Override
281+
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
282+
283+
}
284+
});
274285
while (true) {
275286
if (pollAndInvoke(consumer)) break;
276287
}

test/plugin/scenarios/kafka-scenario/support-version.list

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@
2828
3.0.2
2929
3.1.2
3030
3.2.3
31+
3.6.0
32+
3.7.0
33+
3.7.1

0 commit comments

Comments
 (0)