Skip to content

Commit cf9e2f5

Browse files
authored
feat(instrumentation-aws-lambda): support streaming handlers (#2970)
1 parent 09d57b4 commit cf9e2f5

File tree

4 files changed

+672
-54
lines changed

4 files changed

+672
-54
lines changed

packages/instrumentation-aws-lambda/src/instrumentation.ts

Lines changed: 156 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
Callback,
4949
Context,
5050
Handler,
51+
StreamifyHandler,
5152
} from 'aws-lambda';
5253

5354
import { AwsLambdaInstrumentationConfig, EventContextExtractor } from './types';
@@ -65,6 +66,10 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
6566
};
6667

6768
export const lambdaMaxInitInMilliseconds = 10_000;
69+
export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for(
70+
'aws.lambda.runtime.handler.streaming'
71+
);
72+
export const AWS_HANDLER_STREAMING_RESPONSE = 'response';
6873

6974
export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstrumentationConfig> {
7075
private declare _traceForceFlusher?: () => Promise<void>;
@@ -171,12 +176,31 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
171176
}
172177

173178
private _getHandler(handlerLoadStartTime: number) {
174-
return (original: Handler) => {
175-
return this._getPatchHandler(original, handlerLoadStartTime);
179+
return (
180+
original: Handler | StreamifyHandler
181+
): Handler | StreamifyHandler => {
182+
const patchedHandler = this._getPatchHandler(
183+
original,
184+
handlerLoadStartTime
185+
);
186+
187+
if (this._isStreamingHandler(original)) {
188+
// Streaming handlers have special symbols that we need to copy over to the patched handler.
189+
for (const symbol of Object.getOwnPropertySymbols(original)) {
190+
(patchedHandler as unknown as Record<symbol, unknown>)[symbol] = (
191+
original as unknown as Record<symbol, unknown>
192+
)[symbol];
193+
}
194+
}
195+
196+
return patchedHandler;
176197
};
177198
}
178199

179-
private _getPatchHandler(original: Handler, lambdaStartTime: number) {
200+
private _getPatchHandler(
201+
original: Handler | StreamifyHandler,
202+
lambdaStartTime: number
203+
): Handler | StreamifyHandler {
180204
diag.debug('patch handler function');
181205
const plugin = this;
182206

@@ -211,6 +235,43 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
211235
}
212236
}
213237

238+
if (this._isStreamingHandler(original)) {
239+
return function patchedStreamingHandler(
240+
this: never,
241+
// The event can be a user type, it truly is any.
242+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
243+
event: any,
244+
responseStream: Parameters<StreamifyHandler>[1],
245+
context: Context
246+
) {
247+
_onRequest();
248+
249+
const parent = plugin._determineParent(event, context);
250+
const span = plugin._createSpanForRequest(
251+
event,
252+
context,
253+
requestIsColdStart,
254+
parent
255+
);
256+
plugin._applyRequestHook(span, event, context);
257+
258+
return otelContext.with(trace.setSpan(parent, span), () => {
259+
const maybePromise = safeExecuteInTheMiddle(
260+
() => original.apply(this, [event, responseStream, context]),
261+
error => {
262+
if (error != null) {
263+
// Exception thrown synchronously before resolving promise.
264+
plugin._applyResponseHook(span, error);
265+
plugin._endSpan(span, error, () => {});
266+
}
267+
}
268+
) as Promise<{}> | undefined;
269+
270+
return plugin._handlePromiseResult(span, maybePromise);
271+
});
272+
};
273+
}
274+
214275
return function patchedHandler(
215276
this: never,
216277
// The event can be a user type, it truly is any.
@@ -221,43 +282,16 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
221282
) {
222283
_onRequest();
223284

224-
const config = plugin.getConfig();
225-
const parent = AwsLambdaInstrumentation._determineParent(
285+
const parent = plugin._determineParent(event, context);
286+
287+
const span = plugin._createSpanForRequest(
226288
event,
227289
context,
228-
config.eventContextExtractor ||
229-
AwsLambdaInstrumentation._defaultEventContextExtractor
230-
);
231-
232-
const name = context.functionName;
233-
const span = plugin.tracer.startSpan(
234-
name,
235-
{
236-
kind: SpanKind.SERVER,
237-
attributes: {
238-
[ATTR_FAAS_EXECUTION]: context.awsRequestId,
239-
[ATTR_FAAS_ID]: context.invokedFunctionArn,
240-
[ATTR_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(
241-
context.invokedFunctionArn
242-
),
243-
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
244-
...AwsLambdaInstrumentation._extractOtherEventFields(event),
245-
},
246-
},
290+
requestIsColdStart,
247291
parent
248292
);
249293

250-
const { requestHook } = config;
251-
if (requestHook) {
252-
safeExecuteInTheMiddle(
253-
() => requestHook(span, { event, context }),
254-
e => {
255-
if (e)
256-
diag.error('aws-lambda instrumentation: requestHook error', e);
257-
},
258-
true
259-
);
260-
}
294+
plugin._applyRequestHook(span, event, context);
261295

262296
return otelContext.with(trace.setSpan(parent, span), () => {
263297
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
@@ -275,27 +309,97 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
275309
}
276310
}
277311
) as Promise<{}> | undefined;
278-
if (typeof maybePromise?.then === 'function') {
279-
return maybePromise.then(
280-
value => {
281-
plugin._applyResponseHook(span, null, value);
282-
return new Promise(resolve =>
283-
plugin._endSpan(span, undefined, () => resolve(value))
284-
);
285-
},
286-
(err: Error | string) => {
287-
plugin._applyResponseHook(span, err);
288-
return new Promise((resolve, reject) =>
289-
plugin._endSpan(span, err, () => reject(err))
290-
);
291-
}
292-
);
293-
}
294-
return maybePromise;
312+
313+
return plugin._handlePromiseResult(span, maybePromise);
295314
});
296315
};
297316
}
298317

