From 74ab471233651aa8f86af2d4ad8cc96a37e95322 Mon Sep 17 00:00:00 2001 From: Martin Sonnberger Date: Mon, 4 Aug 2025 10:01:25 +0200 Subject: [PATCH] feat(instrumentation-aws-lamda): support streaming handlers --- .../src/instrumentation.ts | 230 +++++++--- .../src/internal-types.ts | 4 +- .../test/integrations/lambda-handler.test.ts | 400 ++++++++++++++++++ .../test/lambda-test/streaming.js | 94 ++++ 4 files changed, 674 insertions(+), 54 deletions(-) create mode 100644 packages/instrumentation-aws-lambda/test/lambda-test/streaming.js diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index b67a72ca4e..cf02551d86 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -52,6 +52,7 @@ import { Callback, Context, Handler, + StreamifyHandler, } from 'aws-lambda'; import { AwsLambdaInstrumentationConfig, EventContextExtractor } from './types'; @@ -69,6 +70,10 @@ const headerGetter: TextMapGetter = { }; export const lambdaMaxInitInMilliseconds = 10_000; +export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for( + 'aws.lambda.runtime.handler.streaming' +); +export const AWS_HANDLER_STREAMING_RESPONSE = 'response'; export class AwsLambdaInstrumentation extends InstrumentationBase { private declare _traceForceFlusher?: () => Promise; @@ -91,6 +96,19 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { + // Add the streaming symbols that the instrumentation looks for + handler[AWS_HANDLER_STREAMING_SYMBOL] = + AWS_HANDLER_STREAMING_RESPONSE; + return handler; + }, + }; + } + const handler = path.basename(handlerDef); const moduleRoot = handlerDef.substring( 0, @@ -175,12 +193,40 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { + return ( + original: Handler | StreamifyHandler + ): Handler | StreamifyHandler => { + if (this._isStreamingHandler(original)) { + const patchedHandler = this._getPatchHandler( + original, + handlerLoadStartTime + ); + + // Streaming handlers have special symbols that we need to copy over to the patched handler. + for (const symbol of Object.getOwnPropertySymbols(original)) { + (patchedHandler as unknown as Record)[symbol] = ( + original as unknown as Record + )[symbol]; + } + + return patchedHandler; + } + return this._getPatchHandler(original, handlerLoadStartTime); }; } - private _getPatchHandler(original: Handler, lambdaStartTime: number) { + private _getPatchHandler(original: Handler, lambdaStartTime: number): Handler; + + private _getPatchHandler( + original: StreamifyHandler, + lambdaStartTime: number + ): StreamifyHandler; + + private _getPatchHandler( + original: Handler | StreamifyHandler, + lambdaStartTime: number + ): Handler | StreamifyHandler { diag.debug('patch handler function'); const plugin = this; @@ -215,6 +261,43 @@ export class AwsLambdaInstrumentation extends InstrumentationBase[1], + context: Context + ) { + _onRequest(); + + const parent = plugin._determineParent(event, context); + const span = plugin._createSpanForRequest( + event, + context, + requestIsColdStart, + parent + ); + plugin._applyRequestHook(span, event, context); + + return otelContext.with(trace.setSpan(parent, span), () => { + const maybePromise = safeExecuteInTheMiddle( + () => original.apply(this, [event, responseStream, context]), + error => { + if (error != null) { + // Exception thrown synchronously before resolving promise. + plugin._applyResponseHook(span, error); + plugin._endSpan(span, error, () => {}); + } + } + ) as Promise<{}> | undefined; + + return plugin._handlePromiseResult(span, maybePromise); + }); + }; + } + return function patchedHandler( this: never, // The event can be a user type, it truly is any. @@ -225,44 +308,16 @@ export class AwsLambdaInstrumentation extends InstrumentationBase requestHook(span, { event, context }), - e => { - if (e) - diag.error('aws-lambda instrumentation: requestHook error', e); - }, - true - ); - } + plugin._applyRequestHook(span, event, context); return otelContext.with(trace.setSpan(parent, span), () => { // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling @@ -280,27 +335,98 @@ export class AwsLambdaInstrumentation extends InstrumentationBase | undefined; - if (typeof maybePromise?.then === 'function') { - return maybePromise.then( - value => { - plugin._applyResponseHook(span, null, value); - return new Promise(resolve => - plugin._endSpan(span, undefined, () => resolve(value)) - ); - }, - (err: Error | string) => { - plugin._applyResponseHook(span, err); - return new Promise((resolve, reject) => - plugin._endSpan(span, err, () => reject(err)) - ); - } - ); - } - return maybePromise; + + return plugin._handlePromiseResult(span, maybePromise); }); }; } + private _createSpanForRequest( + event: any, + context: Context, + requestIsColdStart: boolean, + parent: OtelContext + ): Span { + const name = context.functionName; + return this.tracer.startSpan( + name, + { + kind: SpanKind.SERVER, + attributes: { + [SEMATTRS_FAAS_EXECUTION]: context.awsRequestId, + [SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn, + [SEMRESATTRS_CLOUD_ACCOUNT_ID]: + AwsLambdaInstrumentation._extractAccountId( + context.invokedFunctionArn + ), + [ATTR_FAAS_COLDSTART]: requestIsColdStart, + ...AwsLambdaInstrumentation._extractOtherEventFields(event), + }, + }, + parent + ); + } + + private _applyRequestHook(span: Span, event: any, context: Context): void { + const { requestHook } = this.getConfig(); + if (requestHook) { + safeExecuteInTheMiddle( + () => requestHook(span, { event, context }), + e => { + if (e) diag.error('aws-lambda instrumentation: requestHook error', e); + }, + true + ); + } + } + + private _handlePromiseResult( + span: Span, + maybePromise: Promise<{}> | undefined + ): Promise<{}> | undefined { + if (typeof maybePromise?.then === 'function') { + return maybePromise.then( + value => { + this._applyResponseHook(span, null, value); + return new Promise(resolve => + this._endSpan(span, undefined, () => resolve(value)) + ); + }, + (err: Error | string) => { + this._applyResponseHook(span, err); + return new Promise((resolve, reject) => + this._endSpan(span, err, () => reject(err)) + ); + } + ); + } + + // Handle synchronous return values by ending the span and applying response hook + this._applyResponseHook(span, null, maybePromise); + this._endSpan(span, undefined, () => {}); + return maybePromise; + } + + private _determineParent(event: any, context: Context): OtelContext { + const config = this.getConfig(); + return AwsLambdaInstrumentation._determineParent( + event, + context, + config.eventContextExtractor || + AwsLambdaInstrumentation._defaultEventContextExtractor + ); + } + + private _isStreamingHandler( + handler: Handler | StreamifyHandler + ): handler is StreamifyHandler { + return ( + (handler as unknown as Record)[ + AWS_HANDLER_STREAMING_SYMBOL + ] === AWS_HANDLER_STREAMING_RESPONSE + ); + } + override setTracerProvider(tracerProvider: TracerProvider) { super.setTracerProvider(tracerProvider); this._traceForceFlusher = this._traceForceFlush(tracerProvider); diff --git a/packages/instrumentation-aws-lambda/src/internal-types.ts b/packages/instrumentation-aws-lambda/src/internal-types.ts index 55a387e39a..50728ccbb5 100644 --- a/packages/instrumentation-aws-lambda/src/internal-types.ts +++ b/packages/instrumentation-aws-lambda/src/internal-types.ts @@ -13,6 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Handler } from 'aws-lambda'; +import { Handler, StreamifyHandler } from 'aws-lambda'; -export type LambdaModule = Record; +export type LambdaModule = Record; diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 8ec2e8614f..ac413ee203 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -24,6 +24,10 @@ import { AwsLambdaInstrumentationConfig, lambdaMaxInitInMilliseconds, } from '../../src'; +import { + AWS_HANDLER_STREAMING_RESPONSE, + AWS_HANDLER_STREAMING_SYMBOL, +} from '../../src/instrumentation'; import { BatchSpanProcessor, InMemorySpanExporter, @@ -827,4 +831,400 @@ describe('lambda handler', () => { ); }); }); + + describe('streaming handlers', () => { + const createMockResponseStream = () => ({ + write: () => {}, + end: () => {}, + }); + + describe('async streaming handler success', () => { + it('should export a valid span', async () => { + initializeHandler('lambda-test/streaming.handler'); + + const responseStream = createMockResponseStream(); + const result = await lambdaRequire('lambda-test/streaming').handler( + 'arg', + responseStream, + ctx + ); + assert.strictEqual(result, 'stream-ok'); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assertSpanSuccess(span); + assert.strictEqual(span.parentSpanContext?.spanId, undefined); + }); + + it('should record coldstart for streaming handlers', async () => { + initializeHandler('lambda-test/streaming.handler'); + + const handlerModule = lambdaRequire('lambda-test/streaming'); + const responseStream1 = createMockResponseStream(); + const responseStream2 = createMockResponseStream(); + + const result1 = await handlerModule.handler( + 'arg', + responseStream1, + ctx + ); + const result2 = await handlerModule.handler( + 'arg', + responseStream2, + ctx + ); + + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2); + const [span1, span2] = spans; + + assert.strictEqual(result1, 'stream-ok'); + assertSpanSuccess(span1); + assert.strictEqual(span1.parentSpanContext?.spanId, undefined); + assert.strictEqual(span1.attributes[SEMATTRS_FAAS_COLDSTART], true); + + assert.strictEqual(result2, 'stream-ok'); + assertSpanSuccess(span2); + assert.strictEqual(span2.parentSpanContext?.spanId, undefined); + assert.strictEqual(span2.attributes[SEMATTRS_FAAS_COLDSTART], false); + }); + + it('context should have parent trace', async () => { + initializeHandler('lambda-test/streaming.context'); + + const responseStream = createMockResponseStream(); + const result = await lambdaRequire('lambda-test/streaming').context( + 'arg', + responseStream, + ctx + ); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(span.spanContext().traceId, result); + }); + }); + + describe('streaming handler errors', () => { + it('should record error', async () => { + initializeHandler('lambda-test/streaming.error'); + + let err: Error; + try { + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').error( + 'arg', + responseStream, + ctx + ); + } catch (e: any) { + err = e; + } + assert.strictEqual(err!.message, 'handler error'); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assertSpanFailure(span); + assert.strictEqual(span.parentSpanContext?.spanId, undefined); + }); + + it('should record string error', async () => { + initializeHandler('lambda-test/streaming.stringerror'); + + let err: string; + try { + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').stringerror( + 'arg', + responseStream, + ctx + ); + } catch (e: any) { + err = e; + } + assert.strictEqual(err!, 'handler error'); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assertSpanFailure(span); + assert.strictEqual(span.parentSpanContext?.spanId, undefined); + }); + + it('should record error after writing to stream', async () => { + initializeHandler('lambda-test/streaming.errorAfterWrite'); + + let err: Error; + try { + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').errorAfterWrite( + 'arg', + responseStream, + ctx + ); + } catch (e: any) { + err = e; + } + assert.strictEqual(err!.message, 'handler error after write'); + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual(span.status.message, 'handler error after write'); + }); + + it('should record promise rejection error', async () => { + initializeHandler('lambda-test/streaming.promiseReject'); + + let err: Error; + try { + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').promiseReject( + 'arg', + responseStream, + ctx + ); + } catch (e: any) { + err = e; + } + assert.strictEqual(err!.message, 'promise rejection error'); + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assert.strictEqual(span.status.code, SpanStatusCode.ERROR); + assert.strictEqual(span.status.message, 'promise rejection error'); + }); + }); + + describe('sync streaming handler', () => { + it('should export a valid span for sync streaming handler', async () => { + initializeHandler('lambda-test/streaming.syncHandler'); + + const responseStream = createMockResponseStream(); + const result = await lambdaRequire('lambda-test/streaming').syncHandler( + 'arg', + responseStream, + ctx + ); + assert.strictEqual(result, 'sync-ok'); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assertSpanSuccess(span); + assert.strictEqual(span.parentSpanContext?.spanId, undefined); + }); + }); + + describe('streaming handler with remote parent', () => { + beforeEach(() => { + propagation.disable(); + }); + + it('uses globally registered propagator with streaming handler', async () => { + propagation.setGlobalPropagator(new AWSXRayPropagator()); + initializeHandler('lambda-test/streaming.handler'); + + const proxyEvent = { + headers: { + 'x-amzn-trace-id': sampledAwsHeader, + }, + }; + + const responseStream = createMockResponseStream(); + const result = await lambdaRequire('lambda-test/streaming').handler( + proxyEvent, + responseStream, + ctx + ); + assert.strictEqual(result, 'stream-ok'); + + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 1); + assert.equal( + spans[0].spanContext().traceId, + sampledAwsSpanContext.traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + sampledAwsSpanContext.spanId + ); + }); + + it('uses custom eventContextExtractor with streaming handler', async () => { + propagation.setGlobalPropagator(new W3CTraceContextPropagator()); + const customExtractor = (event: any): OtelContext => { + const propagator = new AWSXRayPropagator(); + return propagator.extract( + context.active(), + event.contextCarrier, + defaultTextMapGetter + ); + }; + + initializeHandler('lambda-test/streaming.handler', { + eventContextExtractor: customExtractor, + }); + + const otherEvent = { + contextCarrier: { + traceparent: sampledGenericSpan, + 'x-amzn-trace-id': sampledAwsHeader, + }, + }; + + const responseStream = createMockResponseStream(); + const result = await lambdaRequire('lambda-test/streaming').handler( + otherEvent, + responseStream, + ctx + ); + + assert.strictEqual(result, 'stream-ok'); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assertSpanSuccess(span); + assert.strictEqual( + span.spanContext().traceId, + sampledAwsSpanContext.traceId + ); + assert.strictEqual( + span.parentSpanContext?.spanId, + sampledAwsSpanContext.spanId + ); + }); + }); + + describe('streaming handler hooks', () => { + describe('requestHook with streaming', () => { + it('should apply requestHook to streaming handler', async () => { + initializeHandler('lambda-test/streaming.handler', { + requestHook: (span, { context }) => { + span.setAttribute(SEMRESATTRS_FAAS_NAME, context.functionName); + }, + }); + + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').handler( + 'arg', + responseStream, + ctx + ); + + const spans = memoryExporter.getFinishedSpans(); + const [span] = spans; + assert.strictEqual(spans.length, 1); + assert.strictEqual( + span.attributes[SEMRESATTRS_FAAS_NAME], + ctx.functionName + ); + assertSpanSuccess(span); + }); + }); + + describe('responseHook with streaming', () => { + const RES_ATTR = 'test.res'; + const ERR_ATTR = 'test.error'; + + const config: AwsLambdaInstrumentationConfig = { + responseHook: (span, { err, res }) => { + if (err) + span.setAttribute( + ERR_ATTR, + typeof err === 'string' ? err : err.message + ); + if (res) + span.setAttribute( + RES_ATTR, + typeof res === 'string' ? res : JSON.stringify(res) + ); + }, + }; + + it('streaming - success', async () => { + initializeHandler('lambda-test/streaming.handler', config); + + const responseStream = createMockResponseStream(); + const res = await lambdaRequire('lambda-test/streaming').handler( + 'arg', + responseStream, + ctx + ); + + const [span] = memoryExporter.getFinishedSpans(); + assert.strictEqual(span.attributes[RES_ATTR], res); + }); + + it('streaming - error', async () => { + initializeHandler('lambda-test/streaming.error', config); + + let err: Error; + try { + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').error( + 'arg', + responseStream, + ctx + ); + } catch (e: any) { + err = e; + } + const [span] = memoryExporter.getFinishedSpans(); + assert.strictEqual(span.attributes[ERR_ATTR], err!.message); + }); + + it('streaming - string error', async () => { + initializeHandler('lambda-test/streaming.stringerror', config); + + let err: string; + try { + const responseStream = createMockResponseStream(); + await lambdaRequire('lambda-test/streaming').stringerror( + 'arg', + responseStream, + ctx + ); + } catch (e: any) { + err = e; + } + const [span] = memoryExporter.getFinishedSpans(); + assert.strictEqual(span.attributes[ERR_ATTR], err!); + }); + }); + }); + + describe('symbol preservation', () => { + it('should preserve AWS_HANDLER_STREAMING_SYMBOL on streaming handlers', async () => { + initializeHandler('lambda-test/streaming.handler'); + + const handlerModule = lambdaRequire('lambda-test/streaming'); + const handler = handlerModule.handler; + + assert.strictEqual( + handler[AWS_HANDLER_STREAMING_SYMBOL], + AWS_HANDLER_STREAMING_RESPONSE, + 'AWS_HANDLER_STREAMING_SYMBOL should be preserved after instrumentation' + ); + }); + + it('should preserve high water mark symbol on streaming handlers', async () => { + initializeHandler( + 'lambda-test/streaming.handlerWithCustomHighWaterMark' + ); + + const handlerModule = lambdaRequire('lambda-test/streaming'); + const handler = handlerModule.handlerWithCustomHighWaterMark; + + assert.strictEqual( + handler[handlerModule.HIGH_WATER_MARK_SYMBOL], + 32768, + 'highWaterMark symbol should be preserved after instrumentation' + ); + }); + }); + }); }); diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/streaming.js b/packages/instrumentation-aws-lambda/test/lambda-test/streaming.js new file mode 100644 index 0000000000..5757d16e7f --- /dev/null +++ b/packages/instrumentation-aws-lambda/test/lambda-test/streaming.js @@ -0,0 +1,94 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* eslint-disable no-unused-vars */ +const api = require('@opentelemetry/api'); +const { + AWS_HANDLER_STREAMING_SYMBOL, + AWS_HANDLER_STREAMING_RESPONSE, +} = require('../../src/instrumentation'); + +const HIGH_WATER_MARK_SYMBOL = Symbol.for( + 'aws.lambda.runtime.handler.streaming.highWaterMark' +); + +exports.HIGH_WATER_MARK_SYMBOL = HIGH_WATER_MARK_SYMBOL; + +function streamifyResponse(handler, highWaterMark) { + handler[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + if (typeof highWaterMark === 'number') { + handler[HIGH_WATER_MARK_SYMBOL] = highWaterMark; + } + + return handler; +} + +exports.handler = streamifyResponse( + async (_event, responseStream, _context) => { + responseStream.write('{"message": "ok"}'); + responseStream.end(); + return 'stream-ok'; + } +); + +exports.error = streamifyResponse(async (_event, _responseStream, _context) => { + throw new Error('handler error'); +}); + +exports.stringerror = streamifyResponse( + async (_event, _responseStream, _context) => { + throw 'handler error'; + } +); + +exports.context = streamifyResponse( + async (_event, responseStream, _lambdaContext) => { + const traceId = api.trace + .getSpan(api.context.active()) + .spanContext().traceId; + responseStream.write(`{"traceId": "${traceId}"}`); + responseStream.end(); + return traceId; + } +); + +exports.errorAfterWrite = streamifyResponse( + async (_event, responseStream, _context) => { + responseStream.write('{"start": "ok"}'); + throw new Error('handler error after write'); + } +); + +exports.syncHandler = streamifyResponse((_event, responseStream, _context) => { + responseStream.write('{"sync": "ok"}'); + responseStream.end(); + return 'sync-ok'; +}); + +exports.promiseReject = streamifyResponse( + async (_event, _responseStream, _context) => { + return Promise.reject(new Error('promise rejection error')); + } +); + +exports.handlerWithCustomHighWaterMark = streamifyResponse( + async (_event, responseStream, _context) => { + responseStream.write('{"message": "custom"}'); + responseStream.end(); + return 'custom-ok'; + }, + 32768 // Custom high water mark +);