Skip to content

Commit 77e3001

Browse files
committed
improve the integration test of kafka-clients messaging wrapper
1 parent b6ed6c6 commit 77e3001

File tree

7 files changed

+234
-134
lines changed

7 files changed

+234
-134
lines changed

messaging-wrappers/README.md

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,99 @@ this tool aims to streamline the tracing and monitoring process.
1515

1616
## Predefined Implementations
1717

18-
| Messaging system | Version | Wrapper type |
19-
|-------------------|----------------|--------------|
20-
| Aliyun mns-client | 1.3.0-SNAPSHOT | process |
18+
| Messaging system | Version Scope | Wrapper type |
19+
|-------------------|---------------------|--------------|
20+
| kafka-clients | `[0.11.0.0,)` | process |
21+
| aliyun mns-client | `[1.3.0-SNAPSHOT,)` | process |
22+
23+
## Quickstart
24+
25+
### Step 1 Add dependencies
26+
27+
To use OpenTelemetry in your project, you need to add the necessary dependencies. Below are the configurations for both
28+
Gradle and Maven.
29+
30+
#### Gradle
31+
32+
```kotlin
33+
dependencies {
34+
implementation("io.opentelemetry.contrib:opentelemetry-messaging-wrappers-api")
35+
}
36+
```
37+
38+
#### Maven
39+
40+
```xml
41+
<dependency>
42+
<groupId>io.opentelemetry.contrib</groupId>
43+
<artifactId>opentelemetry-messaging-wrappers-api</artifactId>
44+
</dependency>
45+
```
46+
47+
### Step 2 Initializing MessagingWrappers
48+
49+
Below is an example of how to initialize a messaging wrapper.
50+
51+
```java
52+
public class Demo {
53+
54+
public static MessagingProcessWrapper<MyMessagingProcessRequest> createWrapper(
55+
OpenTelemetry openTelemetry,
56+
MyTextMapGetter textMapGetter,
57+
List<AttributesExtractor<MyMessagingProcessRequest, Void>> additionalExtractor) {
58+
59+
return MessagingProcessWrapper.<MyMessagingProcessRequest>defaultBuilder()
60+
.openTelemetry(openTelemetry)
61+
.textMapGetter(textMapGetter)
62+
.addAttributesExtractors(additionalExtractor)
63+
.build();
64+
}
65+
}
66+
67+
public class MyMessagingProcessRequest implements MessagingProcessRequest {
68+
// your implementation here
69+
}
70+
71+
public class MyTextMapGetter implements TextMapGetter<MyMessagingProcessRequest> {
72+
// your implementation here
73+
}
74+
```
75+
76+
For arbitrary messaging systems, you need to manually define `MessagingProcessRequest` and the corresponding `TextMapGetter`.
77+
You can also customize your messaging spans by adding an AttributesExtractor.
78+
79+
For popular messaging systems, we provide pre-implemented wrappers that allow for out-of-the-box integration. We provide
80+
an implementation based on the OpenTelemetry semantic convention by default.
81+
82+
```java
83+
public class KafkaDemo {
84+
85+
public static MessagingProcessWrapper<KafkaProcessRequest> createWrapper() {
86+
return KafkaHelper.processWrapperBuilder().build();
87+
}
88+
}
89+
```
90+
91+
### Step 3 Wrapping your process
92+
93+
Once the MessagingWrapper are initialized, you can wrap your message processing logic to ensure that tracing spans are
94+
properly created and propagated.
95+
96+
**P.S.** Some instrumentations may also [generate process spans](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/docs/supported-libraries.md).
97+
If both are enabled, it might result in duplicate nested process spans. It is recommended to disable one of them.
98+
99+
```java
100+
public class Demo {
101+
102+
private static final MessagingProcessWrapper<MyMessagingProcessRequest> WRAPPER = createWrapper();
103+
104+
public String consume(Message message) {
105+
WRAPPER.doProcess(new MyMessagingProcessRequest(message), () -> {
106+
// your processing logic
107+
});
108+
}
109+
}
110+
```
21111

22112
## Component owners
23113

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/DefaultMessagingProcessWrapperBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ public DefaultMessagingProcessWrapperBuilder<REQUEST> addAttributesExtractor(
5656
return this;
5757
}
5858

