Skip to content

Commit 7303ab1

Browse files
authored
feat(aws): Add support for streaming handlers (#17463)
closes #15774
1 parent 647dbfe commit 7303ab1

File tree

5 files changed

+498
-81
lines changed

5 files changed

+498
-81
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import * as Sentry from '@sentry/aws-serverless';
2+
3+
export const handler = awslambda.streamifyResponse(async (event, responseStream, context) => {
4+
Sentry.startSpan({ name: 'manual-span', op: 'test' }, async () => {
5+
responseStream.write('Hello, world!');
6+
responseStream.end();
7+
});
8+
});

dev-packages/e2e-tests/test-applications/aws-serverless/tests/layer.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,52 @@ test.describe('Lambda layer', () => {
194194
}),
195195
);
196196
});
197+
198+
test('streaming handlers work', async ({ lambdaClient }) => {
199+
const transactionEventPromise = waitForTransaction('aws-serverless-lambda-sam', transactionEvent => {
200+
return transactionEvent?.transaction === 'LayerStreaming';
201+
});
202+
203+
await lambdaClient.send(
204+
new InvokeCommand({
205+
FunctionName: 'LayerStreaming',
206+
Payload: JSON.stringify({}),
207+
}),
208+
);
209+
210+
const transactionEvent = await transactionEventPromise;
211+
212+
expect(transactionEvent.transaction).toEqual('LayerStreaming');
213+
expect(transactionEvent.contexts?.trace).toEqual({
214+
data: {
215+
'sentry.sample_rate': 1,
216+
'sentry.source': 'custom',
217+
'sentry.origin': 'auto.otel.aws-lambda',
218+
'sentry.op': 'function.aws.lambda',
219+
'cloud.account.id': '012345678912',
220+
'faas.execution': expect.any(String),
221+
'faas.id': 'arn:aws:lambda:us-east-1:012345678912:function:LayerStreaming',
222+
'faas.coldstart': true,
223+
'otel.kind': 'SERVER',
224+
},
225+
op: 'function.aws.lambda',
226+
origin: 'auto.otel.aws-lambda',
227+
span_id: expect.stringMatching(/[a-f0-9]{16}/),
228+
status: 'ok',
229+
trace_id: expect.stringMatching(/[a-f0-9]{32}/),
230+
});
231+
232+
expect(transactionEvent.spans).toHaveLength(1);
233+
234+
expect(transactionEvent.spans).toContainEqual(
235+
expect.objectContaining({
236+
data: expect.objectContaining({
237+
'sentry.op': 'test',
238+
'sentry.origin': 'manual',
239+
}),
240+
description: 'manual-span',
241+
op: 'test',
242+
}),
243+
);
244+
});
197245
});

packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts

