diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/.npmrc b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/.npmrc new file mode 100644 index 000000000000..070f80f05092 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/.npmrc @@ -0,0 +1,2 @@ +@sentry:registry=http://127.0.0.1:4873 +@sentry-internal:registry=http://127.0.0.1:4873 diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/package.json b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/package.json new file mode 100644 index 000000000000..ffe4b207b8cc --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/package.json @@ -0,0 +1,27 @@ +{ + "name": "aws-serverless-streaming", + "version": "1.0.0", + "private": true, + "scripts": { + "start": "node src/run.mjs", + "test": "playwright test", + "clean": "npx rimraf node_modules pnpm-lock.yaml", + "test:build": "pnpm install", + "test:assert": "pnpm test" + }, + "dependencies": { + "@sentry/aws-serverless": "* || latest" + }, + "devDependencies": { + "@sentry-internal/test-utils": "link:../../../test-utils", + "@playwright/test": "~1.53.2" + }, + "volta": { + "extends": "../../package.json" + }, + "pnpm": { + "overrides": { + "@opentelemetry/instrumentation-aws-lambda": "file:/Users/martin/code/opentelemetry-js-contrib/packages/instrumentation-aws-lambda" + } + } +} diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/playwright.config.ts b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/playwright.config.ts new file mode 100644 index 000000000000..174593c307df --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/playwright.config.ts @@ -0,0 +1,3 @@ +import { getPlaywrightConfig } from '@sentry-internal/test-utils'; + +export default getPlaywrightConfig(); diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/lambda-function.mjs b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/lambda-function.mjs new file mode 100644 index 000000000000..957f95e26126 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/lambda-function.mjs @@ -0,0 +1,16 @@ +import * as Sentry from '@sentry/aws-serverless'; + +let handler = async (event, responseStream, context) => { + Sentry.startSpan({ name: 'streaming-span', op: 'stream' }, () => { + responseStream.write('Starting stream\n'); + }); + + responseStream.write('Continuing stream\n'); + responseStream.write('Stream completed\n'); + responseStream.end(); +}; + +handler[Symbol.for('aws.lambda.runtime.handler.streaming')] = 'response'; +handler = Sentry.wrapHandler(handler); + +export { handler }; diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/package.json b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/package.json new file mode 100644 index 000000000000..43afe1b9fe77 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/package.json @@ -0,0 +1,5 @@ +{ + "//": "This is a mock package.json file which is usually created by AWS when deploying the lambda. OTEL instrumentation tries to read this file to get the lambda version", + "name": "lambda", + "version": "1.0.0" +} diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/run-lambda.mjs b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/run-lambda.mjs new file mode 100644 index 000000000000..6d5290ea0711 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/run-lambda.mjs @@ -0,0 +1,18 @@ +import { handler } from './lambda-function.mjs'; +import { Writable } from 'node:stream'; + +const event = {}; + +const context = { + invokedFunctionArn: 'arn:aws:lambda:us-east-1:123453789012:function:my-streaming-lambda', + functionName: 'my-streaming-lambda', +}; + +const responseStream = new Writable({ + write: (chunk, encoding, callback) => { + console.log('Streamed chunk:', chunk.toString()); + callback(); + }, +}); + +await handler(event, responseStream, context); diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/run.mjs b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/run.mjs new file mode 100644 index 000000000000..2f67c14a54f7 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/src/run.mjs @@ -0,0 +1,16 @@ +import child_process from 'child_process'; + +child_process.execSync('node ./src/run-lambda.mjs', { + stdio: 'inherit', + env: { + ...process.env, + // On AWS, LAMBDA_TASK_ROOT is usually /var/task but for testing, we set it to the CWD to correctly apply our handler + LAMBDA_TASK_ROOT: process.cwd(), + _HANDLER: 'src/lambda-function.handler', + + NODE_OPTIONS: '--import @sentry/aws-serverless/awslambda-auto', + SENTRY_DSN: 'http://public@localhost:3031/1337', + SENTRY_TRACES_SAMPLE_RATE: '1.0', + }, + cwd: process.cwd(), +}); diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/start-event-proxy.mjs b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/start-event-proxy.mjs new file mode 100644 index 000000000000..1bf1428d50d1 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/start-event-proxy.mjs @@ -0,0 +1,6 @@ +import { startEventProxyServer } from '@sentry-internal/test-utils'; + +startEventProxyServer({ + port: 3031, + proxyServerName: 'aws-serverless-streaming', +}); diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/tests/basic.test.ts b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/tests/basic.test.ts new file mode 100644 index 000000000000..37ad3bee2f23 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless-streaming/tests/basic.test.ts @@ -0,0 +1,54 @@ +import * as child_process from 'child_process'; +import { expect, test } from '@playwright/test'; +import { waitForTransaction } from '@sentry-internal/test-utils'; + +test('AWS Serverless SDK sends events from streaming handler', async ({ request }) => { + const transactionEventPromise = waitForTransaction('aws-serverless-streaming', transactionEvent => { + return transactionEvent?.transaction === 'my-streaming-lambda'; + }); + + // Waiting for 1s here because attaching the listener for events in `waitForTransaction` is not synchronous + // Since in this test, we don't start a browser via playwright, we don't have the usual delays (page.goto, etc) + // which are usually enough for us to never have noticed this race condition before. + // This is a workaround but probably sufficient as long as we only experience it in this test. + await new Promise(resolve => + setTimeout(() => { + resolve(); + }, 1000), + ); + + child_process.execSync('pnpm start', { + stdio: 'inherit', + }); + + const transactionEvent = await transactionEventPromise; + + expect(transactionEvent.transaction).toEqual('my-streaming-lambda'); + expect(transactionEvent.contexts?.trace).toEqual({ + data: { + 'sentry.sample_rate': 1, + 'sentry.source': 'custom', + 'sentry.origin': 'auto.otel.aws-lambda', + 'sentry.op': 'function.aws.lambda', + 'cloud.account.id': '123453789012', + 'faas.id': 'arn:aws:lambda:us-east-1:123453789012:function:my-streaming-lambda', + 'faas.coldstart': true, + 'otel.kind': 'SERVER', + }, + op: 'function.aws.lambda', + origin: 'auto.otel.aws-lambda', + span_id: expect.stringMatching(/[a-f0-9]{16}/), + status: 'ok', + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.spans).toHaveLength(1); + + const streamingSpan = transactionEvent.spans?.[0]; + expect(streamingSpan).toMatchObject({ + description: 'streaming-span', + op: 'stream', + status: 'ok', + origin: 'manual', + }); +}); diff --git a/packages/aws-serverless/src/integration/awslambda.ts b/packages/aws-serverless/src/integration/awslambda.ts index 00bca1a9219c..6e7be9ad3c3e 100644 --- a/packages/aws-serverless/src/integration/awslambda.ts +++ b/packages/aws-serverless/src/integration/awslambda.ts @@ -1,8 +1,8 @@ import { AwsLambdaInstrumentation } from '@opentelemetry/instrumentation-aws-lambda'; import type { IntegrationFn } from '@sentry/core'; import { defineIntegration, SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '@sentry/core'; -import { generateInstrumentOnce } from '@sentry/node'; -import { eventContextExtractor } from '../utils'; +import { captureException, generateInstrumentOnce } from '@sentry/node'; +import { eventContextExtractor, markEventUnhandled } from '../utils'; interface AwsLambdaOptions { /** @@ -27,6 +27,11 @@ export const instrumentAwsLambda = generateInstrumentOnce( span.setAttribute(SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, 'auto.otel.aws-lambda'); span.setAttribute(SEMANTIC_ATTRIBUTE_SENTRY_OP, 'function.aws.lambda'); }, + responseHook(_span, { err }) { + if (err) { + captureException(err, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.otel')); + } + }, }; }, ); diff --git a/packages/aws-serverless/src/sdk.ts b/packages/aws-serverless/src/sdk.ts index 9bad62f3a848..de6dc11b6ac5 100644 --- a/packages/aws-serverless/src/sdk.ts +++ b/packages/aws-serverless/src/sdk.ts @@ -1,33 +1,16 @@ -import type { Integration, Options, Scope } from '@sentry/core'; -import { applySdkMetadata, consoleSandbox, debug, getSDKSource } from '@sentry/core'; +import type { Integration, Options } from '@sentry/core'; +import { applySdkMetadata, debug, getSDKSource } from '@sentry/core'; import type { NodeClient, NodeOptions } from '@sentry/node'; -import { - captureException, - captureMessage, - flush, - getCurrentScope, - getDefaultIntegrationsWithoutPerformance, - initWithoutDefaultIntegrations, - withScope, -} from '@sentry/node'; -import type { Context, Handler } from 'aws-lambda'; +import { getDefaultIntegrationsWithoutPerformance, initWithoutDefaultIntegrations } from '@sentry/node'; +import type { Handler } from 'aws-lambda'; import { existsSync } from 'fs'; import { basename, resolve } from 'path'; -import { performance } from 'perf_hooks'; -import { types } from 'util'; import { DEBUG_BUILD } from './debug-build'; import { awsIntegration } from './integration/aws'; import { awsLambdaIntegration } from './integration/awslambda'; -import { markEventUnhandled } from './utils'; +import { wrapHandler } from './wrappers/wrap-handler'; -const { isPromise } = types; - -// https://www.npmjs.com/package/aws-lambda-consumer -type SyncHandler = ( - event: Parameters[0], - context: Parameters[1], - callback: Parameters[2], -) => void; +export { wrapHandler }; export type AsyncHandler = ( event: Parameters[0], @@ -89,25 +72,6 @@ function tryRequire(taskRoot: string, subdir: string, mod: string): T { return require(require.resolve(mod, { paths: [taskRoot, subdir] })); } -/** */ -function isPromiseAllSettledResult(result: T[]): boolean { - return result.every( - v => - Object.prototype.hasOwnProperty.call(v, 'status') && - (Object.prototype.hasOwnProperty.call(v, 'value') || Object.prototype.hasOwnProperty.call(v, 'reason')), - ); -} - -type PromiseSettledResult = { status: 'rejected' | 'fulfilled'; reason?: T }; - -/** */ -function getRejectedReasons(results: PromiseSettledResult[]): T[] { - return results.reduce((rejected: T[], result) => { - if (result.status === 'rejected' && result.reason) rejected.push(result.reason); - return rejected; - }, []); -} - /** */ export function tryPatchHandler(taskRoot: string, handlerPath: string): void { type HandlerBag = HandlerModule | Handler | null | undefined; @@ -159,161 +123,3 @@ export function tryPatchHandler(taskRoot: string, handlerPath: string): void { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion (mod as HandlerModule)[functionName!] = wrapHandler(obj); } - -/** - * Tries to invoke context.getRemainingTimeInMillis if not available returns 0 - * Some environments use AWS lambda but don't support this function - * @param context - */ -function tryGetRemainingTimeInMillis(context: Context): number { - return typeof context.getRemainingTimeInMillis === 'function' ? context.getRemainingTimeInMillis() : 0; -} - -/** - * Adds additional information from the environment and AWS Context to the Sentry Scope. - * - * @param scope Scope that should be enhanced - * @param context AWS Lambda context that will be used to extract some part of the data - * @param startTime performance.now() when wrapHandler was invoked - */ -function enhanceScopeWithEnvironmentData(scope: Scope, context: Context, startTime: number): void { - scope.setContext('aws.lambda', { - aws_request_id: context.awsRequestId, - function_name: context.functionName, - function_version: context.functionVersion, - invoked_function_arn: context.invokedFunctionArn, - execution_duration_in_millis: performance.now() - startTime, - remaining_time_in_millis: tryGetRemainingTimeInMillis(context), - 'sys.argv': process.argv, - }); - - scope.setContext('aws.cloudwatch.logs', { - log_group: context.logGroupName, - log_stream: context.logStreamName, - url: `https://console.aws.amazon.com/cloudwatch/home?region=${ - process.env.AWS_REGION - }#logsV2:log-groups/log-group/${encodeURIComponent(context.logGroupName)}/log-events/${encodeURIComponent( - context.logStreamName, - )}?filterPattern="${context.awsRequestId}"`, - }); -} - -/** - * Wraps a lambda handler adding it error capture and tracing capabilities. - * - * @param handler Handler - * @param options Options - * @returns Handler - */ -export function wrapHandler( - handler: Handler, - wrapOptions: Partial = {}, -): Handler { - const START_TIME = performance.now(); - - // eslint-disable-next-line deprecation/deprecation - if (typeof wrapOptions.startTrace !== 'undefined') { - consoleSandbox(() => { - // eslint-disable-next-line no-console - console.warn( - 'The `startTrace` option is deprecated and will be removed in a future major version. If you want to disable tracing, set `SENTRY_TRACES_SAMPLE_RATE` to `0.0`.', - ); - }); - } - - const options: WrapperOptions = { - flushTimeout: 2000, - callbackWaitsForEmptyEventLoop: false, - captureTimeoutWarning: true, - timeoutWarningLimit: 500, - captureAllSettledReasons: false, - startTrace: true, // TODO(v11): Remove this option. Set to true here to satisfy the type, but has no effect. - ...wrapOptions, - }; - - let timeoutWarningTimer: NodeJS.Timeout; - - // AWSLambda is like Express. It makes a distinction about handlers based on its last argument - // async (event) => async handler - // async (event, context) => async handler - // (event, context, callback) => sync handler - // Nevertheless whatever option is chosen by user, we convert it to async handler. - const asyncHandler: AsyncHandler = - handler.length > 2 - ? (event, context) => - new Promise((resolve, reject) => { - const rv = (handler as SyncHandler)(event, context, (error, result) => { - if (error === null || error === undefined) { - resolve(result!); // eslint-disable-line @typescript-eslint/no-non-null-assertion - } else { - reject(error); - } - }) as unknown; - - // This should never happen, but still can if someone writes a handler as - // `async (event, context, callback) => {}` - if (isPromise(rv)) { - void (rv as Promise>).then(resolve, reject); - } - }) - : (handler as AsyncHandler); - - return async (event, context) => { - context.callbackWaitsForEmptyEventLoop = options.callbackWaitsForEmptyEventLoop; - - // In seconds. You cannot go any more granular than this in AWS Lambda. - const configuredTimeout = Math.ceil(tryGetRemainingTimeInMillis(context) / 1000); - const configuredTimeoutMinutes = Math.floor(configuredTimeout / 60); - const configuredTimeoutSeconds = configuredTimeout % 60; - - const humanReadableTimeout = - configuredTimeoutMinutes > 0 - ? `${configuredTimeoutMinutes}m${configuredTimeoutSeconds}s` - : `${configuredTimeoutSeconds}s`; - - // When `callbackWaitsForEmptyEventLoop` is set to false, which it should when using `captureTimeoutWarning`, - // we don't have a guarantee that this message will be delivered. Because of that, we don't flush it. - if (options.captureTimeoutWarning) { - const timeoutWarningDelay = tryGetRemainingTimeInMillis(context) - options.timeoutWarningLimit; - - timeoutWarningTimer = setTimeout(() => { - withScope(scope => { - scope.setTag('timeout', humanReadableTimeout); - captureMessage(`Possible function timeout: ${context.functionName}`, 'warning'); - }); - }, timeoutWarningDelay) as unknown as NodeJS.Timeout; - } - - async function processResult(): Promise { - const scope = getCurrentScope(); - - let rv: TResult; - try { - enhanceScopeWithEnvironmentData(scope, context, START_TIME); - - rv = await asyncHandler(event, context); - - // We manage lambdas that use Promise.allSettled by capturing the errors of failed promises - if (options.captureAllSettledReasons && Array.isArray(rv) && isPromiseAllSettledResult(rv)) { - const reasons = getRejectedReasons(rv); - reasons.forEach(exception => { - captureException(exception, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.promise')); - }); - } - } catch (e) { - captureException(e, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.handler')); - throw e; - } finally { - clearTimeout(timeoutWarningTimer); - await flush(options.flushTimeout).catch(e => { - DEBUG_BUILD && debug.error(e); - }); - } - return rv; - } - - return withScope(async () => { - return processResult(); - }); - }; -} diff --git a/packages/aws-serverless/src/wrappers/common.ts b/packages/aws-serverless/src/wrappers/common.ts new file mode 100644 index 000000000000..a060bc18f40a --- /dev/null +++ b/packages/aws-serverless/src/wrappers/common.ts @@ -0,0 +1,111 @@ +import type { Scope } from '@sentry/core'; +import { captureMessage, withScope } from '@sentry/node'; +import type { Context } from 'aws-lambda'; + +export interface WrapperOptions { + flushTimeout: number; + callbackWaitsForEmptyEventLoop: boolean; + captureTimeoutWarning: boolean; + timeoutWarningLimit: number; + /** + * Capture all errors when `Promise.allSettled` is returned by the handler + * The {@link wrapHandler} will not fail the lambda even if there are errors + * @default false + */ + captureAllSettledReasons: boolean; + // TODO(v11): Remove this option since its no longer used. + /** + * @deprecated This option has no effect and will be removed in a future major version. + * If you want to disable tracing, set `SENTRY_TRACES_SAMPLE_RATE` to `0.0`, otherwise OpenTelemetry will automatically trace the handler. + */ + startTrace: boolean; +} + +export const AWS_HANDLER_HIGHWATERMARK = Symbol.for('aws.lambda.runtime.handler.highWaterMark'); +export const AWS_HANDLER_STREAMING = Symbol.for('aws.lambda.runtime.handler.streaming'); +export const AWS_STREAM_RESPONSE = 'response'; + +/** + * + */ +export function createDefaultWrapperOptions(wrapOptions: Partial = {}): WrapperOptions { + return { + flushTimeout: 2000, + callbackWaitsForEmptyEventLoop: false, + captureTimeoutWarning: true, + timeoutWarningLimit: 500, + captureAllSettledReasons: false, + startTrace: true, // TODO(v11): Remove this option. Set to true here to satisfy the type, but has no effect. + ...wrapOptions, + }; +} + +/** + * + */ +export function setupTimeoutWarning(context: Context, options: WrapperOptions): NodeJS.Timeout | undefined { + if (!options.captureTimeoutWarning) { + return undefined; + } + + const timeoutWarningDelay = tryGetRemainingTimeInMillis(context) - options.timeoutWarningLimit; + const humanReadableTimeout = getHumanReadableTimeout(context); + + return setTimeout(() => { + withScope(scope => { + scope.setTag('timeout', humanReadableTimeout); + captureMessage(`Possible function timeout: ${context.functionName}`, 'warning'); + }); + }, timeoutWarningDelay) as unknown as NodeJS.Timeout; +} + +/** + * Adds additional information from the environment and AWS Context to the Sentry Scope. + * + * @param scope Scope that should be enhanced + * @param context AWS Lambda context that will be used to extract some part of the data + * @param startTime performance.now() when wrapHandler was invoked + */ +export function enhanceScopeWithEnvironmentData(scope: Scope, context: Context, startTime: number): void { + scope.setContext('aws.lambda', { + aws_request_id: context.awsRequestId, + function_name: context.functionName, + function_version: context.functionVersion, + invoked_function_arn: context.invokedFunctionArn, + execution_duration_in_millis: performance.now() - startTime, + remaining_time_in_millis: tryGetRemainingTimeInMillis(context), + 'sys.argv': process.argv, + }); + + scope.setContext('aws.cloudwatch.logs', { + log_group: context.logGroupName, + log_stream: context.logStreamName, + url: `https://console.aws.amazon.com/cloudwatch/home?region=${ + process.env.AWS_REGION + }#logsV2:log-groups/log-group/${encodeURIComponent(context.logGroupName)}/log-events/${encodeURIComponent( + context.logStreamName, + )}?filterPattern="${context.awsRequestId}"`, + }); +} + +/** + * + */ +export function getHumanReadableTimeout(context: Context): string { + const configuredTimeout = Math.ceil(tryGetRemainingTimeInMillis(context) / 1000); + const configuredTimeoutMinutes = Math.floor(configuredTimeout / 60); + const configuredTimeoutSeconds = configuredTimeout % 60; + + return configuredTimeoutMinutes > 0 + ? `${configuredTimeoutMinutes}m${configuredTimeoutSeconds}s` + : `${configuredTimeoutSeconds}s`; +} + +/** + * Tries to invoke context.getRemainingTimeInMillis if not available returns 0 + * Some environments use AWS lambda but don't support this function + * @param context + */ +function tryGetRemainingTimeInMillis(context: Context): number { + return typeof context.getRemainingTimeInMillis === 'function' ? context.getRemainingTimeInMillis() : 0; +} diff --git a/packages/aws-serverless/src/wrappers/wrap-handler.ts b/packages/aws-serverless/src/wrappers/wrap-handler.ts new file mode 100644 index 000000000000..b8bdb71db492 --- /dev/null +++ b/packages/aws-serverless/src/wrappers/wrap-handler.ts @@ -0,0 +1,165 @@ +import { consoleSandbox, debug } from '@sentry/core'; +import { captureException, flush, getCurrentScope, withScope } from '@sentry/node'; +import type { Context, Handler, StreamifyHandler } from 'aws-lambda'; +import { isPromise } from 'util/types'; +import { DEBUG_BUILD } from '../debug-build'; +import { markEventUnhandled } from '../utils'; +import { + type WrapperOptions, + AWS_HANDLER_STREAMING, + AWS_STREAM_RESPONSE, + createDefaultWrapperOptions, + enhanceScopeWithEnvironmentData, + setupTimeoutWarning, +} from './common'; +import { wrapStreamingHandler } from './wrap-streaming-handler'; + +/** + * Wraps a lambda handler adding it error capture and tracing capabilities. + * + * @param handler Handler + * @param options Options + * @returns Handler + */ +export function wrapHandler( + handler: Handler, + wrapOptions?: Partial, +): Handler; + +/** + * Wraps a streaming lambda handler adding it error capture and tracing capabilities. + * + * @param handler Streaming Handler + * @param options Options + * @returns Streaming Handler + */ +export function wrapHandler( + handler: StreamifyHandler, + wrapOptions?: Partial, +): StreamifyHandler; + +/** + * Implementation function that wraps both regular and streaming handlers + */ +export function wrapHandler( + handler: Handler | StreamifyHandler, + wrapOptions: Partial = {}, +): Handler | StreamifyHandler { + // eslint-disable-next-line deprecation/deprecation + if (typeof wrapOptions.startTrace !== 'undefined') { + consoleSandbox(() => { + // eslint-disable-next-line no-console + console.warn( + 'The `startTrace` option is deprecated and will be removed in a future major version. If you want to disable tracing, set `SENTRY_TRACES_SAMPLE_RATE` to `0.0`.', + ); + }); + } + + if (isStreamingHandler(handler)) { + return wrapStreamingHandler(handler, wrapOptions); + } + + const START_TIME = performance.now(); + const options = createDefaultWrapperOptions(wrapOptions); + let timeoutWarningTimer: NodeJS.Timeout | undefined; + + // AWSLambda is like Express. It makes a distinction about handlers based on its last argument + // async (event) => async handler + // async (event, context) => async handler + // (event, context, callback) => sync handler + // Nevertheless whatever option is chosen by user, we convert it to async handler. + const regularHandler = handler as Handler; + const asyncHandler: AsyncHandler = + regularHandler.length > 2 + ? (event, context) => + new Promise((resolve, reject) => { + const rv = (regularHandler as SyncHandler)(event, context, (error, result) => { + if (error === null || error === undefined) { + resolve(result!); // eslint-disable-line @typescript-eslint/no-non-null-assertion + } else { + reject(error); + } + }) as unknown; + + // This should never happen, but still can if someone writes a handler as + // `async (event, context, callback) => {}` + if (isPromise(rv)) { + void (rv as Promise>).then(resolve, reject); + } + }) + : (regularHandler as AsyncHandler); + + return async (event: TEvent, context: Context) => { + context.callbackWaitsForEmptyEventLoop = options.callbackWaitsForEmptyEventLoop; + + timeoutWarningTimer = setupTimeoutWarning(context, options); + + async function processResult(): Promise { + const scope = getCurrentScope(); + + let rv: TResult; + try { + enhanceScopeWithEnvironmentData(scope, context, START_TIME); + + rv = await asyncHandler(event, context); + + // We manage lambdas that use Promise.allSettled by capturing the errors of failed promises + if (options.captureAllSettledReasons && Array.isArray(rv) && isPromiseAllSettledResult(rv)) { + const reasons = getRejectedReasons(rv); + reasons.forEach(exception => { + captureException(exception, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.promise')); + }); + } + } catch (e) { + captureException(e, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.handler')); + throw e; + } finally { + if (timeoutWarningTimer) { + clearTimeout(timeoutWarningTimer); + } + + await flush(options.flushTimeout).catch(e => { + DEBUG_BUILD && debug.error(e); + }); + } + return rv; + } + + return withScope(() => processResult()); + }; +} + +function isStreamingHandler( + handler: Handler | StreamifyHandler, +): handler is StreamifyHandler { + return (handler as unknown as Record)[AWS_HANDLER_STREAMING] === AWS_STREAM_RESPONSE; +} + +type AsyncHandler = ( + event: Parameters[0], + context: Parameters[1], +) => Promise[2]>[1]>>; + +// https://www.npmjs.com/package/aws-lambda-consumer +type SyncHandler = ( + event: Parameters[0], + context: Parameters[1], + callback: Parameters[2], +) => void; + +type PromiseSettledResult = { status: 'rejected' | 'fulfilled'; reason?: T }; + +function getRejectedReasons(results: PromiseSettledResult[]): T[] { + return results.reduce((rejected: T[], result) => { + if (result.status === 'rejected' && result.reason) rejected.push(result.reason); + return rejected; + }, []); +} + +function isPromiseAllSettledResult(result: T[]): boolean { + return result.every( + v => + Object.prototype.hasOwnProperty.call(v, 'status') && + (Object.prototype.hasOwnProperty.call(v, 'value') || Object.prototype.hasOwnProperty.call(v, 'reason')), + ); +} diff --git a/packages/aws-serverless/src/wrappers/wrap-streaming-handler.ts b/packages/aws-serverless/src/wrappers/wrap-streaming-handler.ts new file mode 100644 index 000000000000..18df2bc03c88 --- /dev/null +++ b/packages/aws-serverless/src/wrappers/wrap-streaming-handler.ts @@ -0,0 +1,70 @@ +import { debug } from '@sentry/core'; +import { captureException, flush, getCurrentScope, withScope } from '@sentry/node'; +import type { Context, StreamifyHandler } from 'aws-lambda'; +import { DEBUG_BUILD } from '../debug-build'; +import { markEventUnhandled } from '../utils'; +import { + type WrapperOptions, + AWS_HANDLER_HIGHWATERMARK, + AWS_HANDLER_STREAMING, + createDefaultWrapperOptions, + enhanceScopeWithEnvironmentData, + setupTimeoutWarning, +} from './common'; + +type HttpResponseStream = Parameters[1]; + +/** + * + */ +export function wrapStreamingHandler( + handler: StreamifyHandler, + wrapOptions: Partial = {}, +): StreamifyHandler { + const START_TIME = performance.now(); + const options = createDefaultWrapperOptions(wrapOptions); + let timeoutWarningTimer: NodeJS.Timeout | undefined; + + const wrappedHandler = async ( + event: TEvent, + responseStream: HttpResponseStream, + context: Context, + ): Promise => { + context.callbackWaitsForEmptyEventLoop = options.callbackWaitsForEmptyEventLoop; + + timeoutWarningTimer = setupTimeoutWarning(context, options); + + async function processStreamingResult(): Promise { + const scope = getCurrentScope(); + + try { + enhanceScopeWithEnvironmentData(scope, context, START_TIME); + + responseStream.on('error', error => { + captureException(error, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.stream')); + }); + + return await handler(event, responseStream, context); + } catch (e) { + captureException(e, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.handler')); + throw e; + } finally { + if (timeoutWarningTimer) { + clearTimeout(timeoutWarningTimer); + } + + await flush(options.flushTimeout).catch(e => DEBUG_BUILD && debug.error(e)); + } + } + + return withScope(() => processStreamingResult()); + }; + + const handlerWithSymbols = handler as unknown as Record; + (wrappedHandler as unknown as Record)[AWS_HANDLER_STREAMING] = + handlerWithSymbols[AWS_HANDLER_STREAMING]; + (wrappedHandler as unknown as Record)[AWS_HANDLER_HIGHWATERMARK] = + handlerWithSymbols[AWS_HANDLER_HIGHWATERMARK]; + + return wrappedHandler; +} diff --git a/packages/aws-serverless/test/sdk.test.ts b/packages/aws-serverless/test/sdk.test.ts index 648ef4caeaec..d5a560f6e84b 100644 --- a/packages/aws-serverless/test/sdk.test.ts +++ b/packages/aws-serverless/test/sdk.test.ts @@ -367,6 +367,193 @@ describe('AWSLambda', () => { }); }); + describe('wrapHandler() on streaming handlers', () => { + const STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming'); + + // Mock response stream with common stream interface + const mockResponseStream = { + write: vi.fn(), + end: vi.fn(), + destroy: vi.fn(), + on: vi.fn(), + setContentType: vi.fn(), + writable: true, + writableEnded: false, + writableFinished: false, + }; + + beforeEach(() => { + vi.clearAllMocks(); + mockResponseStream.write.mockClear(); + mockResponseStream.end.mockClear(); + mockResponseStream.destroy.mockClear(); + mockResponseStream.on.mockClear(); + }); + + test('successful execution', async () => { + expect.assertions(5); + + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 42; + }); + // Add the streaming symbol to mark it as a streaming handler + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + const rv = await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(rv).toStrictEqual(42); + expectScopeSettings(); + expect(streamingHandler).toHaveBeenCalledWith(fakeEvent, mockResponseStream, fakeContext); + expect(mockFlush).toBeCalledWith(2000); + }); + + test('preserves streaming symbol on wrapped handler', () => { + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 42; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + + expect((wrappedHandler as any)[STREAMING_SYMBOL]).toBe('response'); + }); + + test('event, responseStream and context are correctly passed along', async () => { + expect.assertions(3); + + const streamingHandler = vi.fn(async (event, responseStream, context) => { + expect(event).toHaveProperty('fortySix'); + expect(responseStream).toBe(mockResponseStream); + expect(context).toHaveProperty('ytho'); + return 'success'; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + }); + + test('capture error from handler execution', async () => { + expect.assertions(4); + + const error = new Error('streaming handler error'); + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + throw error; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + + try { + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + } catch { + expectScopeSettings(); + expect(mockCaptureException).toBeCalledWith(error, expect.any(Function)); + expect(mockFlush).toBeCalled(); + } + }); + + test('capture stream errors', async () => { + expect.assertions(3); + + const streamError = new Error('stream error'); + const streamingHandler = vi.fn(async (_event, responseStream, _context) => { + // Simulate stream error by calling the error listener + const errorListener = (responseStream.on as any).mock.calls.find((call: any[]) => call[0] === 'error')?.[1]; + if (errorListener) { + errorListener(streamError); + } + return 'success'; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(mockResponseStream.on).toHaveBeenCalledWith('error', expect.any(Function)); + expect(mockCaptureException).toHaveBeenCalledWith(streamError, expect.any(Function)); + expect(streamingHandler).toHaveBeenCalledWith(fakeEvent, mockResponseStream, fakeContext); + }); + + test('streaming handler with flushTimeout option', async () => { + expect.assertions(2); + + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 'flushed'; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler, { flushTimeout: 5000 }); + const result = await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(result).toBe('flushed'); + expect(mockFlush).toBeCalledWith(5000); + }); + + test('streaming handler with captureTimeoutWarning enabled', async () => { + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + // Simulate some delay to trigger timeout warning + await new Promise(resolve => setTimeout(resolve, DEFAULT_EXECUTION_TIME)); + return 'completed'; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(mockWithScope).toBeCalledTimes(2); + expect(mockCaptureMessage).toBeCalled(); + expect(mockScope.setTag).toBeCalledWith('timeout', '1s'); + }); + + test('marks streaming handler captured errors as unhandled', async () => { + expect.assertions(3); + + const error = new Error('streaming error'); + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + throw error; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + + try { + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + } catch { + expect(mockCaptureException).toBeCalledWith(error, expect.any(Function)); + + const scopeFunction = mockCaptureException.mock.calls[0]?.[1]; + const event: Event = { exception: { values: [{}] } }; + let evtProcessor: ((e: Event) => Event) | undefined = undefined; + if (scopeFunction) { + scopeFunction({ addEventProcessor: vi.fn().mockImplementation(proc => (evtProcessor = proc)) }); + } + + expect(evtProcessor).toBeInstanceOf(Function); + // @ts-expect-error just mocking around... + expect(evtProcessor!(event).exception.values[0]?.mechanism).toEqual({ + handled: false, + type: 'auto.function.aws-serverless.handler', + }); + } + }); + + test('should not throw when flush rejects with streaming handler', async () => { + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 'flush-error-test'; + }); + (streamingHandler as any)[STREAMING_SYMBOL] = 'response'; + + const wrappedHandler = wrapHandler(streamingHandler); + mockFlush.mockImplementationOnce(() => Promise.reject(new Error('flush failed'))); + + await expect((wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext)).resolves.toBe( + 'flush-error-test', + ); + }); + }); + test('marks the captured error as unhandled', async () => { expect.assertions(3);