Skip to content

feat: Fix context prop for netty instrumentation #426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientResponseTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.OtelHttpClientRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerBlockingRequestHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerResponseTracingHandler;
Expand Down Expand Up @@ -135,14 +136,13 @@ public static void addHandler(
HttpClientTracingHandler.class.getName(),
new HttpClientTracingHandler());

// add OTEL request handler to start spans
// add our custom request handler to start spans with proper context propagation
pipeline.addAfter(
HttpClientTracingHandler.class.getName(),
io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
.HttpClientRequestTracingHandler.class
.getName(),
new io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
.HttpClientRequestTracingHandler());
new OtelHttpClientRequestTracingHandler());
} else if (handler instanceof HttpRequestEncoder) {
pipeline.addLast(
HttpClientRequestTracingHandler.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright The Hypertrace Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import java.util.concurrent.ConcurrentHashMap;
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys;
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientRequestTracingHandler;

/**
* Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context
* propagation by using Context.current() as the parent context.
*/
public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler {

// Store the server context for each thread
private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>();

// Store the mapping from thread ID to server span context (for cross-thread scenarios)
private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT =
new ConcurrentHashMap<>();

// Maximum size for the thread map before triggering cleanup
private static final int MAX_THREAD_MAP_SIZE = 1000;

// Cleanup flag to avoid excessive synchronized blocks
private static volatile boolean cleanupNeeded = false;

public OtelHttpClientRequestTracingHandler() {
super();
}

/**
* Stores the current context as the server context for this thread. This should be called from
* the server handler.
*/
public static void storeServerContext(Context context) {
SERVER_CONTEXT.set(context);

// Also store the span context by thread ID for cross-thread lookup
Span span = Span.fromContext(context);
if (span != null && span.getSpanContext().isValid()) {
THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext());

// Check if we need to clean up the map
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
cleanupNeeded = true;
}
}
}

/**
* Perform cleanup of the thread map if it has grown too large. This is done in a synchronized
* block to prevent concurrent modification issues.
*/
private static void cleanupThreadMapIfNeeded() {
if (cleanupNeeded) {
synchronized (THREAD_TO_SPAN_CONTEXT) {
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
THREAD_TO_SPAN_CONTEXT.clear();
cleanupNeeded = false;
}
}
}
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
try {
if (!(msg instanceof HttpRequest)) {
super.write(ctx, msg, prm);
return;
}

Context parentContext = SERVER_CONTEXT.get();

// Fallback -> If no context in thread local, try Context.current()
if (parentContext == null) {
parentContext = Context.current();
}

// Store the parent context in the channel attributes
// This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct
// context.
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext);

// Call the parent implementation which will use our stored parent context
super.write(ctx, msg, prm);

// Clean up after use to prevent memory leaks
SERVER_CONTEXT.remove();
THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId());
cleanupThreadMapIfNeeded();

} catch (Exception ignored) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.AttributeKeys;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.DataCaptureUtils;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.OtelHttpClientRequestTracingHandler;
import org.hypertrace.agent.core.config.InstrumentationConfig;
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
Expand All @@ -57,47 +60,55 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
return;
}
Span span = Span.fromContext(context);

if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;

Map<String, String> headersMap = headersToMap(httpRequest);
if (instrumentationConfig.httpHeaders().request()) {
headersMap.forEach(span::setAttribute);
}
// used by blocking handler
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
// Store the server context in our ThreadLocal for later use by client handlers
// This is CRITICAL for proper context propagation to client spans
OtelHttpClientRequestTracingHandler.storeServerContext(context);

try (Scope ignored = context.makeCurrent()) {
Span span = Span.fromContext(context);

CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
if (instrumentationConfig.httpBody().request()
&& contentType != null
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;

CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
Map<String, String> headersMap = headersToMap(httpRequest);
if (instrumentationConfig.httpHeaders().request()) {
headersMap.forEach(span::setAttribute);
}
// used by blocking handler
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);

String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
if (instrumentationConfig.httpBody().request()
&& contentType != null
&& ContentTypeUtils.shouldCapture(contentType.toString())) {

// set the buffer to capture response body
// the buffer is used byt captureBody method
Attribute<BoundedByteArrayOutputStream> bufferAttr =
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);

channel.attr(AttributeKeys.CHARSET).set(charset);
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);

