|
1 | 1 | import { BasePlugin } from '@opentelemetry/core'; |
2 | | -import { SpanKind, Span, StatusCode, Context, propagation, Link, getActiveSpan } from '@opentelemetry/api'; |
| 2 | +import { |
| 3 | + SpanKind, |
| 4 | + Span, |
| 5 | + StatusCode, |
| 6 | + Context, |
| 7 | + propagation, |
| 8 | + Link, |
| 9 | + getActiveSpan, |
| 10 | + setActiveSpan, |
| 11 | + context, |
| 12 | +} from '@opentelemetry/api'; |
3 | 13 | import { ROOT_CONTEXT } from '@opentelemetry/context-base'; |
4 | 14 | import { MessagingAttribute, MessagingOperationName } from '@opentelemetry/semantic-conventions'; |
5 | 15 | import * as shimmer from 'shimmer'; |
@@ -78,9 +88,9 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> { |
78 | 88 | const thisPlugin = this; |
79 | 89 | return function (payload: EachMessagePayload): Promise<void> { |
80 | 90 | const propagatedContext: Context = propagation.extract( |
| 91 | + ROOT_CONTEXT, |
81 | 92 | payload.message.headers, |
82 | | - bufferTextMapGetter, |
83 | | - ROOT_CONTEXT |
| 93 | + bufferTextMapGetter |
84 | 94 | ); |
85 | 95 | const span = thisPlugin._startConsumerSpan( |
86 | 96 | payload.topic, |
@@ -109,9 +119,9 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> { |
109 | 119 | return thisPlugin._tracer.withSpan(receivingSpan, () => { |
110 | 120 | const spans = payload.batch.messages.map((message: KafkaMessage) => { |
111 | 121 | const propagatedContext: Context = propagation.extract( |
| 122 | + ROOT_CONTEXT, |
112 | 123 | message.headers, |
113 | | - bufferTextMapGetter, |
114 | | - ROOT_CONTEXT |
| 124 | + bufferTextMapGetter |
115 | 125 | ); |
116 | 126 | const spanContext = getActiveSpan(propagatedContext)?.context(); |
117 | 127 | let origSpanLink: Link; |
@@ -180,13 +190,7 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> { |
180 | 190 | }); |
181 | 191 | } |
182 | 192 |
|
183 | | - private _startConsumerSpan( |
184 | | - topic: string, |
185 | | - message: KafkaMessage, |
186 | | - operation: string, |
187 | | - context: Context, |
188 | | - link?: Link |
189 | | - ) { |
| 193 | + private _startConsumerSpan(topic: string, message: KafkaMessage, operation: string, context: Context, link?: Link) { |
190 | 194 | const span = this._tracer.startSpan( |
191 | 195 | topic, |
192 | 196 | { |
@@ -219,10 +223,8 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> { |
219 | 223 | }, |
220 | 224 | }); |
221 | 225 |
|
222 | | - this._tracer.withSpan(span, () => { |
223 | | - if (!message.headers) message.headers = {}; |
224 | | - propagation.inject(message.headers); |
225 | | - }); |
| 226 | + message.headers = message.headers ?? {}; |
| 227 | + propagation.inject(setActiveSpan(context.active(), span), message.headers); |
226 | 228 |
|
227 | 229 | if (this._config?.producerHook) { |
228 | 230 | this._safeExecute([], () => this._config.producerHook!(span, topic, message), false); |
|
0 commit comments