Skip to content

Commit 59540b7

Browse files
committed
feat(aws): Add support for streaming handlers
1 parent 123be24 commit 59540b7

File tree

5 files changed

+493
-81
lines changed

5 files changed

+493
-81
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import * as Sentry from '@sentry/aws-serverless';
2+
3+
export const handler = awslambda.streamifyResponse(async (event, responseStream, context) => {
4+
Sentry.startSpan({ name: 'manual-span', op: 'test' }, async () => {
5+
responseStream.write('Hello, world!');
6+
responseStream.end();
7+
});
8+
});

dev-packages/e2e-tests/test-applications/aws-serverless/tests/layer.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,52 @@ test.describe('Lambda layer', () => {
194194
}),
195195
);
196196
});
197+
198+
test('streaming handlers work', async ({ lambdaClient }) => {
199+
const transactionEventPromise = waitForTransaction('aws-serverless-lambda-sam', transactionEvent => {
200+
return transactionEvent?.transaction === 'LayerStreaming';
201+
});
202+
203+
await lambdaClient.send(
204+
new InvokeCommand({
205+
FunctionName: 'LayerStreaming',
206+
Payload: JSON.stringify({}),
207+
}),
208+
);
209+
210+
const transactionEvent = await transactionEventPromise;
211+
212+
expect(transactionEvent.transaction).toEqual('LayerStreaming');
213+
expect(transactionEvent.contexts?.trace).toEqual({
214+
data: {
215+
'sentry.sample_rate': 1,
216+
'sentry.source': 'custom',
217+
'sentry.origin': 'auto.otel.aws-lambda',
218+
'sentry.op': 'function.aws.lambda',
219+
'cloud.account.id': '012345678912',
220+
'faas.execution': expect.any(String),
221+
'faas.id': 'arn:aws:lambda:us-east-1:012345678912:function:LayerStreaming',
222+
'faas.coldstart': true,
223+
'otel.kind': 'SERVER',
224+
},
225+
op: 'function.aws.lambda',
226+
origin: 'auto.otel.aws-lambda',
227+
span_id: expect.stringMatching(/[a-f0-9]{16}/),
228+
status: 'ok',
229+
trace_id: expect.stringMatching(/[a-f0-9]{32}/),
230+
});
231+
232+
expect(transactionEvent.spans).toHaveLength(1);
233+
234+
expect(transactionEvent.spans).toContainEqual(
235+
expect.objectContaining({
236+
data: expect.objectContaining({
237+
'sentry.op': 'test',
238+
'sentry.origin': 'manual',
239+
}),
240+
description: 'manual-span',
241+
op: 'test',
242+
}),
243+
);
244+
});
197245
});

packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts

