Skip to content

Commit 8a6dc8c

Browse files
committed
add messaging wrappers to support lightweight manual instrumentation
1 parent 0c445ec commit 8a6dc8c

25 files changed

+865
-0
lines changed

messaging-wrappers/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# OpenTelemetry Messaging Wrappers
2+
3+
This is a lightweight messaging wrappers API designed to help you quickly add instrumentation to any
4+
type of messaging system client. To further ease the burden of instrumentation, we will also provide
5+
predefined implementations for certain messaging systems, helping you seamlessly address the issue
6+
of broken traces.
7+
8+
## Overview
9+
10+
The primary goal of this API is to simplify the process of adding instrumentation to your messaging
11+
systems, thereby enhancing observability without introducing significant overhead. Inspired by
12+
[#13340](https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/13340) and
13+
[opentelemetry-java-instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java),
14+
this tool aims to streamline the tracing and monitoring process.
15+
16+
## Predefined Implementations
17+
18+
| Messaging system | Version | Wrapper type |
19+
|-------------------|----------------|--------------|
20+
| Aliyun mns-client | 1.3.0-SNAPSHOT | process |
21+
22+
## Component owners
23+
24+
- [Minghui Zhang](https://github.com/Cirilla-zmh), Alibaba Cloud
25+
26+
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
plugins {
2+
id("otel.java-conventions")
3+
4+
id("otel.publish-conventions")
5+
}
6+
7+
description = "OpenTelemetry Messaging Wrappers"
8+
otelJava.moduleName.set("io.opentelemetry.contrib.messaging.wrappers")
9+
10+
dependencies {
11+
api("io.opentelemetry:opentelemetry-api")
12+
13+
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi")
14+
compileOnly("io.opentelemetry:opentelemetry-api-incubator")
15+
16+
implementation("io.opentelemetry.semconv:opentelemetry-semconv")
17+
implementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating")
18+
19+
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
20+
testImplementation("io.opentelemetry:opentelemetry-sdk-trace")
21+
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
22+
23+
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
24+
testImplementation("uk.org.webcompere:system-stubs-jupiter:2.0.3")
25+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# TODO: uncomment when ready to mark as stable
2+
# otel.stable=true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.opentelemetry.contrib.messaging.wrappers;
2+
3+
import io.opentelemetry.api.GlobalOpenTelemetry;
4+
import io.opentelemetry.api.OpenTelemetry;
5+
import io.opentelemetry.context.propagation.TextMapGetter;
6+
import io.opentelemetry.contrib.messaging.wrappers.semconv.DefaultMessagingProcessSpanCustomizer;
7+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
8+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessResponse;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
13+
public class DefaultMessagingProcessWrapperBuilder<REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> {
14+
15+
private OpenTelemetry openTelemetry;
16+
17+
private TextMapGetter<REQUEST> textMapGetter;
18+
19+
private List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers;
20+
21+
public static <REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> create() {
22+
return new DefaultMessagingProcessWrapperBuilder<>();
23+
}
24+
25+
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> openTelemetry(OpenTelemetry openTelemetry) {
26+
this.openTelemetry = openTelemetry;
27+
return this;
28+
}
29+
30+
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> textMapGetter(TextMapGetter<REQUEST> textMapGetter) {
31+
this.textMapGetter = textMapGetter;
32+
return this;
33+
}
34+
35+
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> spanCustomizers(
36+
List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers) {
37+
this.spanCustomizers = spanCustomizers;
38+
return this;
39+
}
40+
41+
public DefaultMessagingProcessWrapperBuilder<REQUEST, RESPONSE> addSpanCustomizers(
42+
MessagingSpanCustomizer<REQUEST, RESPONSE> spanCustomizer) {
43+
this.spanCustomizers.add(spanCustomizer);
44+
return this;
45+
}
46+
47+
public MessagingProcessWrapper<REQUEST, RESPONSE> build() {
48+
return new MessagingProcessWrapper<>(this.openTelemetry, this.textMapGetter, this.spanCustomizers);
49+
}
50+
51+
private DefaultMessagingProcessWrapperBuilder() {
52+
// init by default
53+
this.openTelemetry = GlobalOpenTelemetry.get();
54+
this.spanCustomizers = new ArrayList<>();
55+
this.spanCustomizers.add(DefaultMessagingProcessSpanCustomizer.create());
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package io.opentelemetry.contrib.messaging.wrappers;
2+
3+
import io.opentelemetry.api.OpenTelemetry;
4+
import io.opentelemetry.api.trace.Span;
5+
import io.opentelemetry.api.trace.SpanBuilder;
6+
import io.opentelemetry.api.trace.Tracer;
7+
import io.opentelemetry.context.Context;
8+
import io.opentelemetry.context.Scope;
9+
import io.opentelemetry.context.propagation.TextMapGetter;
10+
import io.opentelemetry.context.propagation.TextMapPropagator;
11+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
12+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessResponse;
13+
14+
import java.util.List;
15+
import java.util.concurrent.Callable;
16+
import java.util.logging.Level;
17+
import java.util.logging.Logger;
18+
19+
public class MessagingProcessWrapper<REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> {
20+
21+
private static final Logger LOG = Logger.getLogger(MessagingProcessWrapper.class.getName());
22+
23+
private static final String INSTRUMENTATION_SCOPE = "messaging-process-wrapper";
24+
25+
private static final String INSTRUMENTATION_VERSION = "1.0.0";
26+
27+
private static final String OPERATION_NAME = "process";
28+
29+
private final TextMapPropagator textMapPropagator;
30+
31+
private final Tracer tracer;
32+
33+
private final TextMapGetter<REQUEST> textMapGetter;
34+
35+
private final List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers;
36+
37+
public Runnable wrap(REQUEST request, Runnable runnable) {
38+
return () -> {
39+
Span span = handleStart(request);
40+
Scope scope = span.makeCurrent();
41+
42+
try {
43+
runnable.run();
44+
} catch (Throwable t) {
45+
handleEnd(span, request, null, t);
46+
scope.close();
47+
throw t;
48+
}
49+
50+
handleEnd(span, request, null, null);
51+
scope.close();
52+
};
53+
}
54+
55+
public <R> Callable<R> wrap(REQUEST request, Callable<R> callable) {
56+
return () -> {
57+
Span span = handleStart(request);
58+
Scope scope = span.makeCurrent();
59+
RESPONSE response = null;
60+
61+
R result = null;
62+
try {
63+
result = callable.call();
64+
if (result instanceof MessagingProcessResponse) {
65+
response = (RESPONSE) result;
66+
}
67+
} catch (Throwable t) {
68+
handleEnd(span, request, response, t);
69+
scope.close();
70+
throw t;
71+
}
72+
73+
handleEnd(span, request, response, null);
74+
scope.close();
75+
return result;
76+
};
77+
}
78+
79+
public <R> R doProcess(REQUEST request, Callable<R> process) throws Exception {
80+
Span span = handleStart(request);
81+
Scope scope = span.makeCurrent();
82+
RESPONSE response = null;
83+
84+
R result = null;
85+
try {
86+
result = process.call();
87+
if (result instanceof MessagingProcessResponse) {
88+
response = (RESPONSE) result;
89+
}
90+
} catch (Throwable t) {
91+
handleEnd(span, request, response, t);
92+
scope.close();
93+
throw t;
94+
}
95+
96+
// noop response by default
97+
handleEnd(span, request, response, null);
98+
scope.close();
99+
return result;
100+
}
101+
102+
protected Span handleStart(REQUEST request) {
103+
Context context = this.textMapPropagator.extract(Context.current(), request, this.textMapGetter);
104+
SpanBuilder spanBuilder = this.tracer.spanBuilder(getDefaultSpanName(request.getDestination()));
105+
spanBuilder.setParent(context);
106+
for (MessagingSpanCustomizer<REQUEST, RESPONSE> customizer : spanCustomizers) {
107+
try {
108+
context = customizer.onStart(spanBuilder, context, request);
109+
} catch (Exception e) {
110+
LOG.log(Level.WARNING, "Exception occurred while customizing span on start.", e);
111+
}
112+
}
113+
return spanBuilder.startSpan();
114+
}
115+
116+
protected void handleEnd(Span span, REQUEST request, RESPONSE response, Throwable t) {
117+
for (MessagingSpanCustomizer<REQUEST, RESPONSE> customizer : spanCustomizers) {
118+
try {
119+
customizer.onEnd(span, Context.current(), request, response, t);
120+
} catch (Exception e) {
121+
LOG.log(Level.WARNING, "Exception occurred while customizing span on end.", e);
122+
}
123+
}
124+
span.end();
125+
}
126+
127+
protected String getDefaultSpanName(String destination) {
128+
if (destination == null) {
129+
destination = "unknown";
130+
}
131+
return OPERATION_NAME + " " + destination;
132+
}
133+
134+
protected MessagingProcessWrapper(OpenTelemetry openTelemetry,
135+
TextMapGetter<REQUEST> textMapGetter,
136+
List<MessagingSpanCustomizer<REQUEST, RESPONSE>> spanCustomizers) {
137+
this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
138+
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_SCOPE + "-" + INSTRUMENTATION_VERSION);
139+
this.textMapGetter = textMapGetter;
140+
this.spanCustomizers = spanCustomizers;
141+
}
142+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.opentelemetry.contrib.messaging.wrappers;
2+
3+
import io.opentelemetry.api.trace.Span;
4+
import io.opentelemetry.api.trace.SpanBuilder;
5+
import io.opentelemetry.context.Context;
6+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
7+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessResponse;
8+
9+
public interface MessagingSpanCustomizer<REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> {
10+
11+
Context onStart(SpanBuilder spanBuilder, Context parentContext, REQUEST request);
12+
13+
void onEnd(Span span, Context context, REQUEST request, RESPONSE response, Throwable t);
14+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/** OpenTelemetry messaging wrappers extension. */
2+
package io.opentelemetry.contrib.messaging.wrappers;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.opentelemetry.contrib.messaging.wrappers.semconv;
2+
3+
import io.opentelemetry.api.common.AttributeKey;
4+
import io.opentelemetry.api.trace.Span;
5+
import io.opentelemetry.api.trace.SpanBuilder;
6+
import io.opentelemetry.api.trace.StatusCode;
7+
import io.opentelemetry.context.Context;
8+
import io.opentelemetry.contrib.messaging.wrappers.MessagingSpanCustomizer;
9+
10+
import static io.opentelemetry.semconv.ErrorAttributes.ERROR_TYPE;
11+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.*;
12+
13+
public class DefaultMessagingProcessSpanCustomizer<REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>>
14+
implements MessagingSpanCustomizer<REQUEST, RESPONSE> {
15+
16+
public Context onStart(SpanBuilder spanBuilder, Context parentContext, REQUEST request) {
17+
if (request == null || spanBuilder == null) {
18+
return parentContext;
19+
}
20+
setAttributeIfNotNull(spanBuilder, MESSAGING_OPERATION_NAME, request.getOperationName());
21+
setAttributeIfNotNull(spanBuilder, MESSAGING_SYSTEM, request.getSystem());
22+
setAttributeIfNotNull(spanBuilder, MESSAGING_CONSUMER_GROUP_NAME, request.getConsumerGroupName());
23+
if (request.isAnonymousDestination()) {
24+
spanBuilder.setAttribute(MESSAGING_DESTINATION_ANONYMOUS, request.isAnonymousDestination());
25+
}
26+
setAttributeIfNotNull(spanBuilder, MESSAGING_DESTINATION_NAME, request.getDestination());
27+
setAttributeIfNotNull(spanBuilder, MESSAGING_DESTINATION_SUBSCRIPTION_NAME, request.getDestinationSubscriptionName());
28+
setAttributeIfNotNull(spanBuilder, MESSAGING_DESTINATION_TEMPLATE, request.getDestinationTemplate());
29+
if (request.isTemporaryDestination()) {
30+
spanBuilder.setAttribute(MESSAGING_DESTINATION_TEMPORARY, request.isTemporaryDestination());
31+
}
32+
setAttributeIfNotNull(spanBuilder, MESSAGING_OPERATION_TYPE, request.getOperationType());
33+
setAttributeIfNotNull(spanBuilder, MESSAGING_CLIENT_ID, request.getClientId());
34+
setAttributeIfNotNull(spanBuilder, MESSAGING_DESTINATION_PARTITION_ID, request.getDestinationPartitionId());
35+
setAttributeIfNotNull(spanBuilder, MESSAGING_MESSAGE_CONVERSATION_ID, request.getConversationId());
36+
setAttributeIfNotNull(spanBuilder, MESSAGING_MESSAGE_ID, request.getMessageId());
37+
setAttributeIfNotNull(spanBuilder, MESSAGING_MESSAGE_BODY_SIZE, request.getMessageBodySize());
38+
setAttributeIfNotNull(spanBuilder, MESSAGING_MESSAGE_ENVELOPE_SIZE, request.getMessageEnvelopeSize());
39+
40+
return parentContext;
41+
}
42+
43+
public void onEnd(Span span, Context context, REQUEST request, RESPONSE response, Throwable t) {
44+
if (t != null) {
45+
span.recordException(t);
46+
span.setAttribute(ERROR_TYPE, t.getClass().getCanonicalName());
47+
span.setStatus(StatusCode.ERROR, t.getMessage());
48+
}
49+
}
50+
51+
protected <T> void setAttributeIfNotNull(SpanBuilder spanBuilder, AttributeKey<T> attributeKey, T value) {
52+
if (value == null) {
53+
return;
54+
}
55+
spanBuilder.setAttribute(attributeKey, value);
56+
}
57+
58+
protected <T> void setAttributeIfNotNull(Span span, AttributeKey<T> attributeKey, T value) {
59+
if (value == null) {
60+
return;
61+
}
62+
span.setAttribute(attributeKey, value);
63+
}
64+
65+
public static <REQUEST extends MessagingProcessRequest, RESPONSE extends MessagingProcessResponse<?>> MessagingSpanCustomizer<REQUEST, RESPONSE> create() {
66+
return new DefaultMessagingProcessSpanCustomizer<>();
67+
}
68+
69+
DefaultMessagingProcessSpanCustomizer() {}
70+
}

0 commit comments

Comments
 (0)