Lines changed: 144 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// - Added Sentry `wrapHandler` around the OTel patch handler.
44
// - Cancel init when handler string is invalid (TS)
55
// - Hardcoded package version and name
6+
// - Added support for streaming handlers
67
/* eslint-disable */
78
/*
89
* Copyright The OpenTelemetry Authors
@@ -50,7 +51,7 @@ import {
5051
SEMRESATTRS_CLOUD_ACCOUNT_ID,
5152
SEMRESATTRS_FAAS_ID,
5253
} from '@opentelemetry/semantic-conventions';
53-
import type { APIGatewayProxyEventHeaders, Callback, Context, Handler } from 'aws-lambda';
54+
import type { APIGatewayProxyEventHeaders, Callback, Context, Handler, StreamifyHandler } from 'aws-lambda';
5455
import * as fs from 'fs';
5556
import * as path from 'path';
5657
import type { LambdaModule } from './internal-types';
@@ -73,6 +74,9 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
7374
};
7475

7576
export const lambdaMaxInitInMilliseconds = 10_000;
77+
const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming');
78+
const AWS_HANDLER_HIGHWATERMARK_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming.highWaterMark');
79+
const AWS_HANDLER_STREAMING_RESPONSE = 'response';
7680

7781
/**
7882
*
@@ -101,6 +105,21 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
101105
return [];
102106
}
103107

108+
// Provide a temporary awslambda polyfill for CommonJS modules during loading
109+
// This prevents ReferenceError when modules use awslambda.streamifyResponse at load time
110+
// taken from https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/main/src/UserFunction.js#L205C7-L211C9
111+
if (typeof globalThis.awslambda === 'undefined') {
112+
(globalThis as any).awslambda = {
113+
streamifyResponse: (handler: any, options: any) => {
114+
handler[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE;
115+
if (typeof options?.highWaterMark === 'number') {
116+
handler[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = parseInt(options.highWaterMark);
117+
}
118+
return handler;
119+
},
120+
};
121+
}
122+
104123
const handler = path.basename(handlerDef);
105124
const moduleRoot = handlerDef.substring(0, handlerDef.length - handler.length);
106125

@@ -187,16 +206,33 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
187206
/**
188207
*
189208
*/
190-
private _getHandler(handlerLoadStartTime: number) {
191-
return (original: Handler) => {
192-
return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime));
209+
private _getHandler<T extends Handler | StreamifyHandler>(handlerLoadStartTime: number) {
210+
return (original: T): T => {
211+
if (this._isStreamingHandler(original)) {
212+
const patchedHandler = this._getPatchHandler(original, handlerLoadStartTime);
213+
214+
// Streaming handlers have special symbols that we need to copy over to the patched handler.
215+
(patchedHandler as unknown as Record<symbol, unknown>)[AWS_HANDLER_STREAMING_SYMBOL] = (
216+
original as unknown as Record<symbol, unknown>
217+
)[AWS_HANDLER_STREAMING_SYMBOL];
218+
(patchedHandler as unknown as Record<symbol, unknown>)[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = (
219+
original as unknown as Record<symbol, unknown>
220+
)[AWS_HANDLER_HIGHWATERMARK_SYMBOL];
221+
222+
return wrapHandler(patchedHandler) as T;
223+
}
224+
225+
return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime)) as T;
193226
};
194227
}
195228

229+
private _getPatchHandler(original: Handler, lambdaStartTime: number): Handler;
230+
private _getPatchHandler(original: StreamifyHandler, lambdaStartTime: number): StreamifyHandler;
231+
196232
/**
197233
*
198234
*/
199-
private _getPatchHandler(original: Handler, lambdaStartTime: number) {
235+
private _getPatchHandler(original: Handler | StreamifyHandler, lambdaStartTime: number): Handler | StreamifyHandler {
200236
diag.debug('patch handler function');
201237
const plugin = this;
202238

@@ -229,6 +265,36 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
229265
}
230266
}
231267

268+
if (this._isStreamingHandler(original)) {
269+
return function patchedStreamingHandler(
270+
this: never,
271+
// The event can be a user type, it truly is any.
272+
event: any,
273+
responseStream: Parameters<StreamifyHandler>[1],
274+
context: Context,
275+
) {
276+
_onRequest();
277+
const parent = plugin._determineParent(event, context);
278+
const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent);
279+
plugin._applyRequestHook(span, event, context);
280+
281+
return otelContext.with(trace.setSpan(parent, span), () => {
282+
const maybePromise = safeExecuteInTheMiddle(
283+
() => original.apply(this, [event, responseStream, context]),
284+
error => {
285+
if (error != null) {
286+
// Exception thrown synchronously before resolving promise.
287+
plugin._applyResponseHook(span, error);
288+
plugin._endSpan(span, error, () => {});
289+
}
290+
},
291+
) as Promise<{}> | undefined;
292+
293+
return plugin._handlePromiseResult(span, maybePromise);
294+
});
295+
};
296+
}
297+
232298
return function patchedHandler(
233299
this: never,
234300
// The event can be a user type, it truly is any.
@@ -239,39 +305,10 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
239305
) {
240306
_onRequest();
241307

242-
const config = plugin.getConfig();
243-
const parent = AwsLambdaInstrumentation._determineParent(
244-
event,
245-
context,
246-
config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor,
247-
);
308+
const parent = plugin._determineParent(event, context);
248309

249-
const name = context.functionName;
250-
const span = plugin.tracer.startSpan(
251-
name,
252-
{
253-
kind: SpanKind.SERVER,
254-
attributes: {
255-
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
256-
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
257-
[SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn),
258-
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
259-
...AwsLambdaInstrumentation._extractOtherEventFields(event),
260-
},
261-
},
262-
parent,
263-
);
264-
265-
const { requestHook } = config;
266-
if (requestHook) {
267-
safeExecuteInTheMiddle(
268-
() => requestHook(span, { event, context }),
269-
e => {
270-
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
271-
},
272-
true,
273-
);
274-
}
310+
const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent);
311+
plugin._applyRequestHook(span, event, context);
275312

