Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit d307f10

Browse files
author
Amir Blum
committed
fix: inject sqs context per each message in batch
1 parent c191bb2 commit d307f10

File tree

1 file changed

+39
-10
lines changed
  • packages/plugin-aws-sdk/src/services

1 file changed

+39
-10
lines changed

packages/plugin-aws-sdk/src/services/sqs.ts

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ import {
1313
getExtractedSpanContext,
1414
TRACE_PARENT_HEADER,
1515
} from "@opentelemetry/core";
16+
import {
17+
MessageBodyAttributeMap,
18+
SendMessageRequest,
19+
SendMessageBatchRequest,
20+
SendMessageBatchRequestEntry,
21+
} from "aws-sdk/clients/sqs";
1622

1723
export enum SqsAttributeNames {
1824
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md
@@ -94,21 +100,30 @@ export class SqsServiceExtension implements ServiceExtension {
94100
break;
95101

96102
case "sendMessage":
103+
{
104+
spanKind = SpanKind.PRODUCER;
105+
spanName = queueName;
106+
107+
const params: SendMessageRequest = (request as any).params;
108+
params.MessageAttributes = this.InjectPropagationContext(
109+
params.MessageAttributes
110+
);
111+
}
112+
break;
113+
97114
case "sendMessageBatch":
98115
{
99116
spanKind = SpanKind.PRODUCER;
100117
spanName = queueName;
101118

102-
const params: Record<string, any> = (request as any).params;
103-
const attributes = params.MessageAttributes ?? {};
104-
if (Object.keys(attributes).length < SQS_MAX_MESSAGE_ATTRIBUTES) {
105-
propagation.inject(attributes, contextSetterFunc);
106-
params.MessageAttributes = attributes;
107-
} else {
108-
this.logger.warn(
109-
"OpenTelemetry aws-sdk plugin cannot set context propagation on SQS message due to maximum amount of MessageAttributes"
110-
);
111-
}
119+
const params: SendMessageBatchRequest = (request as any).params;
120+
params.Entries.forEach(
121+
(messageParams: SendMessageBatchRequestEntry) => {
122+
messageParams.MessageAttributes = this.InjectPropagationContext(
123+
messageParams.MessageAttributes
124+
);
125+
}
126+
);
112127
}
113128
break;
114129
}
@@ -220,4 +235,18 @@ export class SqsServiceExtension implements ServiceExtension {
220235
});
221236
};
222237
}
238+
239+
InjectPropagationContext(
240+
attributesMap?: MessageBodyAttributeMap
241+
): MessageBodyAttributeMap {
242+
const attributes = attributesMap ?? {};
243+
if (Object.keys(attributes).length < SQS_MAX_MESSAGE_ATTRIBUTES) {
244+
propagation.inject(attributes, contextSetterFunc);
245+
} else {
246+
this.logger.warn(
247+
"OpenTelemetry aws-sdk plugin cannot set context propagation on SQS message due to maximum amount of MessageAttributes"
248+
);
249+
}
250+
return attributes;
251+
}
223252
}

0 commit comments

Comments
 (0)