Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/instrumentation-aws-lambda/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
},
"dependencies": {
"@opentelemetry/instrumentation": "^0.203.0",
"@opentelemetry/propagation-utils": "^0.31.3",
"@opentelemetry/semantic-conventions": "^1.27.0",
"@types/aws-lambda": "8.10.150"
},
Expand Down
52 changes: 52 additions & 0 deletions packages/instrumentation-aws-lambda/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ import {
ROOT_CONTEXT,
Attributes,
} from '@opentelemetry/api';
import { pubsubPropagation } from '@opentelemetry/propagation-utils';
import {
ATTR_URL_FULL,
SEMATTRS_FAAS_EXECUTION,
SEMRESATTRS_CLOUD_ACCOUNT_ID,
SEMRESATTRS_FAAS_ID,
MESSAGINGOPERATIONVALUES_PROCESS,
SEMATTRS_MESSAGING_DESTINATION,
SEMATTRS_MESSAGING_MESSAGE_ID,
SEMATTRS_MESSAGING_OPERATION,
SEMATTRS_MESSAGING_SYSTEM,
} from '@opentelemetry/semantic-conventions';
import { ATTR_FAAS_COLDSTART } from './semconv';

Expand All @@ -52,6 +58,7 @@ import {
Callback,
Context,
Handler,
SQSRecord,
} from 'aws-lambda';

import { AwsLambdaInstrumentationConfig, EventContextExtractor } from './types';
Expand All @@ -68,6 +75,18 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
},
};

export const sqsContextGetter: TextMapGetter = {
keys(carrier): string[] {
if (carrier == null) {
return [];
}
return Object.keys(carrier);
},
get(carrier, key: string) {
return carrier?.[key]?.stringValue || carrier?.[key]?.value;
},
};

export const lambdaMaxInitInMilliseconds = 10_000;

export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstrumentationConfig> {
Expand Down Expand Up @@ -265,6 +284,39 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
}

return otelContext.with(trace.setSpan(parent, span), () => {
if (event.Records && event.Records[0].eventSource === 'aws:sqs') {
const messages = event.Records;
const queueArn = messages[0]?.eventSourceARN;
const queueName = queueArn?.split(':').pop() ?? 'unknown';

pubsubPropagation.patchMessagesArrayToStartProcessSpans({
messages,
parentContext: trace.setSpan(otelContext.active(), span),
tracer: plugin.tracer,
messageToSpanDetails: (message: SQSRecord) => ({
name: queueName,
parentContext: propagation.extract(
ROOT_CONTEXT,
message.messageAttributes || {},
sqsContextGetter
),
attributes: {
[SEMATTRS_MESSAGING_SYSTEM]: 'aws.sqs',
[SEMATTRS_MESSAGING_DESTINATION]: queueName,
[SEMATTRS_MESSAGING_MESSAGE_ID]: message.messageId,
[SEMATTRS_MESSAGING_OPERATION]:
MESSAGINGOPERATIONVALUES_PROCESS,
},
}),
});

pubsubPropagation.patchArrayForProcessSpans(
messages,
plugin.tracer,
otelContext.active()
);
}

// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
// the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If
// the handler happened to both call the callback and complete a returned Promise, whichever happens first will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
import { AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { AWSXRayLambdaPropagator } from '@opentelemetry/propagator-aws-xray-lambda';
import { sqsContextGetter } from '../../src/instrumentation';

const memoryExporter = new InMemorySpanExporter();

Expand Down Expand Up @@ -827,4 +828,113 @@ describe('lambda handler', () => {
);
});
});

describe('sync handler sqs propagation', () => {
it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => {
initializeHandler('lambda-test/sync.sqshandler');
const producerTraceId = '1df415edd0ad7f83e573f6504381dcec';
const producerSpanId = '83b7424a259945cb';
const sqsEvent = {
Records: [
{
messageAttributes: {
traceparent: {
stringValue: `00-${producerTraceId}-${producerSpanId}-01`,
dataType: 'String',
},
},
eventSource: 'aws:sqs',
eventSourceARN:
'arn:aws:sqs:eu-central-1:783764587482:launch-queue',
},
],
};

await lambdaRequire('lambda-test/sync').sqshandler(
sqsEvent,
ctx,
() => {}
);
const spans = memoryExporter.getFinishedSpans();

assert.strictEqual(spans.length, 2);
assert.equal(
spans[0].parentSpanContext?.traceId,
spans[1].spanContext().traceId
);
assert.equal(
spans[0].parentSpanContext?.spanId,
spans[1].spanContext().spanId
);
assert.equal(spans[0].links[0]?.context.traceId, producerTraceId);
assert.equal(spans[0].links[0].context.spanId, producerSpanId);
});
});

