Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit cfbbaa3

Browse files
author
Amir Blum
committed
feat: sqs context propagation
1 parent 865def1 commit cfbbaa3

File tree

1 file changed

+67
-9
lines changed
  • packages/plugin-aws-sdk/src/services

1 file changed

+67
-9
lines changed

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1-
import { Tracer, SpanKind, Span } from "@opentelemetry/api";
1+
import {
2+
Tracer,
3+
SpanKind,
4+
Span,
5+
propagation,
6+
Context,
7+
Link,
8+
} from "@opentelemetry/api";
29
import { RequestMetadata, ServiceExtension } from "./ServiceExtension";
310
import * as AWS from "aws-sdk";
11+
import { getExtractedSpanContext } from "@opentelemetry/core";
412

513
export enum SqsAttributeNames {
614
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md
@@ -20,6 +28,24 @@ export const END_SPAN_FUNCTION = Symbol(
2028
"opentelemetry.plugin.aws-sdk.sqs.end_span"
2129
);
2230

31+
const contextSetterFunc = (
32+
messageAttributes: AWS.SQS.MessageBodyAttributeMap,
33+
key: string,
34+
value: unknown
35+
) => {
36+
messageAttributes[key] = {
37+
DataType: "String",
38+
StringValue: value as string,
39+
};
40+
};
41+
42+
const contextGetterFunc = (
43+
messageAttributes: AWS.SQS.MessageBodyAttributeMap,
44+
key: string
45+
) => {
46+
return messageAttributes[key]?.StringValue;
47+
};
48+
2349
export class SqsServiceExtension implements ServiceExtension {
2450
tracer: Tracer;
2551

@@ -45,16 +71,30 @@ export class SqsServiceExtension implements ServiceExtension {
4571
const operation = (request as any)?.operation;
4672
switch (operation) {
4773
case "receiveMessage":
48-
isIncoming = true;
49-
spanKind = SpanKind.CONSUMER;
50-
spanName = queueName;
51-
spanAttributes[SqsAttributeNames.MESSAGING_OPERATION] = "receive";
74+
{
75+
isIncoming = true;
76+
spanKind = SpanKind.CONSUMER;
77+
spanName = queueName;
78+
spanAttributes[SqsAttributeNames.MESSAGING_OPERATION] = "receive";
79+
80+
const params: Record<string, any> = (request as any).params;
81+
const attributesNames = params.MessageAttributeNames || [];
82+
attributesNames.push("traceparent");
83+
params.MessageAttributeNames = attributesNames;
84+
}
5285
break;
5386

5487
case "sendMessage":
5588
case "sendMessageBatch":
56-
spanKind = SpanKind.PRODUCER;
57-
spanName = queueName;
89+
{
90+
spanKind = SpanKind.PRODUCER;
91+
spanName = queueName;
92+
93+
const params: Record<string, any> = (request as any).params;
94+
const attributes = params.MessageAttributes || {};
95+
propagation.inject(attributes, contextSetterFunc);
96+
params.MessageAttributes = attributes;
97+
}
5898
break;
5999
}
60100

@@ -73,9 +113,18 @@ export class SqsServiceExtension implements ServiceExtension {
73113
const queueName = this.extractQueueNameFromUrl(queueUrl);
74114

75115
messages.forEach((message: AWS.SQS.Message) => {
116+
const parentContext: Context = propagation.extract(
117+
message.MessageAttributes,
118+
contextGetterFunc
119+
);
76120
message[START_SPAN_FUNCTION] = () => {
77121
return this.tracer.withSpan(span, () =>
78-
this.startSingleMessageSpan(queueUrl, queueName, message)
122+
this.startSingleMessageSpan(
123+
queueUrl,
124+
queueName,
125+
message,
126+
parentContext
127+
)
79128
);
80129
};
81130
message[END_SPAN_FUNCTION] = () =>
@@ -105,8 +154,16 @@ export class SqsServiceExtension implements ServiceExtension {
105154
startSingleMessageSpan(
106155
queueUrl: string,
107156
queueName: string,
108-
message: AWS.SQS.Message
157+
message: AWS.SQS.Message,
158+
propagtedContext: Context
109159
): Span {
160+
const links: Link[] = [];
161+
if (propagtedContext) {
162+
links.push({
163+
context: getExtractedSpanContext(propagtedContext),
164+
} as Link);
165+
}
166+
110167
const messageSpan = this.tracer.startSpan(queueName, {
111168
kind: SpanKind.CONSUMER,
112169
attributes: {
@@ -117,6 +174,7 @@ export class SqsServiceExtension implements ServiceExtension {
117174
[SqsAttributeNames.MESSAGING_URL]: queueUrl,
118175
[SqsAttributeNames.MESSAGING_OPERATION]: "process",
119176
},
177+
links,
120178
});
121179

122180
message[START_SPAN_FUNCTION] = () =>

0 commit comments

Comments
 (0)