Lines changed: 139 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// - Added Sentry `wrapHandler` around the OTel patch handler.
44
// - Cancel init when handler string is invalid (TS)
55
// - Hardcoded package version and name
6+
// - Added support for streaming handlers
67
/* eslint-disable */
78
/*
89
* Copyright The OpenTelemetry Authors
@@ -50,7 +51,7 @@ import {
5051
SEMRESATTRS_CLOUD_ACCOUNT_ID,
5152
SEMRESATTRS_FAAS_ID,
5253
} from '@opentelemetry/semantic-conventions';
53-
import type { APIGatewayProxyEventHeaders, Callback, Context, Handler } from 'aws-lambda';
54+
import type { APIGatewayProxyEventHeaders, Callback, Context, Handler, StreamifyHandler } from 'aws-lambda';
5455
import * as fs from 'fs';
5556
import * as path from 'path';
5657
import type { LambdaModule } from './internal-types';
@@ -73,6 +74,8 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
7374
};
7475

7576
export const lambdaMaxInitInMilliseconds = 10_000;
77+
const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming');
78+
const AWS_HANDLER_STREAMING_RESPONSE = 'response';
7679

7780
/**
7881
*
@@ -101,6 +104,18 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
101104
return [];
102105
}
103106

107+
// Provide a temporary awslambda polyfill for CommonJS modules during loading
108+
// This prevents ReferenceError when modules use awslambda.streamifyResponse at load time
109+
if (typeof globalThis.awslambda === 'undefined') {
110+
(globalThis as any).awslambda = {
111+
streamifyResponse: (handler: any) => {
112+
// Add the streaming symbols that the instrumentation looks for
113+
handler[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE;
114+
return handler;
115+
},
116+
};
117+
}
118+
104119
const handler = path.basename(handlerDef);
105120
const moduleRoot = handlerDef.substring(0, handlerDef.length - handler.length);
106121

@@ -187,16 +202,32 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
187202
/**
188203
*
189204
*/
190-
private _getHandler(handlerLoadStartTime: number) {
191-
return (original: Handler) => {
192-
return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime));
205+
private _getHandler<T extends Handler | StreamifyHandler>(handlerLoadStartTime: number) {
206+
return (original: T): T => {
207+
if (this._isStreamingHandler(original)) {
208+
const patchedHandler = this._getPatchHandler(original, handlerLoadStartTime);
209+
210+
// Streaming handlers have special symbols that we need to copy over to the patched handler.
211+
for (const symbol of Object.getOwnPropertySymbols(original)) {
212+
(patchedHandler as unknown as Record<symbol, unknown>)[symbol] = (
213+
original as unknown as Record<symbol, unknown>
214+
)[symbol];
215+
}
216+
217+
return wrapHandler(patchedHandler) as T;
218+
}
219+
220+
return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime)) as T;
193221
};
194222
}
195223

224+
private _getPatchHandler(original: Handler, lambdaStartTime: number): Handler;
225+
private _getPatchHandler(original: StreamifyHandler, lambdaStartTime: number): StreamifyHandler;
226+
196227
/**
197228
*
198229
*/
199-
private _getPatchHandler(original: Handler, lambdaStartTime: number) {
230+
private _getPatchHandler(original: Handler | StreamifyHandler, lambdaStartTime: number): Handler | StreamifyHandler {
200231
diag.debug('patch handler function');
201232
const plugin = this;
202233

@@ -229,6 +260,36 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
229260
}
230261
}
231262

263+
if (this._isStreamingHandler(original)) {
264+
return function patchedStreamingHandler(
265+
this: never,
266+
// The event can be a user type, it truly is any.
267+
event: any,
268+
responseStream: Parameters<StreamifyHandler>[1],
269+
context: Context,
270+
) {
271+
_onRequest();
272+
const parent = plugin._determineParent(event, context);
273+
const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent);
274+
plugin._applyRequestHook(span, event, context);
275+
276+
return otelContext.with(trace.setSpan(parent, span), () => {
277+
const maybePromise = safeExecuteInTheMiddle(
278+
() => original.apply(this, [event, responseStream, context]),
279+
error => {
280+
if (error != null) {
281+
// Exception thrown synchronously before resolving promise.
282+
plugin._applyResponseHook(span, error);
283+
plugin._endSpan(span, error, () => {});
284+
}
285+
},
286+
) as Promise<{}> | undefined;
287+
288+
return plugin._handlePromiseResult(span, maybePromise);
289+
});
290+
};
291+
}
292+
232293
return function patchedHandler(
233294
this: never,
234295
// The event can be a user type, it truly is any.
@@ -239,39 +300,10 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
239300
) {
240301
_onRequest();
241302

242-
const config = plugin.getConfig();
243-
const parent = AwsLambdaInstrumentation._determineParent(
244-
event,
245-
context,
246-
config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor,
247-
);
248-
249-
const name = context.functionName;
250-
const span = plugin.tracer.startSpan(
251-
name,
252-
{
253-
kind: SpanKind.SERVER,
254-
attributes: {
255-
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
256-
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
257-
[SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn),
258-
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
259-
...AwsLambdaInstrumentation._extractOtherEventFields(event),
260-
},
261-
},
262-
parent,
263-
);
303+
const parent = plugin._determineParent(event, context);
264304

265-
const { requestHook } = config;
266-
if (requestHook) {
267-
safeExecuteInTheMiddle(
268-
() => requestHook(span, { event, context }),
269-
e => {
270-
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
271-
},
272-
true,
273-
);
274-
}
305+
const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent);
306+
plugin._applyRequestHook(span, event, context);
275307

