Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion nodejs/src/wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,72 @@ const extraLambdaConfig = awsContextPropDisabled ?
{
// enable trace chaining; FIXME should fix this in upstream?
eventContextExtractor: (event: any, context: any) => {
const eventContext = propagation.extract(otelContext.active(), context.clientContext?.Custom);
// Start with any custom client context (e.g., API Gateway via Lambda proxy)
let carrier: Record<string, string> = {};
if (context?.clientContext?.Custom && typeof context.clientContext.Custom === 'object') {
carrier = { ...carrier, ...context.clientContext.Custom };
}

const setAttr = (key: any, value: any) => {
if (typeof key === 'string' && typeof value === 'string' && value.length > 0) {
carrier[key] = value;
}
};

// Handle SQS and SNS triggered events
const records = Array.isArray(event?.Records) ? event.Records : [];
for (const rec of records) {
// SQS
const isSqs = rec?.eventSource === 'aws:sqs' || rec?.EventSource === 'aws:sqs' || (typeof rec?.eventSourceARN === 'string' && rec.eventSourceARN.includes(':sqs:'));
if (isSqs && rec?.messageAttributes && typeof rec.messageAttributes === 'object') {
for (const [k, v] of Object.entries(rec.messageAttributes)) {
const attr: any = v;
const val = typeof attr?.stringValue === 'string' ? attr.stringValue
: typeof attr?.StringValue === 'string' ? attr.StringValue
: Array.isArray(attr?.stringListValues) && attr.stringListValues.length ? attr.stringListValues[0]
: undefined;
setAttr(k, val);
}
}

// SNS
const sns = rec?.Sns;
const isSns = rec?.eventSource === 'aws:sns' || rec?.EventSource === 'aws:sns' || !!sns;
if (isSns && sns?.MessageAttributes && typeof sns.MessageAttributes === 'object') {
for (const [k, v] of Object.entries(sns.MessageAttributes)) {
const attr: any = v;
const val = typeof attr?.Value === 'string' ? attr.Value
: typeof attr?.value === 'string' ? attr.value
: undefined;
setAttr(k, val);
}
}
}

// Direct SNS invoke shape fallback
if (event?.Sns?.MessageAttributes && typeof event.Sns.MessageAttributes === 'object') {
for (const [k, v] of Object.entries(event.Sns.MessageAttributes)) {
const attr: any = v;
const val = typeof attr?.Value === 'string' ? attr.Value
: typeof attr?.value === 'string' ? attr.value
: undefined;
setAttr(k, val);
}
}

// Generic MessageAttributes fallback (best-effort)
if (event?.MessageAttributes && typeof event.MessageAttributes === 'object') {
for (const [k, v] of Object.entries(event.MessageAttributes)) {
const attr: any = v;
const val = typeof attr?.stringValue === 'string' ? attr.stringValue
: typeof attr?.StringValue === 'string' ? attr.StringValue
: typeof attr?.Value === 'string' ? attr.Value
: undefined;
setAttr(k, val);
}
}

const eventContext = propagation.extract(otelContext.active(), carrier);
return eventContext;
},
} :
Expand Down