Skip to content

Commit ef456cf

Browse files
committed
Propagate pubsub context when lambda event is an sqs event
1 parent cc7eff4 commit ef456cf

File tree

3 files changed

+105
-5
lines changed

3 files changed

+105
-5
lines changed

packages/instrumentation-aws-lambda/src/instrumentation.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,20 @@ import {
3939
ROOT_CONTEXT,
4040
Attributes,
4141
} from '@opentelemetry/api';
42+
import { pubsubPropagation } from '@opentelemetry/propagation-utils';
4243
import {
4344
ATTR_URL_FULL,
4445
SEMATTRS_FAAS_EXECUTION,
4546
SEMRESATTRS_CLOUD_ACCOUNT_ID,
4647
SEMRESATTRS_FAAS_ID,
48+
MESSAGINGDESTINATIONKINDVALUES_QUEUE,
49+
MESSAGINGOPERATIONVALUES_PROCESS,
50+
SEMATTRS_MESSAGING_DESTINATION,
51+
SEMATTRS_MESSAGING_DESTINATION_KIND,
52+
SEMATTRS_MESSAGING_MESSAGE_ID,
53+
SEMATTRS_MESSAGING_OPERATION,
54+
SEMATTRS_MESSAGING_SYSTEM,
55+
SEMATTRS_MESSAGING_URL,
4756
} from '@opentelemetry/semantic-conventions';
4857
import { ATTR_FAAS_COLDSTART } from './semconv';
4958

@@ -68,6 +77,15 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
6877
},
6978
};
7079

80+
const sqsContextGetter = {
81+
keys(carrier: any): string[] {
82+
return Object.keys(carrier || {});
83+
},
84+
get(carrier: any, key: string) {
85+
return carrier?.[key]?.stringValue;
86+
},
87+
};
88+
7189
export const lambdaMaxInitInMilliseconds = 10_000;
7290

7391
export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstrumentationConfig> {
@@ -265,6 +283,42 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
265283
}
266284

267285
return otelContext.with(trace.setSpan(parent, span), () => {
286+
if (event.Records) {
287+
const messages = event.Records;
288+
const queueArn = messages[0]?.eventSourceARN;
289+
const queueName = queueArn?.split(':').pop() ?? 'unknown';
290+
291+
pubsubPropagation.patchMessagesArrayToStartProcessSpans({
292+
messages,
293+
parentContext: trace.setSpan(otelContext.active(), span),
294+
tracer: plugin.tracer,
295+
messageToSpanDetails: (message: any) => ({
296+
name: queueName,
297+
parentContext: propagation.extract(
298+
ROOT_CONTEXT,
299+
message.messageAttributes || {},
300+
sqsContextGetter
301+
),
302+
attributes: {
303+
[SEMATTRS_MESSAGING_SYSTEM]: 'aws.sqs',
304+
[SEMATTRS_MESSAGING_DESTINATION]: queueName,
305+
[SEMATTRS_MESSAGING_DESTINATION_KIND]:
306+
MESSAGINGDESTINATIONKINDVALUES_QUEUE,
307+
[SEMATTRS_MESSAGING_MESSAGE_ID]: message.messageId,
308+
[SEMATTRS_MESSAGING_URL]: queueArn,
309+
[SEMATTRS_MESSAGING_OPERATION]:
310+
MESSAGINGOPERATIONVALUES_PROCESS,
311+
},
312+
}),
313+
});
314+
315+
pubsubPropagation.patchArrayForProcessSpans(
316+
messages,
317+
plugin.tracer,
318+
otelContext.active()
319+
);
320+
}
321+
268322
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
269323
// the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If
270324
// the handler happened to both call the callback and complete a returned Promise, whichever happens first will

packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import {
3030
ReadableSpan,
3131
} from '@opentelemetry/sdk-trace-base';
3232
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
33-
import { Context } from 'aws-lambda';
33+
import { APIGatewayProxyEvent, Context, SQSEvent } from 'aws-lambda';
3434
import * as assert from 'assert';
3535
import {
3636
ATTR_URL_FULL,
@@ -827,4 +827,44 @@ describe('lambda handler', () => {
827827
);
828828
});
829829
});
830+
831+
describe('sqs test', () => {
832+
it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => {
833+
initializeHandler('lambda-test/sync.sqshandler');
834+
const producerTraceId = '1df415edd0ad7f83e573f6504381dcec';
835+
const producerSpanId = '83b7424a259945cb';
836+
const event = {
837+
Records: [
838+
{
839+
messageAttributes: {
840+
traceparent: {
841+
stringValue: `00-${producerTraceId}-${producerSpanId}-01`,
842+
dataType: 'String',
843+
},
844+
},
845+
eventSource: 'aws:sqs',
846+
eventSourceARN:
847+
'arn:aws:sqs:eu-central-1:783764587482:launch-queue',
848+
},
849+
],
850+
};
851+
852+
await lambdaRequire('lambda-test/sync').sqshandler(event, ctx, () => {});
853+
const spans = memoryExporter.getFinishedSpans();
854+
855+
assert.strictEqual(spans.length, 2);
856+
857+
assert.equal(
858+
spans[0].parentSpanContext?.traceId,
859+
spans[1].spanContext().traceId
860+
);
861+
assert.equal(
862+
spans[0].parentSpanContext?.spanId,
863+
spans[1].spanContext().spanId
864+
);
865+
866+
assert.equal(spans[0].links[0]?.context.traceId, producerTraceId);
867+
assert.equal(spans[0].links[0].context.spanId, producerSpanId);
868+
});
869+
});
830870
});

packages/instrumentation-aws-lambda/test/lambda-test/sync.js

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,27 @@ exports.handler = function (event, context, callback) {
1919
callback(null, 'ok');
2020
};
2121

22+
exports.sqshandler = function (event, context, callback) {
23+
// Dummy forEach loop, to trigger sqs instrumentation
24+
event.Records.forEach(_r => {});
25+
callback(null, 'ok');
26+
};
27+
2228
exports.error = function (event, context, callback) {
2329
throw new Error('handler error');
24-
}
30+
};
2531

2632
exports.callbackerror = function (event, context, callback) {
2733
callback(new Error('handler error'));
28-
}
34+
};
2935

3036
exports.stringerror = function (event, context, callback) {
3137
throw 'handler error';
32-
}
38+
};
3339

3440
exports.callbackstringerror = function (event, context, callback) {
3541
callback('handler error');
36-
}
42+
};
3743

3844
exports.context = function (event, context, callback) {
3945
callback(null, api.trace.getSpan(api.context.active()).spanContext().traceId);

0 commit comments

Comments
 (0)