Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import * as Sentry from '@sentry/aws-serverless';

export const handler = awslambda.streamifyResponse(async (event, responseStream, context) => {
Sentry.startSpan({ name: 'manual-span', op: 'test' }, async () => {
responseStream.write('Hello, world!');
responseStream.end();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,52 @@ test.describe('Lambda layer', () => {
}),
);
});

test('streaming handlers work', async ({ lambdaClient }) => {
const transactionEventPromise = waitForTransaction('aws-serverless-lambda-sam', transactionEvent => {
return transactionEvent?.transaction === 'LayerStreaming';
});

await lambdaClient.send(
new InvokeCommand({
FunctionName: 'LayerStreaming',
Payload: JSON.stringify({}),
}),
);

const transactionEvent = await transactionEventPromise;

expect(transactionEvent.transaction).toEqual('LayerStreaming');
expect(transactionEvent.contexts?.trace).toEqual({
data: {
'sentry.sample_rate': 1,
'sentry.source': 'custom',
'sentry.origin': 'auto.otel.aws-lambda',
'sentry.op': 'function.aws.lambda',
'cloud.account.id': '012345678912',
'faas.execution': expect.any(String),
'faas.id': 'arn:aws:lambda:us-east-1:012345678912:function:LayerStreaming',
'faas.coldstart': true,
'otel.kind': 'SERVER',
},
op: 'function.aws.lambda',
origin: 'auto.otel.aws-lambda',
span_id: expect.stringMatching(/[a-f0-9]{16}/),
status: 'ok',
trace_id: expect.stringMatching(/[a-f0-9]{32}/),
});

expect(transactionEvent.spans).toHaveLength(1);

expect(transactionEvent.spans).toContainEqual(
expect.objectContaining({
data: expect.objectContaining({
'sentry.op': 'test',
'sentry.origin': 'manual',
}),
description: 'manual-span',
op: 'test',
}),
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// - Added Sentry `wrapHandler` around the OTel patch handler.
// - Cancel init when handler string is invalid (TS)
// - Hardcoded package version and name
// - Added support for streaming handlers
/* eslint-disable */
/*
* Copyright The OpenTelemetry Authors
Expand Down Expand Up @@ -50,7 +51,7 @@ import {
SEMRESATTRS_CLOUD_ACCOUNT_ID,
SEMRESATTRS_FAAS_ID,
} from '@opentelemetry/semantic-conventions';
import type { APIGatewayProxyEventHeaders, Callback, Context, Handler } from 'aws-lambda';
import type { APIGatewayProxyEventHeaders, Callback, Context, Handler, StreamifyHandler } from 'aws-lambda';
import * as fs from 'fs';
import * as path from 'path';
import type { LambdaModule } from './internal-types';
Expand All @@ -73,6 +74,9 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
};

export const lambdaMaxInitInMilliseconds = 10_000;
const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming');
const AWS_HANDLER_HIGHWATERMARK_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming.highWaterMark');
const AWS_HANDLER_STREAMING_RESPONSE = 'response';

/**
*
Expand Down Expand Up @@ -101,6 +105,21 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
return [];
}

// Provide a temporary awslambda polyfill for CommonJS modules during loading
// This prevents ReferenceError when modules use awslambda.streamifyResponse at load time
// taken from https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/main/src/UserFunction.js#L205C7-L211C9
if (typeof globalThis.awslambda === 'undefined') {
(globalThis as any).awslambda = {
streamifyResponse: (handler: any, options: any) => {
handler[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE;
if (typeof options?.highWaterMark === 'number') {
handler[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = parseInt(options.highWaterMark);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Redundant parseInt() Causes Data Loss

The parseInt() call for options.highWaterMark in the awslambda polyfill is redundant as the value is already a number. This can cause data loss by truncating decimal values if highWaterMark is a float.

Fix in Cursor Fix in Web

}
return handler;
},
};
}

const handler = path.basename(handlerDef);
const moduleRoot = handlerDef.substring(0, handlerDef.length - handler.length);

Expand Down Expand Up @@ -187,16 +206,33 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
/**
*
*/
private _getHandler(handlerLoadStartTime: number) {
return (original: Handler) => {
return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime));
private _getHandler<T extends Handler | StreamifyHandler>(handlerLoadStartTime: number) {
return (original: T): T => {
if (this._isStreamingHandler(original)) {
const patchedHandler = this._getPatchHandler(original, handlerLoadStartTime);

// Streaming handlers have special symbols that we need to copy over to the patched handler.
(patchedHandler as unknown as Record<symbol, unknown>)[AWS_HANDLER_STREAMING_SYMBOL] = (
original as unknown as Record<symbol, unknown>
)[AWS_HANDLER_STREAMING_SYMBOL];
(patchedHandler as unknown as Record<symbol, unknown>)[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = (
original as unknown as Record<symbol, unknown>
)[AWS_HANDLER_HIGHWATERMARK_SYMBOL];

return wrapHandler(patchedHandler) as T;
}

return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime)) as T;
};
}

private _getPatchHandler(original: Handler, lambdaStartTime: number): Handler;
private _getPatchHandler(original: StreamifyHandler, lambdaStartTime: number): StreamifyHandler;

/**
*
*/
private _getPatchHandler(original: Handler, lambdaStartTime: number) {
private _getPatchHandler(original: Handler | StreamifyHandler, lambdaStartTime: number): Handler | StreamifyHandler {
diag.debug('patch handler function');
const plugin = this;

Expand Down Expand Up @@ -229,6 +265,36 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
}
}

if (this._isStreamingHandler(original)) {
return function patchedStreamingHandler(
this: never,
// The event can be a user type, it truly is any.
event: any,
responseStream: Parameters<StreamifyHandler>[1],
context: Context,
) {
_onRequest();
const parent = plugin._determineParent(event, context);
const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent);
plugin._applyRequestHook(span, event, context);

return otelContext.with(trace.setSpan(parent, span), () => {
const maybePromise = safeExecuteInTheMiddle(
() => original.apply(this, [event, responseStream, context]),
error => {
if (error != null) {
// Exception thrown synchronously before resolving promise.
plugin._applyResponseHook(span, error);
plugin._endSpan(span, error, () => {});
}
},
) as Promise<{}> | undefined;

return plugin._handlePromiseResult(span, maybePromise);
});
};
}

return function patchedHandler(
this: never,
// The event can be a user type, it truly is any.
Expand All @@ -239,39 +305,10 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
) {
_onRequest();

const config = plugin.getConfig();
const parent = AwsLambdaInstrumentation._determineParent(
event,
context,
config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor,
);
const parent = plugin._determineParent(event, context);

const name = context.functionName;
const span = plugin.tracer.startSpan(
name,
{
kind: SpanKind.SERVER,
attributes: {
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
[SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn),
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
...AwsLambdaInstrumentation._extractOtherEventFields(event),
},
},
parent,
);

const { requestHook } = config;
if (requestHook) {
safeExecuteInTheMiddle(
() => requestHook(span, { event, context }),
e => {
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
},
true,
);
}
const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent);
plugin._applyRequestHook(span, event, context);

return otelContext.with(trace.setSpan(parent, span), () => {
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
Expand All @@ -289,23 +326,80 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
}
},
) as Promise<{}> | undefined;
if (typeof maybePromise?.then === 'function') {
return maybePromise.then(
value => {
plugin._applyResponseHook(span, null, value);
return new Promise(resolve => plugin._endSpan(span, undefined, () => resolve(value)));
},
(err: Error | string) => {
plugin._applyResponseHook(span, err);
return new Promise((resolve, reject) => plugin._endSpan(span, err, () => reject(err)));
},
);
}
return maybePromise;

return plugin._handlePromiseResult(span, maybePromise);
});
};
}

