Skip to content

Commit 12ca51d

Browse files
committed
Remove messaging header normalization
1 parent a159439 commit 12ca51d

File tree

13 files changed

+261
-84
lines changed

13 files changed

+261
-84
lines changed

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/CapturedMessageHeadersUtil.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,26 @@
1212

1313
final class CapturedMessageHeadersUtil {
1414

15-
private static final ConcurrentMap<String, AttributeKey<List<String>>> attributeKeysCache =
16-
new ConcurrentHashMap<>();
15+
private static final ConcurrentMap<String, AttributeKey<List<String>>>
16+
oldSemconvAttributeKeysCache = new ConcurrentHashMap<>();
17+
private static final ConcurrentMap<String, AttributeKey<List<String>>>
18+
stableSemconvAttributeKeysCache = new ConcurrentHashMap<>();
1719

18-
static AttributeKey<List<String>> attributeKey(String headerName) {
19-
return attributeKeysCache.computeIfAbsent(headerName, n -> createKey(n));
20+
static AttributeKey<List<String>> oldSemconvAttributeKey(String headerName) {
21+
return oldSemconvAttributeKeysCache.computeIfAbsent(headerName, n -> createOldSemconvKey(n));
2022
}
2123

22-
private static AttributeKey<List<String>> createKey(String headerName) {
24+
static AttributeKey<List<String>> stableSemconvAttributeKey(String headerName) {
25+
return stableSemconvAttributeKeysCache.computeIfAbsent(
26+
headerName, n -> createStableSemconvKey(n));
27+
}
28+
29+
private static AttributeKey<List<String>> createOldSemconvKey(String headerName) {
30+
String key = "messaging.header." + headerName.replace('-', '_');
31+
return AttributeKey.stringArrayKey(key);
32+
}
33+
34+
private static AttributeKey<List<String>> createStableSemconvKey(String headerName) {
2335
String key = "messaging.header." + headerName;
2436
return AttributeKey.stringArrayKey(key);
2537
}

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.api.common.AttributesBuilder;
1212
import io.opentelemetry.context.Context;
1313
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
14+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
1415
import io.opentelemetry.instrumentation.api.internal.SpanKey;
1516
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
1617
import java.util.ArrayList;
@@ -131,7 +132,13 @@ public void onEnd(
131132
for (String name : capturedHeaders) {
132133
List<String> values = getter.getMessageHeader(request, name);
133134
if (!values.isEmpty()) {
134-
internalSet(attributes, CapturedMessageHeadersUtil.attributeKey(name), values);
135+
if (SemconvStability.isEmitOldMessageSemconv()) {
136+
internalSet(attributes, CapturedMessageHeadersUtil.oldSemconvAttributeKey(name), values);
137+
}
138+
if (SemconvStability.isEmitStableMessageSemconv()) {
139+
internalSet(
140+
attributes, CapturedMessageHeadersUtil.stableSemconvAttributeKey(name), values);
141+
}
135142
}
136143
}
137144
}

instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/SemconvStability.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,19 @@ public final class SemconvStability {
2424
private static final boolean emitOldCodeSemconv;
2525
private static final boolean emitStableCodeSemconv;
2626

27+
private static final boolean emitOldMessageSemconv;
28+
private static final boolean emitStableMessageSemconv;
29+
2730
static {
2831
boolean oldDatabase = true;
2932
boolean stableDatabase = false;
3033

3134
boolean oldCode = true;
3235
boolean stableCode = false;
3336

37+
boolean oldMessage = true;
38+
boolean stableMessage = false;
39+
3440
String value = ConfigPropertiesUtil.getString("otel.semconv-stability.opt-in");
3541
if (value != null) {
3642
Set<String> values = new HashSet<>(asList(value.split(",")));
@@ -55,13 +61,25 @@ public final class SemconvStability {
5561
oldCode = true;
5662
stableCode = true;
5763
}
64+
65+
if (values.contains("message")) {
66+
oldMessage = false;
67+
stableMessage = true;
68+
}
69+
if (values.contains("message/dup")) {
70+
oldMessage = true;
71+
stableMessage = true;
72+
}
5873
}
5974

6075
emitOldDatabaseSemconv = oldDatabase;
6176
emitStableDatabaseSemconv = stableDatabase;
6277

6378
emitOldCodeSemconv = oldCode;
6479
emitStableCodeSemconv = stableCode;
80+
81+
emitOldMessageSemconv = oldMessage;
82+
emitStableMessageSemconv = stableMessage;
6583
}
6684

6785
public static boolean emitOldDatabaseSemconv() {
@@ -105,5 +123,13 @@ public static boolean isEmitStableCodeSemconv() {
105123
return emitStableCodeSemconv;
106124
}
107125

126+
public static boolean isEmitOldMessageSemconv() {
127+
return emitOldMessageSemconv;
128+
}
129+
130+
public static boolean isEmitStableMessageSemconv() {
131+
return emitStableMessageSemconv;
132+
}
133+
108134
private SemconvStability() {}
109135
}

instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.amazonaws.services.sqs.model.SendMessageRequest;
4141
import io.opentelemetry.api.common.Attributes;
4242
import io.opentelemetry.api.trace.SpanKind;
43+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
4344
import io.opentelemetry.instrumentation.test.utils.PortUtils;
4445
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
4546
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
@@ -180,10 +181,18 @@ void testSimpleSqsProducerConsumerServicesCaptureHeaders(boolean testCaptureHead
180181
equalTo(NETWORK_PROTOCOL_VERSION, "1.1")));
181182

182183
if (testCaptureHeaders) {
183-
attributes.add(
184-
satisfies(
185-
stringArrayKey("messaging.header.Test-Message-Header"),
186-
val -> val.isEqualTo(singletonList("test"))));
184+
if (SemconvStability.isEmitOldMessageSemconv()) {
185+
attributes.add(
186+
satisfies(
187+
stringArrayKey("messaging.header.Test_Message_Header"),
188+
val -> val.isEqualTo(singletonList("test"))));
189+
}
190+
if (SemconvStability.isEmitStableMessageSemconv()) {
191+
attributes.add(
192+
satisfies(
193+
stringArrayKey("messaging.header.Test-Message-Header"),
194+
val -> val.isEqualTo(singletonList("test"))));
195+
}
187196
}
188197

189198
span.hasName("testSdkSqs publish")
@@ -220,10 +229,18 @@ void testSimpleSqsProducerConsumerServicesCaptureHeaders(boolean testCaptureHead
220229
equalTo(NETWORK_PROTOCOL_VERSION, "1.1")));
221230

222231
if (testCaptureHeaders) {
223-
attributes.add(
224-
satisfies(
225-
stringArrayKey("messaging.header.Test-Message-Header"),
226-
val -> val.isEqualTo(singletonList("test"))));
232+
if (SemconvStability.isEmitOldMessageSemconv()) {
233+
attributes.add(
234+
satisfies(
235+
stringArrayKey("messaging.header.Test_Message_Header"),
236+
val -> val.isEqualTo(singletonList("test"))));
237+
}
238+
if (SemconvStability.isEmitStableMessageSemconv()) {
239+
attributes.add(
240+
satisfies(
241+
stringArrayKey("messaging.header.Test-Message-Header"),
242+
val -> val.isEqualTo(singletonList("test"))));
243+
}
227244
}
228245

229246
span.hasName("testSdkSqs receive")

instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import io.opentelemetry.api.common.Attributes;
3131
import io.opentelemetry.api.trace.SpanKind;
32+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
3233
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
3334
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
3435
import io.opentelemetry.sdk.trace.data.SpanData;
@@ -91,10 +92,18 @@ protected void assertSqsTraces(boolean withParent, boolean captureHeaders) {
9192
MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))));
9293

