Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit ba97daf

Browse files
author
Amir Blum
committed
fix(opentelemetry-plugin-aws-sdk): add tests and support more iterations in sqs receive
1 parent a128f42 commit ba97daf

File tree

2 files changed

+178
-46
lines changed

2 files changed

+178
-46
lines changed

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import * as AWS from "aws-sdk";
1212
import {
1313
getExtractedSpanContext,
1414
TRACE_PARENT_HEADER,
15+
setActiveSpan,
1516
} from "@opentelemetry/core";
1617
import {
1718
MessageBodyAttributeMap,
@@ -148,28 +149,25 @@ export class SqsServiceExtension implements ServiceExtension {
148149
const queueName = this.extractQueueNameFromUrl(queueUrl);
149150

150151
messages.forEach((message: AWS.SQS.Message) => {
151-
const parentContext: Context = propagation.extract(
152+
const extractedParentContext: Context = propagation.extract(
152153
message.MessageAttributes,
153154
contextGetterFunc
154155
);
155-
message[START_SPAN_FUNCTION] = () => {
156-
return this.tracer.withSpan(span, () =>
157-
this.startSingleMessageSpan(
158-
queueUrl,
159-
queueName,
160-
message,
161-
parentContext
162-
)
156+
message[START_SPAN_FUNCTION] = () =>
157+
this.startMessagingProcessSpan(
158+
queueUrl,
159+
queueName,
160+
message,
161+
span,
162+
extractedParentContext
163163
);
164-
};
165164
message[END_SPAN_FUNCTION] = () =>
166165
console.log(
167166
"open-telemetry aws-sdk plugin: end span called on sqs message which was not started"
168167
);
169168
});
170169

171-
this.patchArrayFunction(messages, "forEach");
172-
this.patchArrayFunction(messages, "map");
170+
this.patchArrayForProcessSpans(messages);
173171
}
174172
};
175173

@@ -186,10 +184,11 @@ export class SqsServiceExtension implements ServiceExtension {
186184
return pisces[pisces.length - 1];
187185
};
188186

189-
startSingleMessageSpan(
187+
startMessagingProcessSpan(
190188
queueUrl: string,
191189
queueName: string,
192190
message: AWS.SQS.Message,
191+
receiveMessageSpan: Span,
193192
propagtedContext: Context
194193
): Span {
195194
const links: Link[] = [];
@@ -211,34 +210,98 @@ export class SqsServiceExtension implements ServiceExtension {
211210
[SqsAttributeNames.MESSAGING_OPERATION]: "process",
212211
},
213212
links,
213+
parent: receiveMessageSpan,
214214
});
215215

216-
message[START_SPAN_FUNCTION] = () =>
217-
console.log(
218-
"open-telemetry aws-sdk plugin: trying to start sqs processing span twice."
219-
);
216+
message[START_SPAN_FUNCTION] = () => messageSpan;
220217
message[END_SPAN_FUNCTION] = () => {
221218
messageSpan.end();
222-
message[END_SPAN_FUNCTION] = () =>
223-
console.log(
224-
"open-telemetry aws-sdk plugin: trying to end sqs processing span which was already ended."
225-
);
219+
message[END_SPAN_FUNCTION] = () => {};
226220
};
227221
return messageSpan;
228222
}
229223

230-
patchArrayFunction(messages: AWS.SQS.Message[], functionName: string) {
224+
patchArrayForProcessSpans(messages: any[]) {
225+
this.patchArrayFunction(messages, "forEach");
226+
this.patchArrayFunction(messages, "map");
227+
this.patchArrayFilter(messages);
228+
// this.patchArrayIterators(messages, 'values');
229+
// this.patchArrayIterators(messages, Symbol.iterator);
230+
}
231+
232+
// the following implementation relays on the fact that
233+
// an array element processing is done once the iterator is accessed again.
234+
// it fails in case of throw\return\break from the loop body which cause
235+
// context manager data structure to be broken
236+
//
237+
// patchArrayIterators(messages: any[], functionName: any) {
238+
// const self = this;
239+
240+
// const contextManager = context["_getContextManager"]?.();
241+
// const enterContext = contextManager?.["_enterContext"]?.bind(contextManager);
242+
// const exitContext = contextManager?.["_exitContext"]?.bind(contextManager);
243+
// if(!enterContext || !exitContext) return;
244+
245+
// let activeMessage: any;
246+
247+
// const origFunc = messages[functionName];
248+
// messages[functionName] = function (...args: unknown[]) {
249+
// const iterator: IterableIterator<unknown> = origFunc.apply(
250+
// this,
251+
// arguments
252+
// );
253+
// const iteratorNext = iterator.next;
254+
// iterator.next = function (...args: unknown[]) {
255+
// const iteratorNextResult = iteratorNext.apply(this, arguments);
256+
// if(activeMessage) {
257+
// exitContext();
258+
// activeMessage?.[END_SPAN_FUNCTION]?.();
259+
// }
260+
// const messageSpan = iteratorNextResult.value?.[START_SPAN_FUNCTION]?.();
261+
// if(messageSpan) {
262+
// enterContext(setActiveSpan(context.active(), messageSpan));
263+
// activeMessage = iteratorNextResult.value;
264+
// }
265+
// return iteratorNextResult;
266+
// };
267+
// return iterator;
268+
// };
269+
// }
270+
271+
patchArrayFilter(messages: any[]) {
272+
const self = this;
273+
const origFunc = messages.filter;
274+
messages.filter = function (...args) {
275+
const newArray = origFunc.apply(this, arguments);
276+
self.patchArrayForProcessSpans(newArray);
277+
return newArray;
278+
};
279+
}
280+
281+
patchArrayFunction(messages: any[], functionName: string) {
231282
const self = this;
232283
const origFunc = messages[functionName];
233-
messages[functionName] = function (callback) {
234-
return origFunc.call(this, function (message: AWS.SQS.Message) {
284+
messages[functionName] = function (callback, thisArg) {
285+
const wrappedCallback = function (message: AWS.SQS.Message) {
235286
const messageSpan = message[START_SPAN_FUNCTION]();
236-
const res = self.tracer.withSpan(messageSpan, () =>
237-
callback.apply(this, arguments)
238-
);
239-
message[END_SPAN_FUNCTION]();
287+
const res = self.tracer.withSpan(messageSpan, () => {
288+
try {
289+
return callback.apply(this, arguments);
290+
} catch (err) {
291+
throw err;
292+
} finally {
293+
message[END_SPAN_FUNCTION]();
294+
}
295+
});
296+
if (res) {
297+
res[START_SPAN_FUNCTION] = message[START_SPAN_FUNCTION];
298+
res[END_SPAN_FUNCTION] = message[END_SPAN_FUNCTION];
299+
}
240300
return res;
241-
});
301+
};
302+
const funcResult = origFunc.call(this, wrappedCallback, thisArg);
303+
if (Array.isArray(funcResult)) self.patchArrayForProcessSpans(funcResult);
304+
return funcResult;
242305
};
243306
}
244307

packages/plugin-aws-sdk/test/sqs.spec.ts

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,10 @@ describe("sqs", () => {
154154
processChildSpan.end();
155155
};
156156

157-
const expectReceiver2ProcessWithOneChildEach = (spans: ReadableSpan[]) => {
157+
const expectReceiver2ProcessWithNChildrenEach = (
158+
spans: ReadableSpan[],
159+
numChildPerProcessSpan: number
160+
) => {
158161
const awsReceiveSpan = spans.filter(
159162
(s) => s.attributes[SqsAttributeNames.MESSAGING_OPERATION] === "receive"
160163
);
@@ -174,13 +177,23 @@ describe("sqs", () => {
174177
const processChildSpans = spans.filter(
175178
(s) => s.kind === SpanKind.INTERNAL
176179
);
177-
expect(processChildSpans.length).toBe(2);
178-
expect(processChildSpans[0].parentSpanId).toStrictEqual(
179-
processSpans[0].spanContext.spanId
180-
);
181-
expect(processChildSpans[1].parentSpanId).toStrictEqual(
182-
processSpans[1].spanContext.spanId
183-
);
180+
expect(processChildSpans.length).toBe(2 * numChildPerProcessSpan);
181+
for (let i = 0; i < numChildPerProcessSpan; i++) {
182+
expect(processChildSpans[2 * i + 0].parentSpanId).toStrictEqual(
183+
processSpans[0].spanContext.spanId
184+
);
185+
expect(processChildSpans[2 * i + 1].parentSpanId).toStrictEqual(
186+
processSpans[1].spanContext.spanId
187+
);
188+
}
189+
};
190+
191+
const expectReceiver2ProcessWith1ChildEach = (spans: ReadableSpan[]) => {
192+
expectReceiver2ProcessWithNChildrenEach(spans, 1);
193+
};
194+
195+
const expectReceiver2ProcessWith2ChildEach = (spans: ReadableSpan[]) => {
196+
expectReceiver2ProcessWithNChildrenEach(spans, 2);
184197
};
185198

186199
beforeEach(async () => {
@@ -197,54 +210,110 @@ describe("sqs", () => {
197210
receivedMessages.forEach((msg) => {
198211
createProcessChildSpan(msg.Body);
199212
});
200-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
213+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
201214
});
202215

203216
it("should create processing child with map", async () => {
204217
receivedMessages.map((msg) => {
205218
createProcessChildSpan(msg.Body);
206219
});
207-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
220+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
221+
});
222+
223+
it("should create one processing child when throws in map", async () => {
224+
try {
225+
receivedMessages.map((msg) => {
226+
createProcessChildSpan(msg.Body);
227+
throw Error("error from array.map");
228+
});
229+
} catch (err) {}
230+
231+
const processChildSpans = memoryExporter
232+
.getFinishedSpans()
233+
.filter((s) => s.kind === SpanKind.INTERNAL);
234+
expect(processChildSpans.length).toBe(1);
235+
});
236+
237+
it("should create processing child with two forEach", async () => {
238+
receivedMessages.forEach((msg) => {
239+
createProcessChildSpan(msg.Body);
240+
});
241+
receivedMessages.forEach((msg) => {
242+
createProcessChildSpan(msg.Body);
243+
});
244+
expectReceiver2ProcessWith2ChildEach(memoryExporter.getFinishedSpans());
245+
});
246+
247+
it("should forward all parameters to forEach callback", async () => {
248+
const objectForThis = {};
249+
receivedMessages.forEach(function (msg, index, array) {
250+
expect(msg).not.toBeUndefined();
251+
expect(index).toBeLessThan(2);
252+
expect(index).toBeGreaterThanOrEqual(0);
253+
expect(array).toBe(receivedMessages);
254+
expect(this).toBe(objectForThis);
255+
}, objectForThis);
256+
});
257+
258+
it("should create one processing child with forEach that throws", async () => {
259+
try {
260+
receivedMessages.forEach((msg) => {
261+
createProcessChildSpan(msg.Body);
262+
throw Error("error from forEach");
263+
});
264+
} catch (err) {}
265+
const processChildSpans = memoryExporter
266+
.getFinishedSpans()
267+
.filter((s) => s.kind === SpanKind.INTERNAL);
268+
expect(processChildSpans.length).toBe(1);
208269
});
209270

210271
it.skip("should create processing child with array index access", async () => {
211272
for (let i = 0; i < receivedMessages.length; i++) {
212273
const msg = receivedMessages[i];
213274
createProcessChildSpan(msg.Body);
214275
}
215-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
276+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
216277
});
217278

218-
it.skip("should create processing child with map and forEach calls", async () => {
279+
it("should create processing child with map and forEach calls", async () => {
219280
receivedMessages
220281
.map((msg) => JSON.parse(msg.Body))
221282
.forEach((msgBody) => {
222283
createProcessChildSpan(msgBody);
223284
});
224-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
285+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
225286
});
226287

227-
it.skip("should create processing child with filter and forEach", async () => {
288+
it("should create processing child with filter and forEach", async () => {
228289
receivedMessages
229290
.filter((msg) => msg)
230291
.forEach((msgBody) => {
231292
createProcessChildSpan(msgBody);
232293
});
233-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
294+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
234295
});
235296

236297
it.skip("should create processing child with for(msg of messages)", () => {
237298
for (const msg of receivedMessages) {
238299
createProcessChildSpan(msg.Body);
239300
}
240-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
301+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
241302
});
242303

243304
it.skip("should create processing child with array.values() for loop", () => {
244305
for (const msg of receivedMessages.values()) {
245306
createProcessChildSpan(msg.Body);
246307
}
247-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
308+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
309+
});
310+
311+
it.skip("should create processing child with array.values() for loop and awaits in process", async () => {
312+
for (const msg of receivedMessages.values()) {
313+
await new Promise((resolve) => setImmediate(resolve));
314+
createProcessChildSpan(msg.Body);
315+
}
316+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
248317
});
249318
});
250319
});

0 commit comments

Comments
 (0)