// set the buffer to capture response body
// the buffer is used byt captureBody method
Attribute<BoundedByteArrayOutputStream> bufferAttr =
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));

channel.attr(AttributeKeys.CHARSET).set(charset);
}
}
}

if ((msg instanceof HttpContent || msg instanceof ByteBuf)
&& instrumentationConfig.httpBody().request()) {
Charset charset = channel.attr(AttributeKeys.CHARSET).get();
if (charset == null) {
charset = ContentTypeCharsetUtils.getDefaultCharset();
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
&& instrumentationConfig.httpBody().request()) {
Charset charset = channel.attr(AttributeKeys.CHARSET).get();
if (charset == null) {
charset = ContentTypeCharsetUtils.getDefaultCharset();
}
DataCaptureUtils.captureBody(
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
}
DataCaptureUtils.captureBody(
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
}

ctx.fireChannelRead(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientResponseTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.OtelHttpClientRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerBlockingRequestHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerResponseTracingHandler;
Expand Down Expand Up @@ -133,14 +134,13 @@ public static void addHandler(
HttpClientTracingHandler.class.getName(),
new HttpClientTracingHandler());

// add OTEL request handler to start spans
// add our custom request handler to start spans with proper context propagation
pipeline.addAfter(
HttpClientTracingHandler.class.getName(),
io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler
.class
.getName(),
new io.opentelemetry.instrumentation.netty.v4_1.internal.client
.HttpClientRequestTracingHandler(NettyClientSingletons.instrumenter()));
new OtelHttpClientRequestTracingHandler(NettyClientSingletons.instrumenter()));
} else if (handler instanceof HttpRequestEncoder) {
pipeline.addLast(
HttpClientRequestTracingHandler.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright The Hypertrace Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler;
import java.util.concurrent.ConcurrentHashMap;

/**
* Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context
* propagation by using Context.current() as the parent context.
*/
public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler {

// Store the server context for each thread
private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>();

// Store the mapping from thread ID to server span context (for cross-thread scenarios)
private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT =
new ConcurrentHashMap<>();

// Maximum size for the thread map before triggering cleanup
private static final int MAX_THREAD_MAP_SIZE = 1000;

// Cleanup flag to avoid excessive synchronized blocks
private static volatile boolean cleanupNeeded = false;

public OtelHttpClientRequestTracingHandler(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
super(instrumenter);
}

/**
* Stores the current context as the server context for this thread. This should be called from
* the server handler.
*/
public static void storeServerContext(Context context) {
SERVER_CONTEXT.set(context);

// Also store the span context by thread ID for cross-thread lookup
Span span = Span.fromContext(context);
if (span != null && span.getSpanContext().isValid()) {
THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext());

// Check if we need to clean up the map
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
cleanupNeeded = true;
}
}
}

/**
* Perform cleanup of the thread map if it has grown too large. This is done in a synchronized
* block to prevent concurrent modification issues.
*/
private static void cleanupThreadMapIfNeeded() {
if (cleanupNeeded) {
synchronized (THREAD_TO_SPAN_CONTEXT) {
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
THREAD_TO_SPAN_CONTEXT.clear();
cleanupNeeded = false;
}
}
}
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
try {
if (!(msg instanceof HttpRequest)) {
super.write(ctx, msg, prm);
return;
}

Context parentContext = SERVER_CONTEXT.get();

// Fallback -> If no context in thread local, try Context.current()
if (parentContext == null) {
parentContext = Context.current();
}

// Store the parent context in the channel attributes
// This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct
// context.
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext);

// Call the parent implementation which will use our stored parent context
super.write(ctx, msg, prm);

// Clean up after use to prevent memory leaks
SERVER_CONTEXT.remove();
THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId());
cleanupThreadMapIfNeeded();

} catch (Exception ignored) {
}
}
}
Loading
Loading