9394
if (captureHeaders) {
94-
attributes.add(
95-
satisfies(
96-
stringArrayKey("messaging.header.Test-Message-Header"),
97-
v -> v.isEqualTo(ImmutableList.of("test"))));
95+
if (SemconvStability.isEmitOldMessageSemconv()) {
96+
attributes.add(
97+
satisfies(
98+
stringKey("messaging.header.Test_Message_Header"),
99+
v -> v.isEqualTo("test")));
100+
}
101+
if (SemconvStability.isEmitStableMessageSemconv()) {
102+
attributes.add(
103+
satisfies(
104+
stringArrayKey("messaging.header.Test-Message-Header"),
105+
v -> v.isEqualTo(ImmutableList.of("test"))));
106+
}
98107
}
99108
span.hasName("testSdkSqs publish")
100109
.hasKind(SpanKind.PRODUCER)
@@ -161,10 +170,18 @@ protected void assertSqsTraces(boolean withParent, boolean captureHeaders) {
161170
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)));
162171

163172
if (captureHeaders) {
164-
attributes.add(
165-
satisfies(
166-
stringArrayKey("messaging.header.Test-Message-Header"),
167-
v -> v.isEqualTo(ImmutableList.of("test"))));
173+
if (SemconvStability.isEmitOldMessageSemconv()) {
174+
attributes.add(
175+
satisfies(
176+
stringKey("messaging.header.Test_Message_Header"),
177+
v -> v.isEqualTo("test")));
178+
}
179+
if (SemconvStability.isEmitStableMessageSemconv()) {
180+
attributes.add(
181+
satisfies(
182+
stringArrayKey("messaging.header.Test-Message-Header"),
183+
v -> v.isEqualTo(ImmutableList.of("test"))));
184+
}
168185
}
169186

