Skip to content

Commit 6c4a011

Browse files
committed
netty-4.1's fix to correct context prop
1 parent efd9b45 commit 6c4a011

File tree

3 files changed

+169
-36
lines changed

3 files changed

+169
-36
lines changed

instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientRequestTracingHandler;
3737
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientResponseTracingHandler;
3838
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientTracingHandler;
39+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.OtelHttpClientRequestTracingHandler;
3940
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerBlockingRequestHandler;
4041
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerRequestTracingHandler;
4142
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerResponseTracingHandler;
@@ -133,14 +134,13 @@ public static void addHandler(
133134
HttpClientTracingHandler.class.getName(),
134135
new HttpClientTracingHandler());
135136

136-
// add OTEL request handler to start spans
137+
// add our custom request handler to start spans with proper context propagation
137138
pipeline.addAfter(
138139
HttpClientTracingHandler.class.getName(),
139140
io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler
140141
.class
141142
.getName(),
142-
new io.opentelemetry.instrumentation.netty.v4_1.internal.client
143-
.HttpClientRequestTracingHandler(NettyClientSingletons.instrumenter()));
143+
new OtelHttpClientRequestTracingHandler(NettyClientSingletons.instrumenter()));
144144
} else if (handler instanceof HttpRequestEncoder) {
145145
pipeline.addLast(
146146
HttpClientRequestTracingHandler.class.getName(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client;
18+
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelPromise;
21+
import io.netty.handler.codec.http.HttpRequest;
22+
import io.netty.handler.codec.http.HttpResponse;
23+
import io.opentelemetry.api.trace.Span;
24+
import io.opentelemetry.api.trace.SpanContext;
25+
import io.opentelemetry.context.Context;
26+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
27+
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
28+
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
29+
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
32+
/**
33+
* Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context
34+
* propagation by using Context.current() as the parent context.
35+
*/
36+
public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler {
37+
38+
// Store the server context for each thread
39+
private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>();
40+
41+
// Store the mapping from thread ID to server span context (for cross-thread scenarios)
42+
private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT =
43+
new ConcurrentHashMap<>();
44+
45+
// Maximum size for the thread map before triggering cleanup
46+
private static final int MAX_THREAD_MAP_SIZE = 1000;
47+
48+
// Cleanup flag to avoid excessive synchronized blocks
49+
private static volatile boolean cleanupNeeded = false;
50+
51+
public OtelHttpClientRequestTracingHandler(
52+
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
53+
super(instrumenter);
54+
}
55+
56+
/**
57+
* Stores the current context as the server context for this thread. This should be called from
58+
* the server handler.
59+
*/
60+
public static void storeServerContext(Context context) {
61+
SERVER_CONTEXT.set(context);
62+
63+
// Also store the span context by thread ID for cross-thread lookup
64+
Span span = Span.fromContext(context);
65+
if (span != null && span.getSpanContext().isValid()) {
66+
THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext());
67+
68+
// Check if we need to clean up the map
69+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
70+
cleanupNeeded = true;
71+
}
72+
}
73+
}
74+
75+
/**
76+
* Perform cleanup of the thread map if it has grown too large. This is done in a synchronized
77+
* block to prevent concurrent modification issues.
78+
*/
79+
private static void cleanupThreadMapIfNeeded() {
80+
if (cleanupNeeded) {
81+
synchronized (THREAD_TO_SPAN_CONTEXT) {
82+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
83+
THREAD_TO_SPAN_CONTEXT.clear();
84+
cleanupNeeded = false;
85+
}
86+
}
87+
}
88+
}
89+
90+
@Override
91+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
92+
try {
93+
if (!(msg instanceof HttpRequest)) {
94+
super.write(ctx, msg, prm);
95+
return;
96+
}
97+
98+
Context parentContext = SERVER_CONTEXT.get();
99+
100+
// Fallback -> If no context in thread local, try Context.current()
101+
if (parentContext == null) {
102+
parentContext = Context.current();
103+
}
104+
105+
// Store the parent context in the channel attributes
106+
// This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct
107+
// context.
108+
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext);
109+
110+
// Call the parent implementation which will use our stored parent context
111+
super.write(ctx, msg, prm);
112+
113+
// Clean up after use to prevent memory leaks
114+
SERVER_CONTEXT.remove();
115+
THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId());
116+
cleanupThreadMapIfNeeded();
117+
118+
} catch (Exception ignored) {
119+
}
120+
}
121+
}

instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import io.netty.util.Attribute;
2727
import io.opentelemetry.api.common.AttributeKey;
2828
import io.opentelemetry.api.trace.Span;
29+
import io.opentelemetry.context.Context;
30+
import io.opentelemetry.context.Scope;
2931
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
3032
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.AttributeKeys;
3133
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.DataCaptureUtils;
34+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.OtelHttpClientRequestTracingHandler;
3235
import java.nio.charset.Charset;
3336
import java.util.Deque;
3437
import java.util.HashMap;
@@ -57,50 +60,59 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
5760
ctx.fireChannelRead(msg);
5861
return;
5962
}
60-
Span span = Span.fromContext(serverContexts.element().context());
6163

62-
if (msg instanceof HttpRequest) {
63-
HttpRequest httpRequest = (HttpRequest) msg;
64+
Context context = serverContexts.element().context();
6465

65-
Map<String, String> headersMap = headersToMap(httpRequest);
66-
if (instrumentationConfig.httpHeaders().request()) {
67-
headersMap.forEach(span::setAttribute);
68-
}
69-
// used by blocking handler
70-
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
66+
// Store the server context in our ThreadLocal for later use by client handlers
67+
// This is CRITICAL for proper context propagation to client spans
68+
OtelHttpClientRequestTracingHandler.storeServerContext(context);
69+
70+
try (Scope ignored = context.makeCurrent()) {
71+
Span span = Span.fromContext(context);
72+
73+
if (msg instanceof HttpRequest) {
74+
HttpRequest httpRequest = (HttpRequest) msg;
75+
76+
Map<String, String> headersMap = headersToMap(httpRequest);
77+
if (instrumentationConfig.httpHeaders().request()) {
78+
headersMap.forEach(span::setAttribute);
79+
}
80+
// used by blocking handler
81+
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
7182

72-
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
73-
if (instrumentationConfig.httpBody().request()
74-
&& contentType != null
75-
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
83+
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
84+
if (instrumentationConfig.httpBody().request()
85+
&& contentType != null
86+
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
7687

77-
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
78-
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
88+
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
89+
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
7990

80-
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
81-
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
91+
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
92+
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
8293

83-
// set the buffer to capture response body
84-
// the buffer is used byt captureBody method
85-
Attribute<BoundedByteArrayOutputStream> bufferAttr =
86-
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
87-
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
94+
// set the buffer to capture response body
95+
// the buffer is used byt captureBody method
96+
Attribute<BoundedByteArrayOutputStream> bufferAttr =
97+
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
98+
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
8899

89-
channel.attr(AttributeKeys.PROVIDED_CHARSET).set(charset);
100+
channel.attr(AttributeKeys.PROVIDED_CHARSET).set(charset);
101+
}
90102
}
91-
}
92103

93-
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
94-
&& instrumentationConfig.httpBody().request()) {
95-
Charset charset = channel.attr(AttributeKeys.PROVIDED_CHARSET).get();
96-
if (charset == null) {
97-
charset = ContentTypeCharsetUtils.getDefaultCharset();
104+
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
105+
&& instrumentationConfig.httpBody().request()) {
106+
Charset charset = channel.attr(AttributeKeys.PROVIDED_CHARSET).get();
107+
if (charset == null) {
108+
charset = ContentTypeCharsetUtils.getDefaultCharset();
109+
}
110+
DataCaptureUtils.captureBody(
111+
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
98112
}
99-
DataCaptureUtils.captureBody(
100-
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
101-
}
102113

103-
ctx.fireChannelRead(msg);
114+
ctx.fireChannelRead(msg);
115+
}
104116
}
105117

106118
private static Map<String, String> headersToMap(HttpMessage httpMessage) {

0 commit comments

Comments
 (0)