|
19 | 19 | import io.netty.channel.ChannelHandlerContext; |
20 | 20 | import io.netty.channel.ChannelPromise; |
21 | 21 | import io.netty.handler.codec.http.HttpRequest; |
22 | | -import io.netty.handler.codec.http.HttpResponse; |
23 | 22 | import io.opentelemetry.api.trace.Span; |
24 | 23 | import io.opentelemetry.api.trace.SpanContext; |
25 | 24 | import io.opentelemetry.context.Context; |
26 | | -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; |
27 | | -import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; |
28 | | -import java.util.concurrent.ConcurrentHashMap; |
29 | 25 | import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys; |
30 | 26 | import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientRequestTracingHandler; |
| 27 | +import java.util.concurrent.ConcurrentHashMap; |
31 | 28 |
|
32 | 29 | /** |
33 | 30 | * Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context |
34 | 31 | * propagation by using Context.current() as the parent context. |
35 | 32 | */ |
36 | 33 | public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler { |
37 | 34 |
|
38 | | - // Store the server context for each thread |
39 | | - private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>(); |
| 35 | + // Store the server context for each thread |
| 36 | + private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>(); |
40 | 37 |
|
41 | | - // Store the mapping from thread ID to server span context (for cross-thread scenarios) |
42 | | - private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT = |
43 | | - new ConcurrentHashMap<>(); |
| 38 | + // Store the mapping from thread ID to server span context (for cross-thread scenarios) |
| 39 | + private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT = |
| 40 | + new ConcurrentHashMap<>(); |
44 | 41 |
|
45 | | - // Maximum size for the thread map before triggering cleanup |
46 | | - private static final int MAX_THREAD_MAP_SIZE = 1000; |
| 42 | + // Maximum size for the thread map before triggering cleanup |
| 43 | + private static final int MAX_THREAD_MAP_SIZE = 1000; |
47 | 44 |
|
48 | | - // Cleanup flag to avoid excessive synchronized blocks |
49 | | - private static volatile boolean cleanupNeeded = false; |
| 45 | + // Cleanup flag to avoid excessive synchronized blocks |
| 46 | + private static volatile boolean cleanupNeeded = false; |
50 | 47 |
|
51 | | - public OtelHttpClientRequestTracingHandler() { |
52 | | - super(); |
53 | | - } |
| 48 | + public OtelHttpClientRequestTracingHandler() { |
| 49 | + super(); |
| 50 | + } |
54 | 51 |
|
55 | | - /** |
56 | | - * Stores the current context as the server context for this thread. This should be called from |
57 | | - * the server handler. |
58 | | - */ |
59 | | - public static void storeServerContext(Context context) { |
60 | | - SERVER_CONTEXT.set(context); |
61 | | - |
62 | | - // Also store the span context by thread ID for cross-thread lookup |
63 | | - Span span = Span.fromContext(context); |
64 | | - if (span != null && span.getSpanContext().isValid()) { |
65 | | - THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext()); |
66 | | - |
67 | | - // Check if we need to clean up the map |
68 | | - if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) { |
69 | | - cleanupNeeded = true; |
70 | | - } |
71 | | - } |
72 | | - } |
| 52 | + /** |
| 53 | + * Stores the current context as the server context for this thread. This should be called from |
| 54 | + * the server handler. |
| 55 | + */ |
| 56 | + public static void storeServerContext(Context context) { |
| 57 | + SERVER_CONTEXT.set(context); |
| 58 | + |
| 59 | + // Also store the span context by thread ID for cross-thread lookup |
| 60 | + Span span = Span.fromContext(context); |
| 61 | + if (span != null && span.getSpanContext().isValid()) { |
| 62 | + THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext()); |
73 | 63 |
|
74 | | - /** |
75 | | - * Perform cleanup of the thread map if it has grown too large. This is done in a synchronized |
76 | | - * block to prevent concurrent modification issues. |
77 | | - */ |
78 | | - private static void cleanupThreadMapIfNeeded() { |
79 | | - if (cleanupNeeded) { |
80 | | - synchronized (THREAD_TO_SPAN_CONTEXT) { |
81 | | - if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) { |
82 | | - THREAD_TO_SPAN_CONTEXT.clear(); |
83 | | - cleanupNeeded = false; |
84 | | - } |
85 | | - } |
| 64 | + // Check if we need to clean up the map |
| 65 | + if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) { |
| 66 | + cleanupNeeded = true; |
| 67 | + } |
| 68 | + } |
| 69 | + } |
| 70 | + |
| 71 | + /** |
| 72 | + * Perform cleanup of the thread map if it has grown too large. This is done in a synchronized |
| 73 | + * block to prevent concurrent modification issues. |
| 74 | + */ |
| 75 | + private static void cleanupThreadMapIfNeeded() { |
| 76 | + if (cleanupNeeded) { |
| 77 | + synchronized (THREAD_TO_SPAN_CONTEXT) { |
| 78 | + if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) { |
| 79 | + THREAD_TO_SPAN_CONTEXT.clear(); |
| 80 | + cleanupNeeded = false; |
86 | 81 | } |
| 82 | + } |
87 | 83 | } |
| 84 | + } |
88 | 85 |
|
89 | | - @Override |
90 | | - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { |
91 | | - try { |
92 | | - if (!(msg instanceof HttpRequest)) { |
93 | | - super.write(ctx, msg, prm); |
94 | | - return; |
95 | | - } |
| 86 | + @Override |
| 87 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { |
| 88 | + try { |
| 89 | + if (!(msg instanceof HttpRequest)) { |
| 90 | + super.write(ctx, msg, prm); |
| 91 | + return; |
| 92 | + } |
96 | 93 |
|
97 | | - Context parentContext = SERVER_CONTEXT.get(); |
| 94 | + Context parentContext = SERVER_CONTEXT.get(); |
98 | 95 |
|
99 | | - // Fallback -> If no context in thread local, try Context.current() |
100 | | - if (parentContext == null) { |
101 | | - parentContext = Context.current(); |
102 | | - } |
| 96 | + // Fallback -> If no context in thread local, try Context.current() |
| 97 | + if (parentContext == null) { |
| 98 | + parentContext = Context.current(); |
| 99 | + } |
103 | 100 |
|
104 | | - // Store the parent context in the channel attributes |
105 | | - // This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct |
106 | | - // context. |
107 | | - ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext); |
| 101 | + // Store the parent context in the channel attributes |
| 102 | + // This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct |
| 103 | + // context. |
| 104 | + ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext); |
108 | 105 |
|
109 | | - // Call the parent implementation which will use our stored parent context |
110 | | - super.write(ctx, msg, prm); |
| 106 | + // Call the parent implementation which will use our stored parent context |
| 107 | + super.write(ctx, msg, prm); |
111 | 108 |
|
112 | | - // Clean up after use to prevent memory leaks |
113 | | - SERVER_CONTEXT.remove(); |
114 | | - THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId()); |
115 | | - cleanupThreadMapIfNeeded(); |
| 109 | + // Clean up after use to prevent memory leaks |
| 110 | + SERVER_CONTEXT.remove(); |
| 111 | + THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId()); |
| 112 | + cleanupThreadMapIfNeeded(); |
116 | 113 |
|
117 | | - } catch (Exception ignored) { |
118 | | - } |
| 114 | + } catch (Exception ignored) { |
119 | 115 | } |
| 116 | + } |
120 | 117 | } |
0 commit comments