276313
return otelContext.with(trace.setSpan(parent, span), () => {
277314
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
@@ -289,23 +326,80 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
289326
}
290327
},
291328
) as Promise<{}> | undefined;
292-
if (typeof maybePromise?.then === 'function') {
293-
return maybePromise.then(
294-
value => {
295-
plugin._applyResponseHook(span, null, value);
296-
return new Promise(resolve => plugin._endSpan(span, undefined, () => resolve(value)));
297-
},
298-
(err: Error | string) => {
299-
plugin._applyResponseHook(span, err);
300-
return new Promise((resolve, reject) => plugin._endSpan(span, err, () => reject(err)));
301-
},
302-
);
303-
}
304-
return maybePromise;
329+
330+
return plugin._handlePromiseResult(span, maybePromise);
305331
});
306332
};
307333
}
308334

335+
private _createSpanForRequest(event: any, context: Context, requestIsColdStart: boolean, parent: OtelContext): Span {
336+
const name = context.functionName;
337+
return this.tracer.startSpan(
338+
name,
339+
{
340+
kind: SpanKind.SERVER,
341+
attributes: {
342+
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
343+
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
344+
[SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn),
345+
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
346+
...AwsLambdaInstrumentation._extractOtherEventFields(event),
347+
},
348+
},
349+
parent,
350+
);
351+
}
352+
353+
private _applyRequestHook(span: Span, event: any, context: Context): void {
354+
const { requestHook } = this.getConfig();
355+
if (requestHook) {
356+
safeExecuteInTheMiddle(
357+
() => requestHook(span, { event, context }),
358+
e => {
359+
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
360+
},
361+
true,
362+
);
363+
}
364+
}
365+
366+
private _handlePromiseResult(span: Span, maybePromise: Promise<{}> | undefined): Promise<{}> | undefined {
367+
if (typeof maybePromise?.then === 'function') {
368+
return maybePromise.then(
369+
value => {
370+
this._applyResponseHook(span, null, value);
371+
return new Promise(resolve => this._endSpan(span, undefined, () => resolve(value)));
372+
},
373+
(err: Error | string) => {
374+
this._applyResponseHook(span, err);
375+
return new Promise((resolve, reject) => this._endSpan(span, err, () => reject(err)));
376+
},
377+
);
378+
}
379+
380+
// Handle synchronous return values by ending the span and applying response hook
381+
this._applyResponseHook(span, null, maybePromise);
382+
this._endSpan(span, undefined, () => {});
383+
return maybePromise;
384+
}
385+
386+
private _determineParent(event: any, context: Context): OtelContext {
387+
const config = this.getConfig();
388+
return AwsLambdaInstrumentation._determineParent(
389+
event,
390+
context,
391+
config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor,
392+
);
393+
}
394+
395+
private _isStreamingHandler<TEvent, TResult>(
396+
handler: Handler<TEvent, TResult> | StreamifyHandler<TEvent, TResult>,
397+
): handler is StreamifyHandler<TEvent, TResult> {
398+
return (
399+
(handler as unknown as Record<symbol, unknown>)[AWS_HANDLER_STREAMING_SYMBOL] === AWS_HANDLER_STREAMING_RESPONSE
400+
);
401+
}
402+
309403
/**
310404
*
311405
*/

0 commit comments

Comments
 (0)