59+
@CanIgnoreReturnValue
60+
public DefaultMessagingProcessWrapperBuilder<REQUEST> addAttributesExtractors(
61+
Collection<AttributesExtractor<REQUEST, Void>> attributesExtractor) {
62+
this.attributesExtractors.addAll(attributesExtractor);
63+
return this;
64+
}
65+
5966
public MessagingProcessWrapper<REQUEST> build() {
6067
return new MessagingProcessWrapper<>(
6168
this.openTelemetry == null ? GlobalOpenTelemetry.get() : this.openTelemetry,

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingProcessWrapper.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import javax.annotation.Nullable;
1717
import java.util.List;
1818

19+
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
20+
1921
public class MessagingProcessWrapper<REQUEST extends MessagingProcessRequest> {
2022

2123
private static final String INSTRUMENTATION_SCOPE = "messaging-process-wrapper";
@@ -68,7 +70,7 @@ public <R, E extends Throwable> R doProcess(REQUEST request, ThrowingSupplier<R,
6870
protected Span handleStart(REQUEST request) {
6971
Context context = this.textMapPropagator.extract(Context.current(), request, this.textMapGetter);
7072
SpanBuilder spanBuilder = this.tracer.spanBuilder(getDefaultSpanName(request.getDestination()));
71-
spanBuilder.setParent(context);
73+
spanBuilder.setSpanKind(CONSUMER).setParent(context);
7274

7375
AttributesBuilder builder = Attributes.builder();
7476
for (AttributesExtractor<REQUEST, Void> extractor : this.attributesExtractors) {
@@ -96,7 +98,7 @@ protected MessagingProcessWrapper(OpenTelemetry openTelemetry,
9698
TextMapGetter<REQUEST> textMapGetter,
9799
List<AttributesExtractor<REQUEST, Void>> attributesExtractors) {
98100
this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
99-
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_SCOPE + "-" + INSTRUMENTATION_VERSION);
101+
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_SCOPE, INSTRUMENTATION_VERSION);
100102
this.textMapGetter = textMapGetter;
101103
this.attributesExtractors = attributesExtractors;
102104
}

messaging-wrappers/kafka-clients/build.gradle.kts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ dependencies {
3939
tasks {
4040
withType<Test>().configureEach {
4141
jvmArgs("-Dotel.java.global-autoconfigure.enabled=true")
42-
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
42+
// TODO: According to https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#message-creation-context-as-parent-of-process-span,
43+
// process span should be the child of receive span. However, we couldn't access the trace context with receive span
44+
// in wrappers, unless we add a generic accessor for that.
45+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
4346
jvmArgs("-Dotel.traces.exporter=logging")
4447
jvmArgs("-Dotel.metrics.exporter=logging")
4548
jvmArgs("-Dotel.logs.exporter=logging")

messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaConsumerAttributesExtractor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ public final class KafkaConsumerAttributesExtractor<REQUEST extends KafkaProcess
1818
// copied from MessagingIncubatingAttributes
1919
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
2020
AttributeKey.stringKey("messaging.destination.partition.id");
21-
private static final AttributeKey<String> MESSAGING_KAFKA_CONSUMER_GROUP =
22-
AttributeKey.stringKey("messaging.kafka.consumer.group");
21+
private static final AttributeKey<String> MESSAGING_CONSUMER_GROUP_NAME =
22+
AttributeKey.stringKey("messaging.consumer.group.name");
23+
private static final AttributeKey<Long> MESSAGING_KAFKA_OFFSET =
24+
AttributeKey.longKey("messaging.kafka.offset");
2325
private static final AttributeKey<String> MESSAGING_KAFKA_MESSAGE_KEY =
2426
AttributeKey.stringKey("messaging.kafka.message.key");
25-
private static final AttributeKey<Long> MESSAGING_KAFKA_MESSAGE_OFFSET =
26-
AttributeKey.longKey("messaging.kafka.message.offset");
2727
private static final AttributeKey<Boolean> MESSAGING_KAFKA_MESSAGE_TOMBSTONE =
2828
AttributeKey.booleanKey("messaging.kafka.message.tombstone");
2929

@@ -38,7 +38,7 @@ public void onStart(
3838
ConsumerRecord<?, ?> record = request.getRecord();
3939

4040
attributes.put(MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(record.partition()));
41-
attributes.put(MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset());
41+
attributes.put(MESSAGING_KAFKA_OFFSET, record.offset());
4242

4343
Object key = record.key();
4444
if (key != null && canSerialize(key.getClass())) {
@@ -50,7 +50,7 @@ public void onStart(
5050

5151
String consumerGroup = request.getConsumerGroup();
5252
if (consumerGroup != null) {
53-
attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, consumerGroup);
53+
attributes.put(MESSAGING_CONSUMER_GROUP_NAME, consumerGroup);
5454
}
5555
}
5656

0 commit comments

Comments
 (0)