|
| 1 | +import { plugin } from "../src"; |
| 2 | +import AWS, { AWSError } from "aws-sdk"; |
| 3 | +import { NoopLogger } from "@opentelemetry/core"; |
| 4 | +import { NodeTracerProvider } from "@opentelemetry/node"; |
| 5 | +import { ContextManager } from "@opentelemetry/context-base"; |
| 6 | +import { context, SpanKind } from "@opentelemetry/api"; |
| 7 | +import { |
| 8 | + InMemorySpanExporter, |
| 9 | + SimpleSpanProcessor, |
| 10 | + ReadableSpan, |
| 11 | +} from "@opentelemetry/tracing"; |
| 12 | +import { AsyncHooksContextManager } from "@opentelemetry/context-async-hooks"; |
| 13 | +import { mockAwsSend } from "./testing-utils"; |
| 14 | +import { SqsAttributeNames } from "../src/services/sqs"; |
| 15 | +import { Message } from "aws-sdk/clients/sqs"; |
| 16 | + |
| 17 | +const logger = new NoopLogger(); |
| 18 | +const provider = new NodeTracerProvider(); |
| 19 | +const memoryExporter = new InMemorySpanExporter(); |
| 20 | +provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); |
| 21 | +let contextManager: ContextManager; |
| 22 | + |
| 23 | +const responseMockSuccess = { |
| 24 | + requestId: "0000000000000", |
| 25 | + error: null, |
| 26 | +}; |
| 27 | + |
| 28 | +describe("sqs", () => { |
| 29 | + beforeAll(() => { |
| 30 | + AWS.config.credentials = { |
| 31 | + accessKeyId: "test key id", |
| 32 | + expired: false, |
| 33 | + expireTime: null, |
| 34 | + secretAccessKey: "test acc key", |
| 35 | + sessionToken: "test token", |
| 36 | + }; |
| 37 | + }); |
| 38 | + |
| 39 | + beforeEach(() => { |
| 40 | + contextManager = new AsyncHooksContextManager(); |
| 41 | + context.setGlobalContextManager(contextManager.enable()); |
| 42 | + |
| 43 | + mockAwsSend(responseMockSuccess, { |
| 44 | + Messages: [ |
| 45 | + { Body: JSON.stringify({ data: "msg 1" }) }, |
| 46 | + { Body: JSON.stringify({ data: "msg 2" }) }, |
| 47 | + ], |
| 48 | + } as AWS.SQS.Types.ReceiveMessageResult); |
| 49 | + plugin.enable(AWS, provider, logger); |
| 50 | + }); |
| 51 | + |
| 52 | + afterEach(() => { |
| 53 | + memoryExporter.reset(); |
| 54 | + contextManager.disable(); |
| 55 | + }); |
| 56 | + |
| 57 | + describe("receive context", () => { |
| 58 | + const createReceiveChildSpan = () => { |
| 59 | + const childSpan = provider |
| 60 | + .getTracer("default") |
| 61 | + .startSpan("child span of sqs.receiveMessage"); |
| 62 | + childSpan.end(); |
| 63 | + }; |
| 64 | + |
| 65 | + const expectReceiverWithChildSpan = (spans: ReadableSpan[]) => { |
| 66 | + const awsReceiveSpan = spans.filter((s) => s.kind === SpanKind.CONSUMER); |
| 67 | + expect(awsReceiveSpan.length).toBe(1); |
| 68 | + const internalSpan = spans.filter((s) => s.kind === SpanKind.INTERNAL); |
| 69 | + expect(internalSpan.length).toBe(1); |
| 70 | + expect(internalSpan[0].parentSpanId).toStrictEqual( |
| 71 | + awsReceiveSpan[0].spanContext.spanId |
| 72 | + ); |
| 73 | + }; |
| 74 | + |
| 75 | + it("should set parent context in sqs receive callback", async (done) => { |
| 76 | + const sqs = new AWS.SQS(); |
| 77 | + sqs |
| 78 | + .receiveMessage({ |
| 79 | + QueueUrl: "queue/url/for/unittests", |
| 80 | + }) |
| 81 | + .send((err: AWSError, data: AWS.SQS.Types.ReceiveMessageResult) => { |
| 82 | + expect(err).toBeFalsy(); |
| 83 | + createReceiveChildSpan(); |
| 84 | + expectReceiverWithChildSpan(memoryExporter.getFinishedSpans()); |
| 85 | + done(); |
| 86 | + }); |
| 87 | + }); |
| 88 | + |
| 89 | + it("should set parent context in sqs receive promise then", async () => { |
| 90 | + const sqs = new AWS.SQS(); |
| 91 | + const res = await sqs |
| 92 | + .receiveMessage({ |
| 93 | + QueueUrl: "queue/url/for/unittests", |
| 94 | + }) |
| 95 | + .promise() |
| 96 | + .then(() => { |
| 97 | + createReceiveChildSpan(); |
| 98 | + expectReceiverWithChildSpan(memoryExporter.getFinishedSpans()); |
| 99 | + }); |
| 100 | + }); |
| 101 | + |
| 102 | + it.skip("should set parent context in sqs receive after await", async () => { |
| 103 | + const sqs = new AWS.SQS(); |
| 104 | + await sqs |
| 105 | + .receiveMessage({ |
| 106 | + QueueUrl: "queue/url/for/unittests", |
| 107 | + }) |
| 108 | + .promise(); |
| 109 | + |
| 110 | + createReceiveChildSpan(); |
| 111 | + expectReceiverWithChildSpan(memoryExporter.getFinishedSpans()); |
| 112 | + }); |
| 113 | + |
| 114 | + it.skip("should set parent context in sqs receive from async function", async () => { |
| 115 | + const asycnReceive = async () => { |
| 116 | + try { |
| 117 | + const sqs = new AWS.SQS(); |
| 118 | + return await sqs |
| 119 | + .receiveMessage({ |
| 120 | + QueueUrl: "queue/url/for/unittests", |
| 121 | + }) |
| 122 | + .promise(); |
| 123 | + } catch (err) {} |
| 124 | + }; |
| 125 | + |
| 126 | + const res = await asycnReceive(); |
| 127 | + createReceiveChildSpan(); |
| 128 | + expectReceiverWithChildSpan(memoryExporter.getFinishedSpans()); |
| 129 | + }); |
| 130 | + }); |
| 131 | + |
| 132 | + describe("process spans", () => { |
| 133 | + let receivedMessages: Message[]; |
| 134 | + |
| 135 | + const createProcessChildSpan = (msgContext: any) => { |
| 136 | + const processChildSpan = provider |
| 137 | + .getTracer("default") |
| 138 | + .startSpan(`child span of sqs processing span of msg ${msgContext}`); |
| 139 | + processChildSpan.end(); |
| 140 | + }; |
| 141 | + |
| 142 | + const expectReceiver2ProcessWithOneChildEach = (spans: ReadableSpan[]) => { |
| 143 | + const awsReceiveSpan = spans.filter( |
| 144 | + (s) => s.attributes[SqsAttributeNames.MESSAGING_OPERATION] === "receive" |
| 145 | + ); |
| 146 | + expect(awsReceiveSpan.length).toBe(1); |
| 147 | + |
| 148 | + const processSpans = spans.filter( |
| 149 | + (s) => s.attributes[SqsAttributeNames.MESSAGING_OPERATION] === "process" |
| 150 | + ); |
| 151 | + expect(processSpans.length).toBe(2); |
| 152 | + expect(processSpans[0].parentSpanId).toStrictEqual( |
| 153 | + awsReceiveSpan[0].spanContext.spanId |
| 154 | + ); |
| 155 | + expect(processSpans[1].parentSpanId).toStrictEqual( |
| 156 | + awsReceiveSpan[0].spanContext.spanId |
| 157 | + ); |
| 158 | + |
| 159 | + const processChildSpans = spans.filter( |
| 160 | + (s) => s.kind === SpanKind.INTERNAL |
| 161 | + ); |
| 162 | + expect(processChildSpans.length).toBe(2); |
| 163 | + expect(processChildSpans[0].parentSpanId).toStrictEqual( |
| 164 | + processSpans[0].spanContext.spanId |
| 165 | + ); |
| 166 | + expect(processChildSpans[1].parentSpanId).toStrictEqual( |
| 167 | + processSpans[1].spanContext.spanId |
| 168 | + ); |
| 169 | + }; |
| 170 | + |
| 171 | + beforeEach(async () => { |
| 172 | + const sqs = new AWS.SQS(); |
| 173 | + const res = await sqs |
| 174 | + .receiveMessage({ |
| 175 | + QueueUrl: "queue/url/for/unittests", |
| 176 | + }) |
| 177 | + .promise(); |
| 178 | + receivedMessages = res.Messages; |
| 179 | + }); |
| 180 | + |
| 181 | + it("should create processing child with forEach", async () => { |
| 182 | + receivedMessages.forEach((msg) => { |
| 183 | + createProcessChildSpan(msg.Body); |
| 184 | + }); |
| 185 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 186 | + }); |
| 187 | + |
| 188 | + it("should create processing child with map", async () => { |
| 189 | + receivedMessages.map((msg) => { |
| 190 | + createProcessChildSpan(msg.Body); |
| 191 | + }); |
| 192 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 193 | + }); |
| 194 | + |
| 195 | + it.skip("should create processing child with array index access", async () => { |
| 196 | + for (let i = 0; i < receivedMessages.length; i++) { |
| 197 | + const msg = receivedMessages[i]; |
| 198 | + createProcessChildSpan(msg.Body); |
| 199 | + } |
| 200 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 201 | + }); |
| 202 | + |
| 203 | + it.skip("should create processing child with map and forEach calls", async () => { |
| 204 | + receivedMessages |
| 205 | + .map((msg) => JSON.parse(msg.Body)) |
| 206 | + .forEach((msgBody) => { |
| 207 | + createProcessChildSpan(msgBody); |
| 208 | + }); |
| 209 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 210 | + }); |
| 211 | + |
| 212 | + it.skip("should create processing child with filter and forEach", async () => { |
| 213 | + receivedMessages |
| 214 | + .filter((msg) => msg) |
| 215 | + .forEach((msgBody) => { |
| 216 | + createProcessChildSpan(msgBody); |
| 217 | + }); |
| 218 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 219 | + }); |
| 220 | + |
| 221 | + it.skip("should create processing child with for(msg of messages)", () => { |
| 222 | + for (const msg of receivedMessages) { |
| 223 | + createProcessChildSpan(msg.Body); |
| 224 | + } |
| 225 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 226 | + }); |
| 227 | + |
| 228 | + it.skip("should create processing child with array.values() for loop", () => { |
| 229 | + for (const msg of receivedMessages.values()) { |
| 230 | + createProcessChildSpan(msg.Body); |
| 231 | + } |
| 232 | + expectReceiver2ProcessWithOneChildEach(memoryExporter.getFinishedSpans()); |
| 233 | + }); |
| 234 | + }); |
| 235 | +}); |
0 commit comments