Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 392005c

Browse files
author
Amir Blum
authored
Merge pull request #17 from aspecto-io/sqs-context-propagation
feat: SQS context propagation
2 parents 865def1 + d307f10 commit 392005c

File tree

5 files changed

+126
-21
lines changed

5 files changed

+126
-21
lines changed

packages/plugin-aws-sdk/docs/sqs.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ SQS is amazon's managed message queue. Thus, it should follow the [Open Telemetr
55
The following methods are automatically enhanced:
66

77
### sendMessage / sendMessageBatch
8-
- [Message Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this plugin according to the spec.
9-
- TODO: Inject trace context as SQS MessageAttributes, so the service receiving the message can link cascading spans to the trace which created the message.
8+
- [Messaging Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this plugin according to the spec.
9+
- Open Telemetry trace context is injected as SQS MessageAttributes, so the service receiving the message can link cascading spans to the trace which created the message.
1010

1111
### receiveMessage
12-
- [Message Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this plugin according to the spec.
12+
- [Messaging Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this plugin according to the spec.
1313
- Additional "processing spans" are created for each message received by the application.
1414
If an application invoked `receiveMessage`, and received a 10 messages batch, a single `messaging.operation` = `receive` span will be created for the `receiveMessage` operation, and 10 `messaging.operation` = `process` spans will be created, one for each message.
1515
Those processing spans are created by the library. This behavior is partially implemented, [See discussion below](#processing-spans).
1616
- Sets the inter process context correctly, so that additional spans created through the process will be linked to parent spans correctly.
1717
This behavior is partially implemented, [See discussion below](#processing-spans).
18-
- TODO: Extract trace context from SQS MessageAttributes, and set span's `parent` and `links` correctly according to the spec.
18+
- Extract trace context from SQS MessageAttributes, and set span's `parent` and `links` correctly according to the spec.
1919

2020
#### Processing Spans
2121
According to open telemetry specification (and to reasonable expectation for trace structure), user of this library would expect to see one span for the operation of receiving messages batch from SQS, and then, for each message, a span with it's own sub-tree for the processing of this specific message.

packages/plugin-aws-sdk/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
"url": "https://github.com/aspecto-io/opentelemetry-ext-js/issues"
3636
},
3737
"devDependencies": {
38-
"@opentelemetry/node": "^0.8.3",
39-
"@opentelemetry/tracing": "^0.8.3",
38+
"@opentelemetry/node": "^0.9.0",
39+
"@opentelemetry/tracing": "^0.9.0",
4040
"@types/jest": "^25.2.3",
4141
"@types/shimmer": "^1.0.1",
4242
"aws-sdk": "^2.694.0",
@@ -48,7 +48,7 @@
4848
"typescript": "^3.9.5"
4949
},
5050
"dependencies": {
51-
"@opentelemetry/core": "^0.8.3",
51+
"@opentelemetry/core": "^0.9.0",
5252
"shimmer": "^1.2.1"
5353
},
5454
"jest": {

packages/plugin-aws-sdk/src/aws-sdk.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
2929
}
3030

3131
protected patch() {
32-
this.servicesExtensions = new ServicesExtensions(this._tracer);
32+
this.servicesExtensions = new ServicesExtensions(
33+
this._tracer,
34+
this._logger
35+
);
3336

3437
this._logger.debug(
3538
"applying patch to %s@%s",

packages/plugin-aws-sdk/src/services/ServicesExtensions.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import { Tracer, Span } from "@opentelemetry/api";
1+
import { Tracer, Span, Logger } from "@opentelemetry/api";
22
import { ServiceExtension, RequestMetadata } from "./ServiceExtension";
33
import { SqsServiceExtension } from "./sqs";
44
import * as AWS from "aws-sdk";
55

66
export class ServicesExtensions implements ServiceExtension {
77
services: Map<string, ServiceExtension> = new Map();
88

9-
constructor(tracer: Tracer) {
10-
this.services.set("sqs", new SqsServiceExtension(tracer));
9+
constructor(tracer: Tracer, logger: Logger) {
10+
this.services.set("sqs", new SqsServiceExtension(tracer, logger));
1111
}
1212

1313
requestHook(request: AWS.Request<any, any>): RequestMetadata {

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 112 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
1-
import { Tracer, SpanKind, Span } from "@opentelemetry/api";
1+
import {
2+
Tracer,
3+
SpanKind,
4+
Span,
5+
propagation,
6+
Context,
7+
Link,
8+
Logger,
9+
} from "@opentelemetry/api";
210
import { RequestMetadata, ServiceExtension } from "./ServiceExtension";
311
import * as AWS from "aws-sdk";
12+
import {
13+
getExtractedSpanContext,
14+
TRACE_PARENT_HEADER,
15+
} from "@opentelemetry/core";
16+
import {
17+
MessageBodyAttributeMap,
18+
SendMessageRequest,
19+
SendMessageBatchRequest,
20+
SendMessageBatchRequestEntry,
21+
} from "aws-sdk/clients/sqs";
422

523
export enum SqsAttributeNames {
624
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md
@@ -20,11 +38,34 @@ export const END_SPAN_FUNCTION = Symbol(
2038
"opentelemetry.plugin.aws-sdk.sqs.end_span"
2139
);
2240

41+
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html
42+
const SQS_MAX_MESSAGE_ATTRIBUTES = 10;
43+
44+
const contextSetterFunc = (
45+
messageAttributes: AWS.SQS.MessageBodyAttributeMap,
46+
key: string,
47+
value: unknown
48+
) => {
49+
messageAttributes[key] = {
50+
DataType: "String",
51+
StringValue: value as string,
52+
};
53+
};
54+
55+
const contextGetterFunc = (
56+
messageAttributes: AWS.SQS.MessageBodyAttributeMap,
57+
key: string
58+
) => {
59+
return messageAttributes?.[key]?.StringValue;
60+
};
61+
2362
export class SqsServiceExtension implements ServiceExtension {
2463
tracer: Tracer;
64+
logger: Logger;
2565

26-
constructor(tracer: Tracer) {
66+
constructor(tracer: Tracer, logger: Logger) {
2767
this.tracer = tracer;
68+
this.logger = logger;
2869
}
2970

3071
requestHook(request: AWS.Request<any, any>): RequestMetadata {
@@ -45,16 +86,45 @@ export class SqsServiceExtension implements ServiceExtension {
4586
const operation = (request as any)?.operation;
4687
switch (operation) {
4788
case "receiveMessage":
48-
isIncoming = true;
49-
spanKind = SpanKind.CONSUMER;
50-
spanName = queueName;
51-
spanAttributes[SqsAttributeNames.MESSAGING_OPERATION] = "receive";
89+
{
90+
isIncoming = true;
91+
spanKind = SpanKind.CONSUMER;
92+
spanName = queueName;
93+
spanAttributes[SqsAttributeNames.MESSAGING_OPERATION] = "receive";
94+
95+
const params: Record<string, any> = (request as any).params;
96+
const attributesNames = params.MessageAttributeNames || [];
97+
attributesNames.push(TRACE_PARENT_HEADER);
98+
params.MessageAttributeNames = attributesNames;
99+
}
52100
break;
53101

54102
case "sendMessage":
103+
{
104+
spanKind = SpanKind.PRODUCER;
105+
spanName = queueName;
106+
107+
const params: SendMessageRequest = (request as any).params;
108+
params.MessageAttributes = this.InjectPropagationContext(
109+
params.MessageAttributes
110+
);
111+
}
112+
break;
113+
55114
case "sendMessageBatch":
56-
spanKind = SpanKind.PRODUCER;
57-
spanName = queueName;
115+
{
116+
spanKind = SpanKind.PRODUCER;
117+
spanName = queueName;
118+
119+
const params: SendMessageBatchRequest = (request as any).params;
120+
params.Entries.forEach(
121+
(messageParams: SendMessageBatchRequestEntry) => {
122+
messageParams.MessageAttributes = this.InjectPropagationContext(
123+
messageParams.MessageAttributes
124+
);
125+
}
126+
);
127+
}
58128
break;
59129
}
60130

@@ -73,9 +143,18 @@ export class SqsServiceExtension implements ServiceExtension {
73143
const queueName = this.extractQueueNameFromUrl(queueUrl);
74144

75145
messages.forEach((message: AWS.SQS.Message) => {
146+
const parentContext: Context = propagation.extract(
147+
message.MessageAttributes,
148+
contextGetterFunc
149+
);
76150
message[START_SPAN_FUNCTION] = () => {
77151
return this.tracer.withSpan(span, () =>
78-
this.startSingleMessageSpan(queueUrl, queueName, message)
152+
this.startSingleMessageSpan(
153+
queueUrl,
154+
queueName,
155+
message,
156+
parentContext
157+
)
79158
);
80159
};
81160
message[END_SPAN_FUNCTION] = () =>
@@ -105,8 +184,16 @@ export class SqsServiceExtension implements ServiceExtension {
105184
startSingleMessageSpan(
106185
queueUrl: string,
107186
queueName: string,
108-
message: AWS.SQS.Message
187+
message: AWS.SQS.Message,
188+
propagtedContext: Context
109189
): Span {
190+
const links: Link[] = [];
191+
if (propagtedContext) {
192+
links.push({
193+
context: getExtractedSpanContext(propagtedContext),
194+
} as Link);
195+
}
196+
110197
const messageSpan = this.tracer.startSpan(queueName, {
111198
kind: SpanKind.CONSUMER,
112199
attributes: {
@@ -117,6 +204,7 @@ export class SqsServiceExtension implements ServiceExtension {
117204
[SqsAttributeNames.MESSAGING_URL]: queueUrl,
118205
[SqsAttributeNames.MESSAGING_OPERATION]: "process",
119206
},
207+
links,
120208
});
121209

122210
message[START_SPAN_FUNCTION] = () =>
@@ -147,4 +235,18 @@ export class SqsServiceExtension implements ServiceExtension {
147235
});
148236
};
149237
}
238+
239+
InjectPropagationContext(
240+
attributesMap?: MessageBodyAttributeMap
241+
): MessageBodyAttributeMap {
242+
const attributes = attributesMap ?? {};
243+
if (Object.keys(attributes).length < SQS_MAX_MESSAGE_ATTRIBUTES) {
244+
propagation.inject(attributes, contextSetterFunc);
245+
} else {
246+
this.logger.warn(
247+
"OpenTelemetry aws-sdk plugin cannot set context propagation on SQS message due to maximum amount of MessageAttributes"
248+
);
249+
}
250+
return attributes;
251+
}
150252
}

0 commit comments

Comments
 (0)