private _createSpanForRequest(event: any, context: Context, requestIsColdStart: boolean, parent: OtelContext): Span {
const name = context.functionName;
return this.tracer.startSpan(
name,
{
kind: SpanKind.SERVER,
attributes: {
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
[SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn),
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
...AwsLambdaInstrumentation._extractOtherEventFields(event),
},
},
parent,
);
}

private _applyRequestHook(span: Span, event: any, context: Context): void {
const { requestHook } = this.getConfig();
if (requestHook) {
safeExecuteInTheMiddle(
() => requestHook(span, { event, context }),
e => {
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
},
true,
);
}
}

private _handlePromiseResult(span: Span, maybePromise: Promise<{}> | undefined): Promise<{}> | undefined {
if (typeof maybePromise?.then === 'function') {
return maybePromise.then(
value => {
this._applyResponseHook(span, null, value);
return new Promise(resolve => this._endSpan(span, undefined, () => resolve(value)));
},
(err: Error | string) => {
this._applyResponseHook(span, err);
return new Promise((resolve, reject) => this._endSpan(span, err, () => reject(err)));
},
);
}

// Handle synchronous return values by ending the span and applying response hook
this._applyResponseHook(span, null, maybePromise);
this._endSpan(span, undefined, () => {});
return maybePromise;
}

private _determineParent(event: any, context: Context): OtelContext {
const config = this.getConfig();
return AwsLambdaInstrumentation._determineParent(
event,
context,
config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor,
);
}

private _isStreamingHandler<TEvent, TResult>(
handler: Handler<TEvent, TResult> | StreamifyHandler<TEvent, TResult>,
): handler is StreamifyHandler<TEvent, TResult> {
return (
(handler as unknown as Record<symbol, unknown>)[AWS_HANDLER_STREAMING_SYMBOL] === AWS_HANDLER_STREAMING_RESPONSE
);
}

/**
*
*/
Expand Down
Loading
Loading