276308
return otelContext.with(trace.setSpan(parent, span), () => {
277309
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
@@ -289,23 +321,80 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
289321
}
290322
},
291323
) as Promise<{}> | undefined;
292-
if (typeof maybePromise?.then === 'function') {
293-
return maybePromise.then(
294-
value => {
295-
plugin._applyResponseHook(span, null, value);
296-
return new Promise(resolve => plugin._endSpan(span, undefined, () => resolve(value)));
297-
},
298-
(err: Error | string) => {
299-
plugin._applyResponseHook(span, err);
300-
return new Promise((resolve, reject) => plugin._endSpan(span, err, () => reject(err)));
301-
},
302-
);
303-
}
304-
return maybePromise;
324+
325+
return plugin._handlePromiseResult(span, maybePromise);
305326
});
306327
};
307328
}
308329

330+
private _createSpanForRequest(event: any, context: Context, requestIsColdStart: boolean, parent: OtelContext): Span {
331+
const name = context.functionName;
332+
return this.tracer.startSpan(
333+
name,
334+
{
335+
kind: SpanKind.SERVER,
336+
attributes: {
337+
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
338+
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
339+
[SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn),
340+
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
341+
...AwsLambdaInstrumentation._extractOtherEventFields(event),
342+
},
343+
},
344+
parent,
345+
);
346+
}
347+
348+
private _applyRequestHook(span: Span, event: any, context: Context): void {
349+
const { requestHook } = this.getConfig();
350+
if (requestHook) {
351+
safeExecuteInTheMiddle(
352+
() => requestHook(span, { event, context }),
353+
e => {
354+
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
355+
},
356+
true,
357+
);
358+
}
359+
}
360+
361+
private _handlePromiseResult(span: Span, maybePromise: Promise<{}> | undefined): Promise<{}> | undefined {
362+
if (typeof maybePromise?.then === 'function') {
363+
return maybePromise.then(
364+
value => {
365+
this._applyResponseHook(span, null, value);
366+
return new Promise(resolve => this._endSpan(span, undefined, () => resolve(value)));
367+
},
368+
(err: Error | string) => {
369+
this._applyResponseHook(span, err);
370+
return new Promise((resolve, reject) => this._endSpan(span, err, () => reject(err)));
371+
},
372+
);
373+
}
374+
375+
// Handle synchronous return values by ending the span and applying response hook
376+
this._applyResponseHook(span, null, maybePromise);
377+
this._endSpan(span, undefined, () => {});
378+
return maybePromise;
379+
}
380+
381+
private _determineParent(event: any, context: Context): OtelContext {
382+
const config = this.getConfig();
383+
return AwsLambdaInstrumentation._determineParent(
384+
event,
385+
context,
386+
config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor,
387+
);
388+
}
389+
390+
private _isStreamingHandler<TEvent, TResult>(
391+
handler: Handler<TEvent, TResult> | StreamifyHandler<TEvent, TResult>,
392+
): handler is StreamifyHandler<TEvent, TResult> {
393+
return (
394+
(handler as unknown as Record<symbol, unknown>)[AWS_HANDLER_STREAMING_SYMBOL] === AWS_HANDLER_STREAMING_RESPONSE
395+
);
396+
}
397+
309398
/**
310399
*
311400
*/

0 commit comments

Comments
 (0)