describe('async handler sqs propagation', () => {
it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => {
initializeHandler('lambda-test/async.sqshandler');
const producerTraceId = '1df415edd0ad7f83e573f6504381dcec';
const producerSpanId = '83b7424a259945cb';
const sqsEvent = {
Records: [
{
messageAttributes: {
traceparent: {
stringValue: `00-${producerTraceId}-${producerSpanId}-01`,
dataType: 'String',
},
},
eventSource: 'aws:sqs',
eventSourceARN:
'arn:aws:sqs:eu-central-1:783764587482:launch-queue',
},
],
};

await lambdaRequire('lambda-test/async').sqshandler(sqsEvent, ctx);
const spans = memoryExporter.getFinishedSpans();

assert.strictEqual(spans.length, 2);
assert.equal(
spans[0].parentSpanContext?.traceId,
spans[1].spanContext().traceId
);
assert.equal(
spans[0].parentSpanContext?.spanId,
spans[1].spanContext().spanId
);
assert.equal(spans[0].links[0]?.context.traceId, producerTraceId);
assert.equal(spans[0].links[0].context.spanId, producerSpanId);
});
});

describe('sqsContextGetter', () => {
it('returns the keys for a given message attributes carrier', () => {
const carrier = {
'x-amzn-trace-id': {
stringValue: 'dummy',
stringListValues: [],
binaryListValues: [],
dataType: 'String',
},
traceparent: {
stringValue: 'dummy',
stringListValues: [],
binaryListValues: [],
dataType: 'String',
},
};

const keys = sqsContextGetter.keys(carrier);
assert.deepEqual(keys, ['x-amzn-trace-id', 'traceparent']);
});

it('returns empty array for null or undefined carrier', () => {
const keysNull = sqsContextGetter.keys(null);
const keysUndefined = sqsContextGetter.keys(undefined);
assert.deepEqual(keysNull, []);
assert.deepEqual(keysUndefined, []);
});
});
});
15 changes: 11 additions & 4 deletions packages/instrumentation-aws-lambda/test/lambda-test/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ exports.handler = async function (event, context) {
return 'ok';
};

exports.sqshandler = async function (event, context) {
event.Records.forEach(r => {});
return 'ok';
};

exports.error = async function (event, context) {
throw new Error('handler error');
}
};

exports.stringerror = async function (event, context) {
throw 'handler error';
}
};

exports.context = async function (event, context) {
return api.trace.getSpan(api.context.active()).spanContext().traceId;
};

exports.handler_return_baggage = async function (event, context) {
const [baggageEntryKey, baggageEntryValue] = api.propagation.getBaggage(api.context.active()).getAllEntries()[0];
const [baggageEntryKey, baggageEntryValue] = api.propagation
.getBaggage(api.context.active())
.getAllEntries()[0];
return `${baggageEntryKey}=${baggageEntryValue.value}`;
}
};
14 changes: 10 additions & 4 deletions packages/instrumentation-aws-lambda/test/lambda-test/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,27 @@ exports.handler = function (event, context, callback) {
callback(null, 'ok');
};

exports.sqshandler = function (event, context, callback) {
// Dummy forEach loop, to trigger sqs instrumentation
event.Records.forEach(r => {});
callback(null, 'ok');
};

exports.error = function (event, context, callback) {
throw new Error('handler error');
}
};

exports.callbackerror = function (event, context, callback) {
callback(new Error('handler error'));
}
};

exports.stringerror = function (event, context, callback) {
throw 'handler error';
}
};

exports.callbackstringerror = function (event, context, callback) {
callback('handler error');
}
};

exports.context = function (event, context, callback) {
callback(null, api.trace.getSpan(api.context.active()).spanContext().traceId);
Expand Down
Loading