Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 3bf21ff

Browse files
author
Amir Blum
authored
Merge pull request #24 from aspecto-io/add-tests
feat(opentelemetry-plugin-aws-sdk): support more iterations on sqs received messages
2 parents ee3e3d6 + 426af77 commit 3bf21ff

File tree

4 files changed

+175
-50
lines changed

4 files changed

+175
-50
lines changed

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import * as AWS from "aws-sdk";
1212
import {
1313
getExtractedSpanContext,
1414
TRACE_PARENT_HEADER,
15+
setActiveSpan,
1516
} from "@opentelemetry/core";
1617
import {
1718
MessageBodyAttributeMap,
@@ -148,28 +149,25 @@ export class SqsServiceExtension implements ServiceExtension {
148149
const queueName = this.extractQueueNameFromUrl(queueUrl);
149150

150151
messages.forEach((message: AWS.SQS.Message) => {
151-
const parentContext: Context = propagation.extract(
152+
const extractedParentContext: Context = propagation.extract(
152153
message.MessageAttributes,
153154
contextGetterFunc
154155
);
155-
message[START_SPAN_FUNCTION] = () => {
156-
return this.tracer.withSpan(span, () =>
157-
this.startSingleMessageSpan(
158-
queueUrl,
159-
queueName,
160-
message,
161-
parentContext
162-
)
156+
message[START_SPAN_FUNCTION] = () =>
157+
this.startMessagingProcessSpan(
158+
queueUrl,
159+
queueName,
160+
message,
161+
span,
162+
extractedParentContext
163163
);
164-
};
165164
message[END_SPAN_FUNCTION] = () =>
166165
console.log(
167166
"open-telemetry aws-sdk plugin: end span called on sqs message which was not started"
168167
);
169168
});
170169

171-
this.patchArrayFunction(messages, "forEach");
172-
this.patchArrayFunction(messages, "map");
170+
this.patchArrayForProcessSpans(messages);
173171
}
174172
};
175173

@@ -186,10 +184,11 @@ export class SqsServiceExtension implements ServiceExtension {
186184
return pisces[pisces.length - 1];
187185
};
188186

189-
startSingleMessageSpan(
187+
startMessagingProcessSpan(
190188
queueUrl: string,
191189
queueName: string,
192190
message: AWS.SQS.Message,
191+
receiveMessageSpan: Span,
193192
propagtedContext: Context
194193
): Span {
195194
const links: Link[] = [];
@@ -211,34 +210,57 @@ export class SqsServiceExtension implements ServiceExtension {
211210
[SqsAttributeNames.MESSAGING_OPERATION]: "process",
212211
},
213212
links,
213+
parent: receiveMessageSpan,
214214
});
215215

216-
message[START_SPAN_FUNCTION] = () =>
217-
console.log(
218-
"open-telemetry aws-sdk plugin: trying to start sqs processing span twice."
219-
);
216+
message[START_SPAN_FUNCTION] = () => messageSpan;
220217
message[END_SPAN_FUNCTION] = () => {
221218
messageSpan.end();
222-
message[END_SPAN_FUNCTION] = () =>
223-
console.log(
224-
"open-telemetry aws-sdk plugin: trying to end sqs processing span which was already ended."
225-
);
219+
message[END_SPAN_FUNCTION] = () => {};
226220
};
227221
return messageSpan;
228222
}
229223

230-
patchArrayFunction(messages: AWS.SQS.Message[], functionName: string) {
224+
patchArrayForProcessSpans(messages: any[]) {
225+
this.patchArrayFunction(messages, "forEach");
226+
this.patchArrayFunction(messages, "map");
227+
this.patchArrayFilter(messages);
228+
}
229+
230+
patchArrayFilter(messages: any[]) {
231+
const self = this;
232+
const origFunc = messages.filter;
233+
messages.filter = function (...args) {
234+
const newArray = origFunc.apply(this, arguments);
235+
self.patchArrayForProcessSpans(newArray);
236+
return newArray;
237+
};
238+
}
239+
240+
patchArrayFunction(messages: any[], functionName: string) {
231241
const self = this;
232242
const origFunc = messages[functionName];
233-
messages[functionName] = function (callback) {
234-
return origFunc.call(this, function (message: AWS.SQS.Message) {
243+
messages[functionName] = function (callback, thisArg) {
244+
const wrappedCallback = function (message: AWS.SQS.Message) {
235245
const messageSpan = message[START_SPAN_FUNCTION]();
236-
const res = self.tracer.withSpan(messageSpan, () =>
237-
callback.apply(this, arguments)
238-
);
239-
message[END_SPAN_FUNCTION]();
246+
const res = self.tracer.withSpan(messageSpan, () => {
247+
try {
248+
return callback.apply(this, arguments);
249+
} catch (err) {
250+
throw err;
251+
} finally {
252+
message[END_SPAN_FUNCTION]();
253+
}
254+
});
255+
if (res) {
256+
res[START_SPAN_FUNCTION] = message[START_SPAN_FUNCTION];
257+
res[END_SPAN_FUNCTION] = message[END_SPAN_FUNCTION];
258+
}
240259
return res;
241-
});
260+
};
261+
const funcResult = origFunc.call(this, wrappedCallback, thisArg);
262+
if (Array.isArray(funcResult)) self.patchArrayForProcessSpans(funcResult);
263+
return funcResult;
242264
};
243265
}
244266

packages/plugin-aws-sdk/test/aws-sdk.spec.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,23 @@ describe("plugin-aws-sdk", () => {
217217
);
218218
done();
219219
});
220+
221+
it("should create span if no callback is supplied", (done) => {
222+
const s3 = new AWS.S3();
223+
const bucketName = "aws-test-bucket";
224+
225+
s3.putObject({
226+
Bucket: bucketName,
227+
Key: "key name from tests",
228+
Body: "Hello World!",
229+
}).send();
230+
231+
setImmediate(() => {
232+
const awsSpans = getAwsSpans();
233+
expect(awsSpans.length).toBe(1);
234+
done();
235+
});
236+
});
220237
});
221238