170187
if (withParent) {

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
2020

2121
import io.opentelemetry.api.common.AttributeKey;
22+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
2223
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
2324
import java.nio.charset.StandardCharsets;
2425
import java.time.Duration;
@@ -186,10 +187,18 @@ protected static List<AttributeAssertion> sendAttributes(
186187
assertions.add(equalTo(MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true));
187188
}
188189
if (testHeaders) {
189-
assertions.add(
190-
equalTo(
191-
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
192-
Collections.singletonList("test")));
190+
if (SemconvStability.isEmitOldMessageSemconv()) {
191+
assertions.add(
192+
equalTo(
193+
AttributeKey.stringArrayKey("messaging.header.Test_Message_Header"),
194+
Collections.singletonList("test")));
195+
}
196+
if (SemconvStability.isEmitStableMessageSemconv()) {
197+
assertions.add(
198+
equalTo(
199+
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
200+
Collections.singletonList("test")));
201+
}
193202
}
194203
return assertions;
195204
}
@@ -209,10 +218,18 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
209218
assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
210219
}
211220
if (testHeaders) {
212-
assertions.add(
213-
equalTo(
214-
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
215-
Collections.singletonList("test")));
221+
if (SemconvStability.isEmitOldMessageSemconv()) {
222+
assertions.add(
223+
equalTo(
224+
AttributeKey.stringArrayKey("messaging.header.Test_Message_Header"),
225+
Collections.singletonList("test")));
226+
}
227+
if (SemconvStability.isEmitStableMessageSemconv()) {
228+
assertions.add(
229+
equalTo(
230+
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
231+
Collections.singletonList("test")));
232+
}
216233
}
217234
return assertions;
218235
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
import static java.util.Collections.singletonList;
2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23+
import io.opentelemetry.api.common.AttributeKey;
2324
import io.opentelemetry.api.trace.SpanContext;
2425
import io.opentelemetry.api.trace.SpanKind;
26+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
2527
import io.opentelemetry.sdk.trace.data.LinkData;
2628
import java.nio.charset.StandardCharsets;
29+
import java.util.List;
2730
import java.util.concurrent.atomic.AtomicReference;
2831
import org.assertj.core.api.AbstractLongAssert;
2932
import org.assertj.core.api.AbstractStringAssert;
@@ -69,9 +72,7 @@ void assertTraces() {
6972
.hasNoParent()
7073
.hasLinksSatisfying(links -> assertThat(links).isEmpty())
7174
.hasAttributesSatisfyingExactly(
72-
equalTo(
73-
stringArrayKey("messaging.header.Test-Message-Header"),
74-
singletonList("test")),
75+
equalTo(headerAttributeKey(), singletonList("test")),
7576
equalTo(MESSAGING_SYSTEM, "kafka"),
7677
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
7778
equalTo(MESSAGING_OPERATION, "receive"),
@@ -86,9 +87,7 @@ void assertTraces() {
8687
.hasParent(trace.getSpan(0))
8788
.hasLinks(LinkData.create(producerSpanContext.get()))
8889
.hasAttributesSatisfyingExactly(
89-
equalTo(
90-
stringArrayKey("messaging.header.Test-Message-Header"),
91-
singletonList("test")),
90+
equalTo(headerAttributeKey(), singletonList("test")),
9291
equalTo(MESSAGING_SYSTEM, "kafka"),
9392
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
9493
equalTo(MESSAGING_OPERATION, "process"),
@@ -115,4 +114,12 @@ void assertTraces() {
115114
span ->
116115
span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
117116
}
117+
118+
private AttributeKey<List<String>> headerAttributeKey() {
119+
if (SemconvStability.isEmitOldMessageSemconv()) {
120+
return AttributeKey.stringArrayKey("messaging.header.Test_MessageHeader");
121+
} else {
122+
return AttributeKey.stringArrayKey("messaging.header.Test-Message-Header");
123+
}
124+
}
118125
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.opentelemetry.api.common.AttributeKey;
1919
import io.opentelemetry.api.trace.SpanKind;
20+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
2021
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
2122
import java.nio.charset.StandardCharsets;
2223
import java.util.ArrayList;
@@ -71,10 +72,18 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
7172
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
7273
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
7374
if (testHeaders) {
74-
assertions.add(
75-
equalTo(
76-
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
77-
Collections.singletonList("test")));
75+
if (SemconvStability.isEmitOldMessageSemconv()) {
76+
assertions.add(
77+
equalTo(
78+
AttributeKey.stringArrayKey("messaging.header.Test_Message_Header"),
79+
Collections.singletonList("test")));
80+
}
81+
if (SemconvStability.isEmitStableMessageSemconv()) {
82+
assertions.add(
83+
equalTo(
84+
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
85+
Collections.singletonList("test")));
86+
}
7887
}
7988
return assertions;
8089
}
@@ -98,10 +107,18 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
98107
satisfies(
99108
MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer"))));
100109
if (testHeaders) {
101-
assertions.add(
102-
equalTo(
103-
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
104-
Collections.singletonList("test")));
110+
if (SemconvStability.isEmitOldMessageSemconv()) {
111+
assertions.add(
112+
equalTo(
113+
AttributeKey.stringArrayKey("messaging.header.Test_Message_Header"),
114+
Collections.singletonList("test")));
115+
}
116+
if (SemconvStability.isEmitStableMessageSemconv()) {
117+
assertions.add(
118+
equalTo(
119+
AttributeKey.stringArrayKey("messaging.header.Test-Message-Header"),
120+
Collections.singletonList("test")));
121+
}
105122
}
106123
return assertions;
107124
}

0 commit comments

Comments
 (0)