Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 009ff36

Browse files
author
Amir Blum
authored
fix: message process loop patch should use the original operation context (#59)
1 parent b97308a commit 009ff36

File tree

4 files changed

+71
-45
lines changed

4 files changed

+71
-45
lines changed

packages/plugin-aws-sdk/src/aws-sdk.ts

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,16 @@
99
callback | 1 | 2
1010
*/
1111
import { BasePlugin } from '@opentelemetry/core';
12-
import { Span, StatusCode, Attributes, SpanKind, context, setSpan, suppressInstrumentation } from '@opentelemetry/api';
12+
import {
13+
Span,
14+
StatusCode,
15+
Attributes,
16+
SpanKind,
17+
context,
18+
setSpan,
19+
suppressInstrumentation,
20+
Context,
21+
} from '@opentelemetry/api';
1322
import * as shimmer from 'shimmer';
1423
import AWS from 'aws-sdk';
1524
import { AttributeNames } from './enums';
@@ -43,16 +52,13 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
4352
shimmer.unwrap(this._moduleExports?.Request.prototype, 'promise');
4453
}
4554

46-
private _bindPromise(target: Promise<any>, span: Span) {
47-
const thisPlugin = this;
48-
55+
private _bindPromise(target: Promise<any>, contextForCallbacks: Context) {
4956
const origThen = target.then;
5057
target.then = function (onFulfilled, onRejected) {
51-
const newOnFulfilled = context.bind(onFulfilled, setSpan(context.active(), span));
52-
const newOnRejected = context.bind(onRejected, setSpan(context.active(), span));
58+
const newOnFulfilled = context.bind(onFulfilled, contextForCallbacks);
59+
const newOnRejected = context.bind(onRejected, contextForCallbacks);
5360
return origThen.call(this, newOnFulfilled, newOnRejected);
5461
};
55-
5662
return target;
5763
}
5864

@@ -96,25 +102,28 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
96102
}
97103
}
98104

99-
private _registerCompletedEvent(span: Span, request: AWS.Request<any, any>) {
105+
private _registerCompletedEvent(span: Span, request: AWS.Request<any, any>, completedEventContext: Context) {
100106
const thisPlugin = this;
101107
request.on('complete', (response) => {
102-
if (!request[thisPlugin.REQUEST_SPAN_KEY]) {
103-
return;
104-
}
105-
request[thisPlugin.REQUEST_SPAN_KEY] = undefined;
106-
107-
if (response.error) {
108-
span.setAttribute(AttributeNames.AWS_ERROR, response.error);
109-
}
110-
111-
this._callUserResponseHook(span, response);
112-
this.servicesExtensions.responseHook(response, span);
113-
114-
span.setAttributes({
115-
[AttributeNames.AWS_REQUEST_ID]: response.requestId,
108+
// read issue https://github.com/aspecto-io/opentelemetry-ext-js/issues/60
109+
context.with(completedEventContext, () => {
110+
if (!request[thisPlugin.REQUEST_SPAN_KEY]) {
111+
return;
112+
}
113+
request[thisPlugin.REQUEST_SPAN_KEY] = undefined;
114+
115+
if (response.error) {
116+
span.setAttribute(AttributeNames.AWS_ERROR, response.error);
117+
}
118+
119+
this._callUserResponseHook(span, response);
120+
this.servicesExtensions.responseHook(response, span);
121+
122+
span.setAttributes({
123+
[AttributeNames.AWS_REQUEST_ID]: response.requestId,
124+
});
125+
span.end();
116126
});
117-
span.end();
118127
});
119128
}
120129

@@ -137,11 +146,13 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
137146
requestMetadata.spanKind,
138147
requestMetadata.spanName
139148
);
149+
const activeContextWithSpan = setSpan(context.active(), span);
150+
const callbackWithContext = context.bind(callback, activeContextWithSpan);
151+
140152
thisPlugin._callUserPreRequestHook(span, awsRequest);
141-
thisPlugin._registerCompletedEvent(span, awsRequest);
153+
thisPlugin._registerCompletedEvent(span, awsRequest, activeContextWithSpan);
142154

143-
const callbackWithContext = context.bind(callback, setSpan(context.active(), span));
144-
return context.with(setSpan(context.active(), span), () => {
155+
return context.with(activeContextWithSpan, () => {
145156
thisPlugin.servicesExtensions.requestPostSpanHook(awsRequest);
146157
return thisPlugin._callOriginalFunction(() => original.call(awsRequest, callbackWithContext));
147158
});
@@ -167,15 +178,19 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
167178
requestMetadata.spanKind,
168179
requestMetadata.spanName
169180
);
181+
182+
const activeContextWithSpan = setSpan(context.active(), span);
170183
thisPlugin._callUserPreRequestHook(span, awsRequest);
171-
thisPlugin._registerCompletedEvent(span, awsRequest);
184+
thisPlugin._registerCompletedEvent(span, awsRequest, activeContextWithSpan);
172185

173-
const origPromise: Promise<any> = context.with(setSpan(context.active(), span), () => {
186+
const origPromise: Promise<any> = context.with(activeContextWithSpan, () => {
174187
thisPlugin.servicesExtensions.requestPostSpanHook(awsRequest);
175188
return thisPlugin._callOriginalFunction(() => original.call(awsRequest, arguments));
176189
});
177190

178-
return requestMetadata.isIncoming ? thisPlugin._bindPromise(origPromise, span) : origPromise;
191+
return requestMetadata.isIncoming
192+
? thisPlugin._bindPromise(origPromise, activeContextWithSpan)
193+
: origPromise;
179194
};
180195
}
181196

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ export class SqsServiceExtension implements ServiceExtension {
158158
this.sqsProcessHook ? this.sqsProcessHook(span, message) : {},
159159
});
160160

161-
pubsubPropagation.patchArrayForProcessSpans(messages, this.tracer);
161+
pubsubPropagation.patchArrayForProcessSpans(messages, this.tracer, context.active());
162162
}
163163
};
164164

