Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 74651a4

Browse files
author
Amir Blum
committed
(plugin-aws-sdk): WIP add sqs processing spans and attributes
1 parent 3afffb9 commit 74651a4

File tree

3 files changed

+130
-13
lines changed

3 files changed

+130
-13
lines changed

packages/plugin-aws-sdk/src/aws-sdk.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,13 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
116116
span.setAttribute(AttributeNames.AWS_ERROR, response.error);
117117
}
118118

119+
const responseAttributes = getResponseServiceAttributes(
120+
response,
121+
thisPlugin._tracer
122+
);
119123
span.setAttributes({
120124
[AttributeNames.AWS_REQUEST_ID]: response.requestId,
121-
...getResponseServiceAttributes(response),
125+
...responseAttributes,
122126
});
123127
span.end();
124128
});

packages/plugin-aws-sdk/src/services/service-attributes.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Attributes } from "@opentelemetry/api";
1+
import { Attributes, Tracer } from "@opentelemetry/api";
22
import { getS3RequestSpanAttributes, getS3ResponseSpanAttributes } from "./s3";
33
import {
44
getSqsRequestSpanAttributes,
@@ -15,7 +15,10 @@ export interface RequestMetadata {
1515
}
1616

1717
type RequestAttrProcessor = (request: AWS.Request<any, any>) => RequestMetadata;
18-
type ResponseAttrProcessor = (response: AWS.Response<any, any>) => Attributes;
18+
type ResponseAttrProcessor = (
19+
response: AWS.Response<any, any>,
20+
tracer: Tracer
21+
) => Attributes;
1922

2023
class ServiceAttributes {
2124
public attributeProcessors: {
@@ -47,12 +50,16 @@ export function getRequestServiceAttributes(
4750
}
4851

4952
export function getResponseServiceAttributes(
50-
response: AWS.Response<any, any>
53+
response: AWS.Response<any, any>,
54+
tracer: Tracer
5155
): Attributes {
5256
const serviceId = (response as any)?.request?.service?.serviceIdentifier;
5357
if (!serviceId) return;
5458

55-
return serviceAttributes.attributeProcessors[serviceId]?.response(response);
59+
return serviceAttributes.attributeProcessors[serviceId]?.response(
60+
response,
61+
tracer
62+
);
5663
}
5764

5865
export const serviceAttributes = new ServiceAttributes();
Lines changed: 114 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,132 @@
1-
import { Attributes } from "@opentelemetry/api";
1+
import { Attributes, Tracer, SpanKind, Span } from "@opentelemetry/api";
22
import { RequestMetadata } from "./service-attributes";
33

4+
export enum SqsAttributeNames {
5+
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md
6+
MESSAGING_SYSTEM = "messaging.system",
7+
MESSAGING_DESTINATION = "messaging.destination",
8+
MESSAGING_DESTINATIONKIND = "messaging.destination_kind",
9+
MESSAGING_MESSAGE_ID = "messaging.message_id",
10+
MESSAGING_OPERATION = "messaging.operation",
11+
MESSAGING_URL = "messaging.url",
12+
}
13+
14+
export const START_SPAN_FUNCTION = Symbol(
15+
"opentelemetry.plugin.aws-sdk.start_span"
16+
);
17+
18+
export const END_SPAN_FUNCTION = Symbol(
19+
"opentelemetry.plugin.aws-sdk.end_span"
20+
);
21+
22+
function extractQueueUrl(request: AWS.Request<any, any>): string {
23+
return (request as any)?.params?.QueueUrl;
24+
}
25+
26+
function extractQueueNameFromUrl(queueUrl: string): string {
27+
if (!queueUrl) return undefined;
28+
29+
const pisces = queueUrl.split("/");
30+
if (pisces.length === 0) return undefined;
31+
32+
return pisces[pisces.length - 1];
33+
}
34+
35+
function startSingleMessageSpan(
36+
tracer: Tracer,
37+
queueUrl: string,
38+
queueName: string,
39+
message: AWS.SQS.Message
40+
): Span {
41+
const messageSpan = tracer.startSpan(queueName, {
42+
kind: SpanKind.CONSUMER,
43+
attributes: {
44+
[SqsAttributeNames.MESSAGING_SYSTEM]: "aws.sqs",
45+
[SqsAttributeNames.MESSAGING_DESTINATION]: queueName,
46+
[SqsAttributeNames.MESSAGING_DESTINATIONKIND]: "queue",
47+
[SqsAttributeNames.MESSAGING_MESSAGE_ID]: message.MessageId,
48+
[SqsAttributeNames.MESSAGING_URL]: queueUrl,
49+
[SqsAttributeNames.MESSAGING_OPERATION]: "process",
50+
},
51+
});
52+
53+
message[START_SPAN_FUNCTION] = () =>
54+
console.log(
55+
"open-telemetry aws-sdk plugin: trying to start sqs processing span twice."
56+
);
57+
message[END_SPAN_FUNCTION] = () => {
58+
messageSpan.end();
59+
message[END_SPAN_FUNCTION] = () =>
60+
console.log(
61+
"open-telemetry aws-sdk plugin: trying to end sqs processing span which was already ended."
62+
);
63+
};
64+
return messageSpan;
65+
}
66+
67+
function patchArrayFunction(
68+
messages: AWS.SQS.Message[],
69+
functionName: string,
70+
tracer: Tracer
71+
) {
72+
const origFunc = messages[functionName];
73+
messages[functionName] = function (callback) {
74+
return origFunc.call(this, function (message: AWS.SQS.Message) {
75+
const messageSpan = message[START_SPAN_FUNCTION]();
76+
tracer.withSpan(messageSpan, () => callback.apply(this, arguments));
77+
message[END_SPAN_FUNCTION]();
78+
});
79+
};
80+
}
81+
482
export function getSqsRequestSpanAttributes(
583
request: AWS.Request<any, any>
684
): RequestMetadata {
85+
const queueUrl = extractQueueUrl(request);
86+
const queueName = extractQueueNameFromUrl(queueUrl);
87+
88+
let isIncoming = false;
89+
const attributes = {
90+
[SqsAttributeNames.MESSAGING_SYSTEM]: "aws.sqs",
91+
[SqsAttributeNames.MESSAGING_DESTINATIONKIND]: "queue",
92+
[SqsAttributeNames.MESSAGING_DESTINATION]: queueName,
93+
[SqsAttributeNames.MESSAGING_URL]: queueUrl,
94+
};
95+
796
const operation = (request as any)?.operation;
897
switch (operation) {
998
case "receiveMessage":
10-
return {
11-
attributes: {},
12-
isIncoming: true,
13-
};
99+
isIncoming = true;
100+
attributes[SqsAttributeNames.MESSAGING_OPERATION] = "receive";
101+
break;
14102
}
15103

16104
return {
17-
attributes: {},
18-
isIncoming: false,
105+
attributes,
106+
isIncoming,
19107
};
20108
}
21109

22110
export function getSqsResponseSpanAttributes(
23-
response: AWS.Response<any, any>
111+
response: AWS.Response<any, any>,
112+
tracer: Tracer
24113
): Attributes {
114+
const messages: AWS.SQS.Message[] = response.data.Messages;
115+
if (messages) {
116+
const queueUrl = extractQueueUrl((response as any)?.request);
117+
const queueName = extractQueueNameFromUrl(queueUrl);
118+
119+
messages.forEach((message: AWS.SQS.Message) => {
120+
message[START_SPAN_FUNCTION] = () =>
121+
startSingleMessageSpan(tracer, queueUrl, queueName, message);
122+
message[END_SPAN_FUNCTION] = () =>
123+
console.log(
124+
"open-telemetry aws-sdk plugin: end span called on sqs message which was not started"
125+
);
126+
});
127+
128+
patchArrayFunction(messages, "forEach", tracer);
129+
patchArrayFunction(messages, "map", tracer);
130+
}
25131
return {};
26132
}

0 commit comments

Comments
 (0)