Skip to content

Commit 8ef16f8

Browse files
committed
refactor messaging wrappers & add the kafka-clients implementation
1 parent 8a6dc8c commit 8ef16f8

32 files changed

+628
-471
lines changed

messaging-wrappers/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ this tool aims to streamline the tracing and monitoring process.
2222
## Component owners
2323

2424
- [Minghui Zhang](https://github.com/Cirilla-zmh), Alibaba Cloud
25+
- [Steve Rao](https://github.com/steverao), Alibaba Cloud
2526

2627
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).

messaging-wrappers/api/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ description = "OpenTelemetry Messaging Wrappers"
88
otelJava.moduleName.set("io.opentelemetry.contrib.messaging.wrappers")
99

1010
dependencies {
11-
api("io.opentelemetry:opentelemetry-api")
11+
api("io.opentelemetry.instrumentation:opentelemetry-instrumentation-api-incubator")
1212

1313
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi")
1414
compileOnly("io.opentelemetry:opentelemetry-api-incubator")

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/DefaultMessagingProcessWrapperBuilder.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,55 +3,60 @@
33
import io.opentelemetry.api.GlobalOpenTelemetry;
44
import io.opentelemetry.api.OpenTelemetry;
55
import io.opentelemetry.context.propagation.TextMapGetter;
6-
import io.opentelemetry.contrib.messaging.wrappers.semconv.DefaultMessagingProcessSpanCustomizer;
6+
import io.opentelemetry.contrib.messaging.wrappers.semconv.DefaultMessagingAttributesGetter;
77
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
8-
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessResponse;
8+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
9+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
10+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
911

1012
import java.util.ArrayList;
13+
import java.util.Collection;
1114
import java.util.List;
1215

13-
public class DefaultMessagingProcessWrapperBuilder<REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> {
16+
public class DefaultMessagingProcessWrapperBuilder<REQUEST extends MessagingProcessRequest> {
1417

1518
private OpenTelemetry openTelemetry;
1619

17-
private TextMapGetter<REQUEST> textMapGetter;
20+
protected TextMapGetter<REQUEST> textMapGetter;
1821

19-
private List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers;
22+
protected List<AttributesExtractor<REQUEST, Void>> attributesExtractors;
2023

21-
public static <REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> create() {
22-
return new DefaultMessagingProcessWrapperBuilder<>();
23-
}
24-
25-
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> openTelemetry(OpenTelemetry openTelemetry) {
24+
public DefaultMessagingProcessWrapperBuilder<REQUEST> openTelemetry(OpenTelemetry openTelemetry) {
2625
this.openTelemetry = openTelemetry;
2726
return this;
2827
}
2928

30-
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> textMapGetter(TextMapGetter<REQUEST> textMapGetter) {
29+
public DefaultMessagingProcessWrapperBuilder<REQUEST> textMapGetter(TextMapGetter<REQUEST> textMapGetter) {
3130
this.textMapGetter = textMapGetter;
3231
return this;
3332
}
3433

35-
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> spanCustomizers(
36-
List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers) {
37-
this.spanCustomizers = spanCustomizers;
34+
/**
35+
* This method overrides the original items.
36+
* <p>See {@link DefaultMessagingProcessWrapperBuilder#addAttributesExtractor} if you just want to append one.</p>
37+
* */
38+
public DefaultMessagingProcessWrapperBuilder<REQUEST> attributesExtractors(
39+
Collection<AttributesExtractor<REQUEST, Void>> attributesExtractors) {
40+
this.attributesExtractors = new ArrayList<>();
41+
this.attributesExtractors.addAll(attributesExtractors);
3842
return this;
3943
}
4044

41-
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> addSpanCustomizers(
42-
MessagingSpanCustomizer<REQUEST, RESPONSE> spanCustomizer) {
43-
this.spanCustomizers.add(spanCustomizer);
45+
public DefaultMessagingProcessWrapperBuilder<REQUEST> addAttributesExtractor(
46+
AttributesExtractor<REQUEST, Void> attributesExtractor) {
47+
this.attributesExtractors.add(attributesExtractor);
4448
return this;
4549
}
4650

47-
public MessagingProcessWrapper<REQUEST, RESPONSE> build() {
48-
return new MessagingProcessWrapper<>(this.openTelemetry, this.textMapGetter, this.spanCustomizers);
51+
public MessagingProcessWrapper<REQUEST> build() {
52+
return new MessagingProcessWrapper<>(this.openTelemetry == null ? GlobalOpenTelemetry.get() : this.openTelemetry,
53+
this.textMapGetter, this.attributesExtractors);
4954
}
5055

51-
private DefaultMessagingProcessWrapperBuilder() {
52-
// init by default
53-
this.openTelemetry = GlobalOpenTelemetry.get();
54-
this.spanCustomizers = new ArrayList<>();
55-
this.spanCustomizers.add(DefaultMessagingProcessSpanCustomizer.create());
56+
protected DefaultMessagingProcessWrapperBuilder() {
57+
// init attributes extractors by default
58+
this.attributesExtractors = new ArrayList<>();
59+
this.attributesExtractors.add(MessagingAttributesExtractor.create(
60+
DefaultMessagingAttributesGetter.create(), MessageOperation.PROCESS));
5661
}
5762
}
Lines changed: 36 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.opentelemetry.contrib.messaging.wrappers;
22

33
import io.opentelemetry.api.OpenTelemetry;
4+
import io.opentelemetry.api.common.Attributes;
5+
import io.opentelemetry.api.common.AttributesBuilder;
46
import io.opentelemetry.api.trace.Span;
57
import io.opentelemetry.api.trace.SpanBuilder;
68
import io.opentelemetry.api.trace.Tracer;
@@ -9,16 +11,12 @@
911
import io.opentelemetry.context.propagation.TextMapGetter;
1012
import io.opentelemetry.context.propagation.TextMapPropagator;
1113
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
12-
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessResponse;
14+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
1315

16+
import javax.annotation.Nullable;
1417
import java.util.List;
15-
import java.util.concurrent.Callable;
16-
import java.util.logging.Level;
17-
import java.util.logging.Logger;
1818

19-
public class MessagingProcessWrapper<REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> {
20-
21-
private static final Logger LOG = Logger.getLogger(MessagingProcessWrapper.class.getName());
19+
public class MessagingProcessWrapper<REQUEST extends MessagingProcessRequest> {
2220

2321
private static final String INSTRUMENTATION_SCOPE = "messaging-process-wrapper";
2422

@@ -32,94 +30,57 @@ public class MessagingProcessWrapper<REQUEST extends MessagingProcessRequest, RE
3230

3331
private final TextMapGetter<REQUEST> textMapGetter;
3432

35-
private final List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers;
33+
// no attributes need to be extracted from responses in process operations
34+
private final List<AttributesExtractor<REQUEST, Void>> attributesExtractors;
3635

37-
public Runnable wrap(REQUEST request, Runnable runnable) {
38-
return () -> {
39-
Span span = handleStart(request);
40-
Scope scope = span.makeCurrent();
36+
public static <REQUEST extends MessagingProcessRequest> DefaultMessagingProcessWrapperBuilder<REQUEST> defaultBuilder() {
37+
return new DefaultMessagingProcessWrapperBuilder<>();
38+
}
4139

42-
try {
43-
runnable.run();
44-
} catch (Throwable t) {
45-
handleEnd(span, request, null, t);
46-
scope.close();
47-
throw t;
48-
}
40+
public <E extends Throwable> void doProcess(REQUEST request, ThrowingRunnable<E> runnable) throws E {
41+
Span span = handleStart(request);
4942

50-
handleEnd(span, request, null, null);
51-
scope.close();
52-
};
53-
}
43+
try (Scope scope = span.makeCurrent()) {
44+
runnable.run();
45+
} catch (Throwable t) {
46+
handleEnd(span, request, t);
47+
throw t;
48+
}
5449

55-
public <R> Callable<R> wrap(REQUEST request, Callable<R> callable) {
56-
return () -> {
57-
Span span = handleStart(request);
58-
Scope scope = span.makeCurrent();
59-
RESPONSE response = null;
60-
61-
R result = null;
62-
try {
63-
result = callable.call();
64-
if (result instanceof MessagingProcessResponse) {
65-
response = (RESPONSE) result;
66-
}
67-
} catch (Throwable t) {
68-
handleEnd(span, request, response, t);
69-
scope.close();
70-
throw t;
71-
}
72-
73-
handleEnd(span, request, response, null);
74-
scope.close();
75-
return result;
76-
};
50+
handleEnd(span, request, null);
7751
}
7852

79-
public <R> R doProcess(REQUEST request, Callable<R> process) throws Exception {
53+
public <R, E extends Throwable> R doProcess(REQUEST request, ThrowingSupplier<R, E> supplier) throws E {
8054
Span span = handleStart(request);
81-
Scope scope = span.makeCurrent();
82-
RESPONSE response = null;
8355

8456
R result = null;
85-
try {
86-
result = process.call();
87-
if (result instanceof MessagingProcessResponse) {
88-
response = (RESPONSE) result;
89-
}
57+
try (Scope scope = span.makeCurrent()) {
58+
result = supplier.get();
9059
} catch (Throwable t) {
91-
handleEnd(span, request, response, t);
92-
scope.close();
60+
handleEnd(span, request, t);
9361
throw t;
9462
}
9563

96-
// noop response by default
97-
handleEnd(span, request, response, null);
98-
scope.close();
64+
handleEnd(span, request, null);
9965
return result;
10066
}
10167

10268
protected Span handleStart(REQUEST request) {
10369
Context context = this.textMapPropagator.extract(Context.current(), request, this.textMapGetter);
10470
SpanBuilder spanBuilder = this.tracer.spanBuilder(getDefaultSpanName(request.getDestination()));
10571
spanBuilder.setParent(context);
106-
for (MessagingSpanCustomizer<REQUEST, RESPONSE> customizer : spanCustomizers) {
107-
try {
108-
context = customizer.onStart(spanBuilder, context, request);
109-
} catch (Exception e) {
110-
LOG.log(Level.WARNING, "Exception occurred while customizing span on start.", e);
111-
}
72+
73+
AttributesBuilder builder = Attributes.builder();
74+
for (AttributesExtractor<REQUEST, Void> extractor : this.attributesExtractors) {
75+
extractor.onStart(builder, context, request);
11276
}
113-
return spanBuilder.startSpan();
77+
return spanBuilder.setAllAttributes(builder.build()).startSpan();
11478
}
11579

116-
protected void handleEnd(Span span, REQUEST request, RESPONSE response, Throwable t) {
117-
for (MessagingSpanCustomizer<REQUEST, RESPONSE> customizer : spanCustomizers) {
118-
try {
119-
customizer.onEnd(span, Context.current(), request, response, t);
120-
} catch (Exception e) {
121-
LOG.log(Level.WARNING, "Exception occurred while customizing span on end.", e);
122-
}
80+
protected void handleEnd(Span span, REQUEST request, Throwable t) {
81+
AttributesBuilder builder = Attributes.builder();
82+
for (AttributesExtractor<REQUEST, Void> extractor : this.attributesExtractors) {
83+
extractor.onEnd(builder, Context.current(), request, null, t);
12384
}
12485
span.end();
12586
}
@@ -132,11 +93,11 @@ protected String getDefaultSpanName(String destination) {
13293
}
13394

13495
protected MessagingProcessWrapper(OpenTelemetry openTelemetry,
135-
TextMapGetter<REQUEST> textMapGetter,
136-
List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers) {
96+
@Nullable TextMapGetter<REQUEST> textMapGetter,
97+
List<AttributesExtractor<REQUEST, Void>> attributesExtractors) {
13798
this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
13899
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_SCOPE + "-" + INSTRUMENTATION_VERSION);
139100
this.textMapGetter = textMapGetter;
140-
this.spanCustomizers = spanCustomizers;
101+
this.attributesExtractors = attributesExtractors;
141102
}
142103
}

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingSpanCustomizer.java

Lines changed: 0 additions & 14 deletions
This file was deleted.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.opentelemetry.contrib.messaging.wrappers;
2+
3+
/**
4+
* A utility interface representing a {@link Runnable} that may throw.
5+
*
6+
* <p>Inspired from <a href=https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/util/ThrowingRunnable.java>ThrowingRunnable</a>.
7+
*
8+
* @param <E> Thrown exception type.
9+
*/
10+
@FunctionalInterface
11+
public interface ThrowingRunnable<E extends Throwable> {
12+
void run() throws E;
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.opentelemetry.contrib.messaging.wrappers;
2+
3+
import java.util.function.Supplier;
4+
5+
/**
6+
* A utility interface representing a {@link Supplier} that may throw.
7+
*
8+
* <p>Inspired from <a href=https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/util/ThrowingSupplier.java>ThrowingSupplier</a>.
9+
*
10+
* @param <E> Thrown exception type.
11+
*/
12+
@FunctionalInterface
13+
public interface ThrowingSupplier<T, E extends Throwable> {
14+
T get() throws E;
15+
}

0 commit comments

Comments
 (0)