318+
private _createSpanForRequest(
319+
event: any,
320+
context: Context,
321+
requestIsColdStart: boolean,
322+
parent: OtelContext
323+
): Span {
324+
const name = context.functionName;
325+
return this.tracer.startSpan(
326+
name,
327+
{
328+
kind: SpanKind.SERVER,
329+
attributes: {
330+
[ATTR_FAAS_EXECUTION]: context.awsRequestId,
331+
[ATTR_FAAS_ID]: context.invokedFunctionArn,
332+
[ATTR_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(
333+
context.invokedFunctionArn
334+
),
335+
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
336+
...AwsLambdaInstrumentation._extractOtherEventFields(event),
337+
},
338+
},
339+
parent
340+
);
341+
}
342+
343+
private _applyRequestHook(span: Span, event: any, context: Context): void {
344+
const { requestHook } = this.getConfig();
345+
if (requestHook) {
346+
safeExecuteInTheMiddle(
347+
() => requestHook(span, { event, context }),
348+
e => {
349+
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
350+
},
351+
true
352+
);
353+
}
354+
}
355+
356+
private _handlePromiseResult(
357+
span: Span,
358+
maybePromise: Promise<{}> | undefined
359+
): Promise<{}> | undefined {
360+
if (typeof maybePromise?.then === 'function') {
361+
return maybePromise.then(
362+
value => {
363+
this._applyResponseHook(span, null, value);
364+
return new Promise(resolve =>
365+
this._endSpan(span, undefined, () => resolve(value))
366+
);
367+
},
368+
(err: Error | string) => {
369+
this._applyResponseHook(span, err);
370+
return new Promise((resolve, reject) =>
371+
this._endSpan(span, err, () => reject(err))
372+
);
373+
}
374+
);
375+
}
376+
377+
// Handle synchronous return values by ending the span and applying response hook
378+
this._applyResponseHook(span, null, maybePromise);
379+
this._endSpan(span, undefined, () => {});
380+
return maybePromise;
381+
}
382+
383+
private _determineParent(event: any, context: Context): OtelContext {
384+
const config = this.getConfig();
385+
return AwsLambdaInstrumentation._determineParent(
386+
event,
387+
context,
388+
config.eventContextExtractor ||
389+
AwsLambdaInstrumentation._defaultEventContextExtractor
390+
);
391+
}
392+
393+
private _isStreamingHandler<TEvent, TResult>(
394+
handler: Handler<TEvent, TResult> | StreamifyHandler<TEvent, TResult>
395+
): handler is StreamifyHandler<TEvent, TResult> {
396+
return (
397+
(handler as unknown as Record<symbol, unknown>)[
398+
AWS_HANDLER_STREAMING_SYMBOL
399+
] === AWS_HANDLER_STREAMING_RESPONSE
400+
);
401+
}
402+
299403
override setTracerProvider(tracerProvider: TracerProvider) {
300404
super.setTracerProvider(tracerProvider);
301405
this._traceForceFlusher = this._traceForceFlush(tracerProvider);

packages/instrumentation-aws-lambda/src/internal-types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
import { Handler } from 'aws-lambda';
16+
import { Handler, StreamifyHandler } from 'aws-lambda';
1717

18-
export type LambdaModule = Record<string, Handler>;
18+
export type LambdaModule = Record<string, Handler | StreamifyHandler>;

0 commit comments

Comments
 (0)