Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit b61385b

Browse files
author
Amir Blum
committed
test(opentelemetry-plugin-aws-sdk): add tests for span tree in sqs
1 parent 71c57a3 commit b61385b

File tree

4 files changed

+255
-27
lines changed

4 files changed

+255
-27
lines changed

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ export class SqsServiceExtension implements ServiceExtension {
116116

117117
requestPostSpanHook = (request: AWS.Request<any, any>) => {
118118
const operation = (request as any)?.operation;
119-
console.log(operation);
120119
switch (operation) {
121120
case "sendMessage":
122121
{

packages/plugin-aws-sdk/test/aws-sdk.spec.ts

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { NodeTracerProvider } from "@opentelemetry/node";
1212
import { ContextManager } from "@opentelemetry/context-base";
1313
import { AsyncHooksContextManager } from "@opentelemetry/context-async-hooks";
1414
import { AttributeNames } from "../src/enums";
15+
import { mockAwsSend } from "./testing-utils";
1516

1617
describe("plugin-aws-sdk", () => {
1718
const logger = new NoopLogger();
@@ -33,37 +34,11 @@ describe("plugin-aws-sdk", () => {
3334
};
3435

3536
const getAwsSpans = (): ReadableSpan[] => {
36-
// console.log(' spans :', memoryExporter.getFinishedSpans());
3737
return memoryExporter
3838
.getFinishedSpans()
3939
.filter((s) => s.attributes[AttributeNames.COMPONENT] === "aws-sdk");
4040
};
4141

42-
const mockAwsSend = (sendResult: any) => {
43-
AWS.Request.prototype.send = function (cb: (error, response) => void) {
44-
(this as AWS.Request<any, any>).on("complete", (response) => {
45-
cb(response.error, response);
46-
});
47-
const response = {
48-
...sendResult,
49-
request: this,
50-
};
51-
setImmediate(() => {
52-
this._events.complete.forEach((handler) => handler(response));
53-
});
54-
return response;
55-
};
56-
57-
AWS.Request.prototype.promise = function () {
58-
const response = {
59-
...sendResult,
60-
request: this,
61-
};
62-
this._events.complete.forEach((handler) => handler(response));
63-
return Promise.resolve(response);
64-
};
65-
};
66-
6742
beforeAll(() => {
6843
AWS.config.credentials = {
6944
accessKeyId: "test key id",
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
import { plugin } from "../src";
2+
import AWS, { AWSError } from "aws-sdk";
3+
import { NoopLogger } from "@opentelemetry/core";
4+
import { NodeTracerProvider } from "@opentelemetry/node";
5+
import { ContextManager } from "@opentelemetry/context-base";
6+
import { context, SpanKind } from "@opentelemetry/api";
7+
import {
8+
InMemorySpanExporter,
9+
SimpleSpanProcessor,
10+
ReadableSpan,
11+
} from "@opentelemetry/tracing";
12+
import { AsyncHooksContextManager } from "@opentelemetry/context-async-hooks";
13+
import { mockAwsSend } from "./testing-utils";
14+
import { SqsAttributeNames } from "../src/services/sqs";
15+
import { Message } from "aws-sdk/clients/sqs";
16+
17+
const logger = new NoopLogger();
18+
const provider = new NodeTracerProvider();
19+
const memoryExporter = new InMemorySpanExporter();
20+
provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter));
21+
let contextManager: ContextManager;
22+
23+
const responseMockSuccess = {
24+
requestId: "0000000000000",
25+
error: null,
26+
};
27+
28+
const expectReceiver2ProcessWithOneChildEach = (spans: ReadableSpan[]) => {
29+
const awsReceiveSpan = spans.filter(
30+
(s) => s.attributes[SqsAttributeNames.MESSAGING_OPERATION] === "receive"
31+
);
32+
expect(awsReceiveSpan.length).toBe(1);
33+
34+
const processSpans = spans.filter(
35+
(s) => s.attributes[SqsAttributeNames.MESSAGING_OPERATION] === "process"
36+
);
37+
expect(processSpans.length).toBe(2);
38+
expect(processSpans[0].parentSpanId).toStrictEqual(
39+
awsReceiveSpan[0].spanContext.spanId
40+
);
41+
expect(processSpans[1].parentSpanId).toStrictEqual(
42+
awsReceiveSpan[0].spanContext.spanId
43+
);
44+
45+
const processChildSpans = spans.filter((s) => s.kind === SpanKind.INTERNAL);
46+
expect(processChildSpans.length).toBe(2);
47+
expect(processChildSpans[0].parentSpanId).toStrictEqual(
48+
processSpans[0].spanContext.spanId
49+
);
50+
expect(processChildSpans[1].parentSpanId).toStrictEqual(
51+
processSpans[1].spanContext.spanId
52+
);
53+
};
54+
55+
describe("sqs", () => {
56+
beforeAll(() => {
57+
AWS.config.credentials = {
58+
accessKeyId: "test key id",
59+
expired: false,
60+
expireTime: null,
61+
secretAccessKey: "test acc key",
62+
sessionToken: "test token",
63+
};
64+
});
65+
66+
beforeEach(() => {
67+
contextManager = new AsyncHooksContextManager();
68+
context.setGlobalContextManager(contextManager.enable());
69+
70+
mockAwsSend(responseMockSuccess, {
71+
Messages: [
72+
{ Body: JSON.stringify({ data: "msg 1" }) },
73+
{ Body: JSON.stringify({ data: "msg 2" }) },
74+
],
75+
} as AWS.SQS.Types.ReceiveMessageResult);
76+
plugin.enable(AWS, provider, logger);
77+
});
78+
79+
afterEach(() => {
80+
memoryExporter.reset();
81+
contextManager.disable();
82+
});
83+
84+
it("should set parent context in sqs receive callback", async (done) => {
85+
const sqs = new AWS.SQS();
86+
sqs
87+
.receiveMessage({
88+
QueueUrl: "queue/url/for/unittests",
89+
})
90+
.send((err: AWSError, data: AWS.SQS.Types.ReceiveMessageResult) => {
91+
expect(err).toBeFalsy();
92+
const childSpan = provider
93+
.getTracer("default")
94+
.startSpan("child span of sqs.receiveMessage in send callback");
95+
childSpan.end();
96+
const awsReceiveSpan = memoryExporter
97+
.getFinishedSpans()
98+
.filter((s) => s.kind === SpanKind.CONSUMER);
99+
expect(awsReceiveSpan.length).toBe(1);
100+
const internalSpan = memoryExporter
101+
.getFinishedSpans()
102+
.filter((s) => s.kind === SpanKind.INTERNAL);
103+
expect(internalSpan.length).toBe(1);
104+
expect(internalSpan[0].parentSpanId).toStrictEqual(
105+
awsReceiveSpan[0].spanContext.spanId
106+
);
107+
done();
108+
});
109+
});
110+
111+
it("should set parent context in sqs receive after await", async () => {
112+
const sqs = new AWS.SQS();
113+
const res = await sqs
114+
.receiveMessage({
115+
QueueUrl: "queue/url/for/unittests",
116+
})
117+
.promise();
118+
119+
const childSpan = provider
120+
.getTracer("default")
121+
.startSpan("child span of sqs.receiveMessage after promise await");
122+
childSpan.end();
123+
const awsReceiveSpan = memoryExporter
124+
.getFinishedSpans()
125+
.filter((s) => s.kind === SpanKind.CONSUMER);
126+
expect(awsReceiveSpan.length).toBe(1);
127+
const internalSpan = memoryExporter
128+
.getFinishedSpans()
129+
.filter((s) => s.kind === SpanKind.INTERNAL);
130+
expect(internalSpan.length).toBe(1);
131+
expect(internalSpan[0].parentSpanId).toStrictEqual(
132+
awsReceiveSpan[0].spanContext.spanId
133+
);
134+
});
135+
136+
describe("process spans", () => {
137+
let receivedMessages: Message[];
138+
139+
beforeEach(async () => {
140+
const sqs = new AWS.SQS();
141+
const res = await sqs
142+
.receiveMessage({
143+
QueueUrl: "queue/url/for/unittests",
144+
})
145+
.promise();
146+
receivedMessages = res.Messages;
147+
});
148+
149+
it("should create processing child with forEach", async () => {
150+
receivedMessages.forEach((msg) => {
151+
const processChildSpan = provider
152+
.getTracer("default")
153+
.startSpan(`child span of sqs processing span of msg ${msg.Body}`);
154+
processChildSpan.end();
155+
});
156+
157+
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
158+
});
159+
160+
it("should create processing child with map", async () => {
161+
receivedMessages.map((msg) => {
162+
const processChildSpan = provider
163+
.getTracer("default")
164+
.startSpan(`child span of sqs processing span of msg ${msg.Body}`);
165+
processChildSpan.end();
166+
});
167+
168+
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
169+
});
170+
171+
it.skip("should create processing child with array index access", async () => {
172+
for (let i = 0; i < receivedMessages.length; i++) {
173+
const msg = receivedMessages[i];
174+
const processChildSpan = provider
175+
.getTracer("default")
176+
.startSpan(`child span of sqs processing span of msg ${msg.Body}`);
177+
processChildSpan.end();
178+
}
179+
180+
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
181+
});
182+
183+
it.skip("should create processing child with map and forEach calls", async () => {
184+
receivedMessages
185+
.map((msg) => JSON.parse(msg.Body))
186+
.forEach((msgBody) => {
187+
const processChildSpan = provider
188+
.getTracer("default")
189+
.startSpan(`child span of sqs processing span of msg ${msgBody}`);
190+
processChildSpan.end();
191+
});
192+
193+
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
194+
});
195+
196+
it.skip("should create processing child with filter and forEach", async () => {
197+
receivedMessages
198+
.filter((msg) => msg)
199+
.forEach((msgBody) => {
200+
const processChildSpan = provider
201+
.getTracer("default")
202+
.startSpan(`child span of sqs processing span of msg ${msgBody}`);
203+
processChildSpan.end();
204+
});
205+
206+
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
207+
});
208+
209+
it.skip("should create processing child with for(msg of messages)", () => {
210+
for (const msg of receivedMessages) {
211+
const processChildSpan = provider
212+
.getTracer("default")
213+
.startSpan(`child span of sqs processing span of msg ${msg.Body}`);
214+
processChildSpan.end();
215+
}
216+
217+
expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans());
218+
});
219+
});
220+
});
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import AWS from "aws-sdk";
2+
3+
export const mockAwsSend = (sendResult: any, data: any = undefined) => {
4+
AWS.Request.prototype.send = function (cb: (error, response) => void) {
5+
(this as AWS.Request<any, any>).on("complete", (response) => {
6+
cb(response.error, response);
7+
});
8+
const response = {
9+
...sendResult,
10+
data,
11+
request: this,
12+
};
13+
setImmediate(() => {
14+
this._events.complete.forEach((handler) => handler(response));
15+
});
16+
return response;
17+
};
18+
19+
AWS.Request.prototype.promise = function () {
20+
const response = {
21+
...sendResult,
22+
data,
23+
request: this,
24+
};
25+
setImmediate(() => {
26+
this._events.complete.forEach((handler) => handler(response));
27+
});
28+
return new Promise((resolve) =>
29+
setImmediate(() => {
30+
resolve(data);
31+
})
32+
);
33+
};
34+
};

0 commit comments

Comments
 (0)