packages/plugin-aws-sdk/test/sqs.spec.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,19 @@ describe('sqs', () => {
170170
expectReceiver2ProcessWithNChildrenEach(spans, 2);
171171
};
172172

173+
const contextKeyFromTest = Symbol('context key from test');
174+
const contextValueFromTest = 'context value from test';
175+
173176
beforeEach(async () => {
174177
const sqs = new AWS.SQS();
175-
const res = await sqs
176-
.receiveMessage({
177-
QueueUrl: 'queue/url/for/unittests',
178-
})
179-
.promise();
180-
receivedMessages = res.Messages;
178+
await context.with(context.active().setValue(contextKeyFromTest, contextValueFromTest), async () => {
179+
const res = await sqs
180+
.receiveMessage({
181+
QueueUrl: 'queue/url/for/unittests',
182+
})
183+
.promise();
184+
receivedMessages = res.Messages;
185+
});
181186
});
182187

183188
it('should create processing child with forEach', async () => {
@@ -293,6 +298,12 @@ describe('sqs', () => {
293298
}
294299
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
295300
});
301+
302+
it('should propagate the context of the receive call in process spans loop', async () => {
303+
receivedMessages.forEach(() => {
304+
expect(context.active().getValue(contextKeyFromTest)).toStrictEqual(contextValueFromTest);
305+
});
306+
});
296307
});
297308

298309
describe('hooks', () => {

packages/propagation-utils/src/pubsub-propagation.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ import { Tracer, SpanKind, Span, Context, Link, getSpanContext, context, setSpan
33
const START_SPAN_FUNCTION = Symbol('opentelemetry.pubsub-propagation.start_span');
44
const END_SPAN_FUNCTION = Symbol('opentelemetry.pubsub-propagation.end_span');
55

6-
const patchArrayFilter = (messages: any[], tracer: Tracer) => {
6+
const patchArrayFilter = (messages: any[], tracer: Tracer, loopContext: Context) => {
77
const origFunc = messages.filter;
88
const patchedFunc = function (..._args) {
99
const newArray = origFunc.apply(this, arguments);
10-
patchArrayForProcessSpans(newArray, tracer);
10+
patchArrayForProcessSpans(newArray, tracer, loopContext);
1111
return newArray;
1212
};
1313

@@ -17,14 +17,14 @@ const patchArrayFilter = (messages: any[], tracer: Tracer) => {
1717
});
1818
};
1919

20-
const patchArrayFunction = (messages: any[], functionName: string, tracer: Tracer) => {
20+
const patchArrayFunction = (messages: any[], functionName: string, tracer: Tracer, loopContext: Context) => {
2121
const origFunc = messages[functionName];
2222
const patchedFunc = function (callback: any, thisArg: any) {
2323
const wrappedCallback = function (message: any) {
2424
const messageSpan = message?.[START_SPAN_FUNCTION]?.();
2525
if (!messageSpan) return callback.apply(this, arguments);
2626

27-
const res = context.with(setSpan(context.active(), messageSpan), () => {
27+
const res = context.with(setSpan(loopContext, messageSpan), () => {
2828
try {
2929
return callback.apply(this, arguments);
3030
} catch (err) {
@@ -49,7 +49,7 @@ const patchArrayFunction = (messages: any[], functionName: string, tracer: Trace
4949
return res;
5050
};
5151
const funcResult = origFunc.call(this, wrappedCallback, thisArg);
52-
if (Array.isArray(funcResult)) patchArrayForProcessSpans(funcResult, tracer);
52+
if (Array.isArray(funcResult)) patchArrayForProcessSpans(funcResult, tracer, loopContext);
5353
return funcResult;
5454
};
5555

@@ -59,10 +59,10 @@ const patchArrayFunction = (messages: any[], functionName: string, tracer: Trace
5959
});
6060
};
6161

62-
const patchArrayForProcessSpans = (messages: any[], tracer: Tracer) => {
63-
patchArrayFunction(messages, 'forEach', tracer);
64-
patchArrayFunction(messages, 'map', tracer);
65-
patchArrayFilter(messages, tracer);
62+
const patchArrayForProcessSpans = (messages: any[], tracer: Tracer, loopContext: Context = context.active()) => {
63+
patchArrayFunction(messages, 'forEach', tracer, loopContext);
64+
patchArrayFunction(messages, 'map', tracer, loopContext);
65+
patchArrayFilter(messages, tracer, loopContext);
6666
};
6767

6868
const startMessagingProcessSpan = <T>(

0 commit comments

Comments
 (0)