diff --git a/package-lock.json b/package-lock.json index 4f110e65d1..bd2ae284ea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -31566,6 +31566,7 @@ "license": "Apache-2.0", "dependencies": { "@opentelemetry/instrumentation": "^0.203.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/semantic-conventions": "^1.27.0", "@types/aws-lambda": "8.10.150" }, @@ -41549,6 +41550,7 @@ "@opentelemetry/api": "^1.3.0", "@opentelemetry/core": "^2.0.0", "@opentelemetry/instrumentation": "^0.203.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/propagator-aws-xray": "^2.1.0", "@opentelemetry/propagator-aws-xray-lambda": "^0.55.0", "@opentelemetry/sdk-metrics": "^2.0.0", diff --git a/packages/instrumentation-aws-lambda/package.json b/packages/instrumentation-aws-lambda/package.json index 908e795aae..caba2375a5 100644 --- a/packages/instrumentation-aws-lambda/package.json +++ b/packages/instrumentation-aws-lambda/package.json @@ -61,6 +61,7 @@ }, "dependencies": { "@opentelemetry/instrumentation": "^0.203.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/semantic-conventions": "^1.27.0", "@types/aws-lambda": "8.10.150" }, diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index b67a72ca4e..0ae67bff4e 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -39,11 +39,17 @@ import { ROOT_CONTEXT, Attributes, } from '@opentelemetry/api'; +import { pubsubPropagation } from '@opentelemetry/propagation-utils'; import { ATTR_URL_FULL, SEMATTRS_FAAS_EXECUTION, SEMRESATTRS_CLOUD_ACCOUNT_ID, SEMRESATTRS_FAAS_ID, + MESSAGINGOPERATIONVALUES_PROCESS, + SEMATTRS_MESSAGING_DESTINATION, + SEMATTRS_MESSAGING_MESSAGE_ID, + SEMATTRS_MESSAGING_OPERATION, + SEMATTRS_MESSAGING_SYSTEM, } from '@opentelemetry/semantic-conventions'; import { ATTR_FAAS_COLDSTART } from './semconv'; @@ -52,6 +58,7 @@ import { Callback, Context, Handler, + SQSRecord, } from 'aws-lambda'; import { AwsLambdaInstrumentationConfig, EventContextExtractor } from './types'; @@ -68,6 +75,18 @@ const headerGetter: TextMapGetter = { }, }; +export const sqsContextGetter: TextMapGetter = { + keys(carrier): string[] { + if (carrier == null) { + return []; + } + return Object.keys(carrier); + }, + get(carrier, key: string) { + return carrier?.[key]?.stringValue || carrier?.[key]?.value; + }, +}; + export const lambdaMaxInitInMilliseconds = 10_000; export class AwsLambdaInstrumentation extends InstrumentationBase { @@ -265,6 +284,39 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { + if (event.Records && event.Records[0].eventSource === 'aws:sqs') { + const messages = event.Records; + const queueArn = messages[0]?.eventSourceARN; + const queueName = queueArn?.split(':').pop() ?? 'unknown'; + + pubsubPropagation.patchMessagesArrayToStartProcessSpans({ + messages, + parentContext: trace.setSpan(otelContext.active(), span), + tracer: plugin.tracer, + messageToSpanDetails: (message: SQSRecord) => ({ + name: queueName, + parentContext: propagation.extract( + ROOT_CONTEXT, + message.messageAttributes || {}, + sqsContextGetter + ), + attributes: { + [SEMATTRS_MESSAGING_SYSTEM]: 'aws.sqs', + [SEMATTRS_MESSAGING_DESTINATION]: queueName, + [SEMATTRS_MESSAGING_MESSAGE_ID]: message.messageId, + [SEMATTRS_MESSAGING_OPERATION]: + MESSAGINGOPERATIONVALUES_PROCESS, + }, + }), + }); + + pubsubPropagation.patchArrayForProcessSpans( + messages, + plugin.tracer, + otelContext.active() + ); + } + // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling // the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If // the handler happened to both call the callback and complete a returned Promise, whichever happens first will diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 8ec2e8614f..d89c9198e2 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -54,6 +54,7 @@ import { import { AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; import { AWSXRayLambdaPropagator } from '@opentelemetry/propagator-aws-xray-lambda'; +import { sqsContextGetter } from '../../src/instrumentation'; const memoryExporter = new InMemorySpanExporter(); @@ -827,4 +828,113 @@ describe('lambda handler', () => { ); }); }); + + describe('sync handler sqs propagation', () => { + it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { + initializeHandler('lambda-test/sync.sqshandler'); + const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; + const producerSpanId = '83b7424a259945cb'; + const sqsEvent = { + Records: [ + { + messageAttributes: { + traceparent: { + stringValue: `00-${producerTraceId}-${producerSpanId}-01`, + dataType: 'String', + }, + }, + eventSource: 'aws:sqs', + eventSourceARN: + 'arn:aws:sqs:eu-central-1:783764587482:launch-queue', + }, + ], + }; + + await lambdaRequire('lambda-test/sync').sqshandler( + sqsEvent, + ctx, + () => {} + ); + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 2); + assert.equal( + spans[0].parentSpanContext?.traceId, + spans[1].spanContext().traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + spans[1].spanContext().spanId + ); + assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); + assert.equal(spans[0].links[0].context.spanId, producerSpanId); + }); + }); + + describe('async handler sqs propagation', () => { + it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { + initializeHandler('lambda-test/async.sqshandler'); + const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; + const producerSpanId = '83b7424a259945cb'; + const sqsEvent = { + Records: [ + { + messageAttributes: { + traceparent: { + stringValue: `00-${producerTraceId}-${producerSpanId}-01`, + dataType: 'String', + }, + }, + eventSource: 'aws:sqs', + eventSourceARN: + 'arn:aws:sqs:eu-central-1:783764587482:launch-queue', + }, + ], + }; + + await lambdaRequire('lambda-test/async').sqshandler(sqsEvent, ctx); + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 2); + assert.equal( + spans[0].parentSpanContext?.traceId, + spans[1].spanContext().traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + spans[1].spanContext().spanId + ); + assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); + assert.equal(spans[0].links[0].context.spanId, producerSpanId); + }); + }); + + describe('sqsContextGetter', () => { + it('returns the keys for a given message attributes carrier', () => { + const carrier = { + 'x-amzn-trace-id': { + stringValue: 'dummy', + stringListValues: [], + binaryListValues: [], + dataType: 'String', + }, + traceparent: { + stringValue: 'dummy', + stringListValues: [], + binaryListValues: [], + dataType: 'String', + }, + }; + + const keys = sqsContextGetter.keys(carrier); + assert.deepEqual(keys, ['x-amzn-trace-id', 'traceparent']); + }); + + it('returns empty array for null or undefined carrier', () => { + const keysNull = sqsContextGetter.keys(null); + const keysUndefined = sqsContextGetter.keys(undefined); + assert.deepEqual(keysNull, []); + assert.deepEqual(keysUndefined, []); + }); + }); }); diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/async.js b/packages/instrumentation-aws-lambda/test/lambda-test/async.js index c6c2e529cf..9e84459170 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/async.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/async.js @@ -19,19 +19,26 @@ exports.handler = async function (event, context) { return 'ok'; }; +exports.sqshandler = async function (event, context) { + event.Records.forEach(r => {}); + return 'ok'; +}; + exports.error = async function (event, context) { throw new Error('handler error'); -} +}; exports.stringerror = async function (event, context) { throw 'handler error'; -} +}; exports.context = async function (event, context) { return api.trace.getSpan(api.context.active()).spanContext().traceId; }; exports.handler_return_baggage = async function (event, context) { - const [baggageEntryKey, baggageEntryValue] = api.propagation.getBaggage(api.context.active()).getAllEntries()[0]; + const [baggageEntryKey, baggageEntryValue] = api.propagation + .getBaggage(api.context.active()) + .getAllEntries()[0]; return `${baggageEntryKey}=${baggageEntryValue.value}`; -} +}; diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js index e9f38d8ea7..a4134f6b74 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js @@ -19,21 +19,27 @@ exports.handler = function (event, context, callback) { callback(null, 'ok'); }; +exports.sqshandler = function (event, context, callback) { + // Dummy forEach loop, to trigger sqs instrumentation + event.Records.forEach(r => {}); + callback(null, 'ok'); +}; + exports.error = function (event, context, callback) { throw new Error('handler error'); -} +}; exports.callbackerror = function (event, context, callback) { callback(new Error('handler error')); -} +}; exports.stringerror = function (event, context, callback) { throw 'handler error'; -} +}; exports.callbackstringerror = function (event, context, callback) { callback('handler error'); -} +}; exports.context = function (event, context, callback) { callback(null, api.trace.getSpan(api.context.active()).spanContext().traceId);