222239
describe("send return error", () => {

packages/plugin-aws-sdk/test/sqs.spec.ts

Lines changed: 101 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,21 @@ describe("sqs", () => {
7373
};
7474

7575
it("should set parent context in sqs receive callback", async (done) => {
76+
const sqs = new AWS.SQS();
77+
sqs.receiveMessage(
78+
{
79+
QueueUrl: "queue/url/for/unittests",
80+
},
81+
(err: AWSError, data: AWS.SQS.Types.ReceiveMessageResult) => {
82+
expect(err).toBeFalsy();
83+
createReceiveChildSpan();
84+
expectReceiverWithChildSpan(memoryExporter.getFinishedSpans());
85+
done();
86+
}
87+
);
88+
});
89+
90+
it("should set parent context in sqs receive 'send' callback", async (done) => {
7691
const sqs = new AWS.SQS();
7792
sqs
7893
.receiveMessage({
@@ -139,7 +154,10 @@ describe("sqs", () => {
139154
processChildSpan.end();
140155
};
141156

142-
const expectReceiver2ProcessWithOneChildEach = (spans: ReadableSpan[]) => {
157+
const expectReceiver2ProcessWithNChildrenEach = (
158+
spans: ReadableSpan[],
159+
numChildPerProcessSpan: number
160+
) => {
143161
const awsReceiveSpan = spans.filter(
144162
(s) => s.attributes[SqsAttributeNames.MESSAGING_OPERATION] === "receive"
145163
);
@@ -159,13 +177,23 @@ describe("sqs", () => {
159177
const processChildSpans = spans.filter(
160178
(s) => s.kind === SpanKind.INTERNAL
161179
);
162-
expect(processChildSpans.length).toBe(2);
163-
expect(processChildSpans[0].parentSpanId).toStrictEqual(
164-
processSpans[0].spanContext.spanId
165-
);
166-
expect(processChildSpans[1].parentSpanId).toStrictEqual(
167-
processSpans[1].spanContext.spanId
168-
);
180+
expect(processChildSpans.length).toBe(2 * numChildPerProcessSpan);
181+
for (let i = 0; i < numChildPerProcessSpan; i++) {
182+
expect(processChildSpans[2 * i + 0].parentSpanId).toStrictEqual(
183+
processSpans[0].spanContext.spanId
184+
);
185+
expect(processChildSpans[2 * i + 1].parentSpanId).toStrictEqual(
186+
processSpans[1].spanContext.spanId
187+
);
188+
}
189+
};
190+
191+
const expectReceiver2ProcessWith1ChildEach = (spans: ReadableSpan[]) => {
192+
expectReceiver2ProcessWithNChildrenEach(spans, 1);
193+
};
194+
195+
const expectReceiver2ProcessWith2ChildEach = (spans: ReadableSpan[]) => {
196+
expectReceiver2ProcessWithNChildrenEach(spans, 2);
169197
};
170198

171199
beforeEach(async () => {
@@ -182,54 +210,110 @@ describe("sqs", () => {
182210
receivedMessages.forEach((msg) => {
183211
createProcessChildSpan(msg.Body);
184212
});
185-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
213+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
186214
});
187215

188216
it("should create processing child with map", async () => {
189217
receivedMessages.map((msg) => {
190218
createProcessChildSpan(msg.Body);
191219
});
192-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
220+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
221+
});
222+
223+
it("should create one processing child when throws in map", async () => {
224+
try {
225+
receivedMessages.map((msg) => {
226+
createProcessChildSpan(msg.Body);
227+
throw Error("error from array.map");
228+
});
229+
} catch (err) {}
230+
231+
const processChildSpans = memoryExporter
232+
.getFinishedSpans()
233+
.filter((s) => s.kind === SpanKind.INTERNAL);
234+
expect(processChildSpans.length).toBe(1);
235+
});
236+
237+
it("should create processing child with two forEach", async () => {
238+
receivedMessages.forEach((msg) => {
239+
createProcessChildSpan(msg.Body);
240+
});
241+
receivedMessages.forEach((msg) => {
242+
createProcessChildSpan(msg.Body);
243+
});
244+
expectReceiver2ProcessWith2ChildEach(memoryExporter.getFinishedSpans());
245+
});
246+
247+
it("should forward all parameters to forEach callback", async () => {
248+
const objectForThis = {};
249+
receivedMessages.forEach(function (msg, index, array) {
250+
expect(msg).not.toBeUndefined();
251+
expect(index).toBeLessThan(2);
252+
expect(index).toBeGreaterThanOrEqual(0);
253+
expect(array).toBe(receivedMessages);
254+
expect(this).toBe(objectForThis);
255+
}, objectForThis);
256+
});
257+
258+
it("should create one processing child with forEach that throws", async () => {
259+
try {
260+
receivedMessages.forEach((msg) => {
261+
createProcessChildSpan(msg.Body);
262+
throw Error("error from forEach");
263+
});
264+
} catch (err) {}
265+
const processChildSpans = memoryExporter
266+
.getFinishedSpans()
267+
.filter((s) => s.kind === SpanKind.INTERNAL);
268+
expect(processChildSpans.length).toBe(1);
193269
});
194270

195271
it.skip("should create processing child with array index access", async () => {
196272
for (let i = 0; i < receivedMessages.length; i++) {
197273
const msg = receivedMessages[i];
198274
createProcessChildSpan(msg.Body);
199275
}
200-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
276+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
201277
});
202278

203-
it.skip("should create processing child with map and forEach calls", async () => {
279+
it("should create processing child with map and forEach calls", async () => {
204280
receivedMessages
205281
.map((msg) => JSON.parse(msg.Body))
206282
.forEach((msgBody) => {
207283
createProcessChildSpan(msgBody);
208284
});
209-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
285+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
210286
});
211287

212-
it.skip("should create processing child with filter and forEach", async () => {
288+
it("should create processing child with filter and forEach", async () => {
213289
receivedMessages
214290
.filter((msg) => msg)
215291
.forEach((msgBody) => {
216292
createProcessChildSpan(msgBody);
217293
});
218-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
294+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
219295
});
220296

221297
it.skip("should create processing child with for(msg of messages)", () => {
222298
for (const msg of receivedMessages) {
223299
createProcessChildSpan(msg.Body);
224300
}
225-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
301+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
226302
});
227303

228304
it.skip("should create processing child with array.values() for loop", () => {
229305
for (const msg of receivedMessages.values()) {
230306
createProcessChildSpan(msg.Body);
231307
}
232-
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
308+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
309+
});
310+
311+
it.skip("should create processing child with array.values() for loop and awaits in process", async () => {
312+
for (const msg of receivedMessages.values()) {
313+
await new Promise((resolve) => setImmediate(resolve));
314+
createProcessChildSpan(msg.Body);
315+
}
316+
expectReceiver2ProcessWith1ChildEach(memoryExporter.getFinishedSpans());
233317
});
234318
});
235319
});

packages/plugin-aws-sdk/test/testing-utils.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import AWS from "aws-sdk";
22

33
export const mockAwsSend = (sendResult: any, data: any = undefined) => {
4-
AWS.Request.prototype.send = function (cb: (error, response) => void) {
5-
(this as AWS.Request<any, any>).on("complete", (response) => {
6-
cb(response.error, response);
7-
});
4+
AWS.Request.prototype.send = function (cb?: (error, response) => void) {
5+
if (cb) {
6+
(this as AWS.Request<any, any>).on("complete", (response) => {
7+
cb(response.error, response);
8+
});
9+
}
810
const response = {
911
...sendResult,
1012
data,

0 commit comments

Comments
 (0)