Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit e625f41

Browse files
author
Amir Blum
committed
fix(plugin-aws-sdk): set span hierarchy for sqs processing
1 parent 74651a4 commit e625f41

File tree

4 files changed

+49
-43
lines changed

4 files changed

+49
-43
lines changed

packages/plugin-aws-sdk/src/aws-sdk.ts

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,7 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
6969
return target;
7070
}
7171

72-
private _startAwsSpan(
73-
request: AWS.Request<any, any>,
74-
additionalAttributes?: Attributes
75-
): Span {
72+
private _startAwsSpan(request: AWS.Request<any, any>): Span {
7673
const operation = (request as any).operation;
7774
const service = (request as any).service;
7875

@@ -86,7 +83,6 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
8683
[AttributeNames.AWS_SERVICE_API]: service?.api?.className,
8784
[AttributeNames.AWS_SERVICE_IDENTIFIER]: service?.serviceIdentifier,
8885
[AttributeNames.AWS_SERVICE_NAME]: service?.api?.abbreviation,
89-
...additionalAttributes,
9086
},
9187
});
9288

@@ -118,11 +114,12 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
118114

119115
const responseAttributes = getResponseServiceAttributes(
120116
response,
117+
span,
121118
thisPlugin._tracer
122119
);
120+
123121
span.setAttributes({
124122
[AttributeNames.AWS_REQUEST_ID]: response.requestId,
125-
...responseAttributes,
126123
});
127124
span.end();
128125
});
@@ -145,11 +142,8 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
145142
return original.apply(this, arguments);
146143
}
147144

148-
const requestMetadata = getRequestServiceAttributes(awsRequest);
149-
const span = thisPlugin._startAwsSpan(
150-
awsRequest,
151-
requestMetadata?.attributes
152-
);
145+
const span = thisPlugin._startAwsSpan(awsRequest);
146+
const requestMetadata = getRequestServiceAttributes(awsRequest, span);
153147
thisPlugin._callPreRequestHooks(span, awsRequest);
154148
thisPlugin._registerCompletedEvent(span, awsRequest);
155149

@@ -175,11 +169,8 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
175169
return original.apply(this, arguments);
176170
}
177171

178-
const requestMetadata = getRequestServiceAttributes(awsRequest);
179-
const span = thisPlugin._startAwsSpan(
180-
awsRequest,
181-
requestMetadata?.attributes
182-
);
172+
const span = thisPlugin._startAwsSpan(awsRequest);
173+
const requestMetadata = getRequestServiceAttributes(awsRequest, span);
183174
thisPlugin._callPreRequestHooks(span, awsRequest);
184175
thisPlugin._registerCompletedEvent(span, awsRequest);
185176

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import { Attributes } from "@opentelemetry/api";
1+
import { Attributes, Span } from "@opentelemetry/api";
22
import { RequestMetadata } from "./service-attributes";
33

44
export function getS3RequestSpanAttributes(
5-
request: AWS.Request<any, any>
5+
request: AWS.Request<any, any>,
6+
span: Span
67
): RequestMetadata {
78
return {
89
attributes: {},
@@ -11,7 +12,8 @@ export function getS3RequestSpanAttributes(
1112
}
1213

1314
export function getS3ResponseSpanAttributes(
14-
response: AWS.Response<any, any>
15+
response: AWS.Response<any, any>,
16+
span: Span
1517
): Attributes {
1618
return {};
1719
}

packages/plugin-aws-sdk/src/services/service-attributes.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
1-
import { Attributes, Tracer } from "@opentelemetry/api";
1+
import { Attributes, Tracer, Span } from "@opentelemetry/api";
22
import { getS3RequestSpanAttributes, getS3ResponseSpanAttributes } from "./s3";
33
import {
44
getSqsRequestSpanAttributes,
55
getSqsResponseSpanAttributes,
66
} from "./sqs";
77

88
/*
9-
attributes are additional span attributes that should be added to the span.
109
isIncoming - if true, then the operation callback / promise should be bind with the operation's span
1110
*/
1211
export interface RequestMetadata {
13-
attributes: Attributes;
1412
isIncoming: boolean;
1513
}
1614

17-
type RequestAttrProcessor = (request: AWS.Request<any, any>) => RequestMetadata;
15+
type RequestAttrProcessor = (
16+
request: AWS.Request<any, any>,
17+
span: Span
18+
) => RequestMetadata;
1819
type ResponseAttrProcessor = (
1920
response: AWS.Response<any, any>,
21+
span: Span,
2022
tracer: Tracer
21-
) => Attributes;
23+
) => void;
2224

2325
class ServiceAttributes {
2426
public attributeProcessors: {
@@ -41,23 +43,29 @@ class ServiceAttributes {
4143
}
4244

4345
export function getRequestServiceAttributes(
44-
request: AWS.Request<any, any>
46+
request: AWS.Request<any, any>,
47+
span: Span
4548
): RequestMetadata {
4649
const serviceId = (request as any)?.service?.serviceIdentifier;
4750
if (serviceId) {
48-
return serviceAttributes.attributeProcessors[serviceId]?.request(request);
51+
return serviceAttributes.attributeProcessors[serviceId]?.request(
52+
request,
53+
span
54+
);
4955
}
5056
}
5157

5258
export function getResponseServiceAttributes(
5359
response: AWS.Response<any, any>,
60+
span: Span,
5461
tracer: Tracer
55-
): Attributes {
62+
): void {
5663
const serviceId = (response as any)?.request?.service?.serviceIdentifier;
5764
if (!serviceId) return;
5865

5966
return serviceAttributes.attributeProcessors[serviceId]?.response(
6067
response,
68+
span,
6169
tracer
6270
);
6371
}

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ export enum SqsAttributeNames {
1212
}
1313

1414
export const START_SPAN_FUNCTION = Symbol(
15-
"opentelemetry.plugin.aws-sdk.start_span"
15+
"opentelemetry.plugin.aws-sdk.sqs.start_span"
1616
);
1717

1818
export const END_SPAN_FUNCTION = Symbol(
19-
"opentelemetry.plugin.aws-sdk.end_span"
19+
"opentelemetry.plugin.aws-sdk.sqs.end_span"
2020
);
2121

2222
function extractQueueUrl(request: AWS.Request<any, any>): string {
@@ -73,52 +73,58 @@ function patchArrayFunction(
7373
messages[functionName] = function (callback) {
7474
return origFunc.call(this, function (message: AWS.SQS.Message) {
7575
const messageSpan = message[START_SPAN_FUNCTION]();
76-
tracer.withSpan(messageSpan, () => callback.apply(this, arguments));
76+
const res = tracer.withSpan(messageSpan, () =>
77+
callback.apply(this, arguments)
78+
);
7779
message[END_SPAN_FUNCTION]();
80+
return res;
7881
});
7982
};
8083
}
8184

8285
export function getSqsRequestSpanAttributes(
83-
request: AWS.Request<any, any>
86+
request: AWS.Request<any, any>,
87+
span: Span
8488
): RequestMetadata {
8589
const queueUrl = extractQueueUrl(request);
8690
const queueName = extractQueueNameFromUrl(queueUrl);
8791

92+
span.setAttribute(SqsAttributeNames.MESSAGING_SYSTEM, "aws.sqs");
93+
span.setAttribute(SqsAttributeNames.MESSAGING_DESTINATIONKIND, "queue");
94+
span.setAttribute(SqsAttributeNames.MESSAGING_DESTINATION, queueName);
95+
span.setAttribute(SqsAttributeNames.MESSAGING_URL, queueUrl);
96+
8897
let isIncoming = false;
89-
const attributes = {
90-
[SqsAttributeNames.MESSAGING_SYSTEM]: "aws.sqs",
91-
[SqsAttributeNames.MESSAGING_DESTINATIONKIND]: "queue",
92-
[SqsAttributeNames.MESSAGING_DESTINATION]: queueName,
93-
[SqsAttributeNames.MESSAGING_URL]: queueUrl,
94-
};
9598

9699
const operation = (request as any)?.operation;
97100
switch (operation) {
98101
case "receiveMessage":
99102
isIncoming = true;
100-
attributes[SqsAttributeNames.MESSAGING_OPERATION] = "receive";
103+
span.setAttribute(SqsAttributeNames.MESSAGING_OPERATION, "receive");
101104
break;
102105
}
103106

104107
return {
105-
attributes,
106108
isIncoming,
107109
};
108110
}
109111

110112
export function getSqsResponseSpanAttributes(
111113
response: AWS.Response<any, any>,
114+
span: Span,
112115
tracer: Tracer
113-
): Attributes {
116+
) {
114117
const messages: AWS.SQS.Message[] = response.data.Messages;
115118
if (messages) {
116119
const queueUrl = extractQueueUrl((response as any)?.request);
117120
const queueName = extractQueueNameFromUrl(queueUrl);
118121

119122
messages.forEach((message: AWS.SQS.Message) => {
120-
message[START_SPAN_FUNCTION] = () =>
121-
startSingleMessageSpan(tracer, queueUrl, queueName, message);
123+
message[START_SPAN_FUNCTION] = () => {
124+
return tracer.withSpan(span, () =>
125+
startSingleMessageSpan(tracer, queueUrl, queueName, message)
126+
);
127+
};
122128
message[END_SPAN_FUNCTION] = () =>
123129
console.log(
124130
"open-telemetry aws-sdk plugin: end span called on sqs message which was not started"
@@ -128,5 +134,4 @@ export function getSqsResponseSpanAttributes(
128134
patchArrayFunction(messages, "forEach", tracer);
129135
patchArrayFunction(messages, "map", tracer);
130136
}
131-
return {};
132137
}

0 commit comments

Comments
 (0)