3
3
// - Added Sentry `wrapHandler` around the OTel patch handler.
4
4
// - Cancel init when handler string is invalid (TS)
5
5
// - Hardcoded package version and name
6
+ // - Added support for streaming handlers
6
7
/* eslint-disable */
7
8
/*
8
9
* Copyright The OpenTelemetry Authors
@@ -50,7 +51,7 @@ import {
50
51
SEMRESATTRS_CLOUD_ACCOUNT_ID ,
51
52
SEMRESATTRS_FAAS_ID ,
52
53
} from '@opentelemetry/semantic-conventions' ;
53
- import type { APIGatewayProxyEventHeaders , Callback , Context , Handler } from 'aws-lambda' ;
54
+ import type { APIGatewayProxyEventHeaders , Callback , Context , Handler , StreamifyHandler } from 'aws-lambda' ;
54
55
import * as fs from 'fs' ;
55
56
import * as path from 'path' ;
56
57
import type { LambdaModule } from './internal-types' ;
@@ -73,6 +74,8 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
73
74
} ;
74
75
75
76
export const lambdaMaxInitInMilliseconds = 10_000 ;
77
+ const AWS_HANDLER_STREAMING_SYMBOL = Symbol . for ( 'aws.lambda.runtime.handler.streaming' ) ;
78
+ const AWS_HANDLER_STREAMING_RESPONSE = 'response' ;
76
79
77
80
/**
78
81
*
@@ -101,6 +104,18 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
101
104
return [ ] ;
102
105
}
103
106
107
+ // Provide a temporary awslambda polyfill for CommonJS modules during loading
108
+ // This prevents ReferenceError when modules use awslambda.streamifyResponse at load time
109
+ if ( typeof globalThis . awslambda === 'undefined' ) {
110
+ ( globalThis as any ) . awslambda = {
111
+ streamifyResponse : ( handler : any ) => {
112
+ // Add the streaming symbols that the instrumentation looks for
113
+ handler [ AWS_HANDLER_STREAMING_SYMBOL ] = AWS_HANDLER_STREAMING_RESPONSE ;
114
+ return handler ;
115
+ } ,
116
+ } ;
117
+ }
118
+
104
119
const handler = path . basename ( handlerDef ) ;
105
120
const moduleRoot = handlerDef . substring ( 0 , handlerDef . length - handler . length ) ;
106
121
@@ -187,16 +202,32 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
187
202
/**
188
203
*
189
204
*/
190
- private _getHandler ( handlerLoadStartTime : number ) {
191
- return ( original : Handler ) => {
192
- return wrapHandler ( this . _getPatchHandler ( original , handlerLoadStartTime ) ) ;
205
+ private _getHandler < T extends Handler | StreamifyHandler > ( handlerLoadStartTime : number ) {
206
+ return ( original : T ) : T => {
207
+ if ( this . _isStreamingHandler ( original ) ) {
208
+ const patchedHandler = this . _getPatchHandler ( original , handlerLoadStartTime ) ;
209
+
210
+ // Streaming handlers have special symbols that we need to copy over to the patched handler.
211
+ for ( const symbol of Object . getOwnPropertySymbols ( original ) ) {
212
+ ( patchedHandler as unknown as Record < symbol , unknown > ) [ symbol ] = (
213
+ original as unknown as Record < symbol , unknown >
214
+ ) [ symbol ] ;
215
+ }
216
+
217
+ return wrapHandler ( patchedHandler ) as T ;
218
+ }
219
+
220
+ return wrapHandler ( this . _getPatchHandler ( original , handlerLoadStartTime ) ) as T ;
193
221
} ;
194
222
}
195
223
224
+ private _getPatchHandler ( original : Handler , lambdaStartTime : number ) : Handler ;
225
+ private _getPatchHandler ( original : StreamifyHandler , lambdaStartTime : number ) : StreamifyHandler ;
226
+
196
227
/**
197
228
*
198
229
*/
199
- private _getPatchHandler ( original : Handler , lambdaStartTime : number ) {
230
+ private _getPatchHandler ( original : Handler | StreamifyHandler , lambdaStartTime : number ) : Handler | StreamifyHandler {
200
231
diag . debug ( 'patch handler function' ) ;
201
232
const plugin = this ;
202
233
@@ -229,6 +260,36 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
229
260
}
230
261
}
231
262
263
+ if ( this . _isStreamingHandler ( original ) ) {
264
+ return function patchedStreamingHandler (
265
+ this : never ,
266
+ // The event can be a user type, it truly is any.
267
+ event : any ,
268
+ responseStream : Parameters < StreamifyHandler > [ 1 ] ,
269
+ context : Context ,
270
+ ) {
271
+ _onRequest ( ) ;
272
+ const parent = plugin . _determineParent ( event , context ) ;
273
+ const span = plugin . _createSpanForRequest ( event , context , requestIsColdStart , parent ) ;
274
+ plugin . _applyRequestHook ( span , event , context ) ;
275
+
276
+ return otelContext . with ( trace . setSpan ( parent , span ) , ( ) => {
277
+ const maybePromise = safeExecuteInTheMiddle (
278
+ ( ) => original . apply ( this , [ event , responseStream , context ] ) ,
279
+ error => {
280
+ if ( error != null ) {
281
+ // Exception thrown synchronously before resolving promise.
282
+ plugin . _applyResponseHook ( span , error ) ;
283
+ plugin . _endSpan ( span , error , ( ) => { } ) ;
284
+ }
285
+ } ,
286
+ ) as Promise < { } > | undefined ;
287
+
288
+ return plugin . _handlePromiseResult ( span , maybePromise ) ;
289
+ } ) ;
290
+ } ;
291
+ }
292
+
232
293
return function patchedHandler (
233
294
this : never ,
234
295
// The event can be a user type, it truly is any.
@@ -239,39 +300,10 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
239
300
) {
240
301
_onRequest ( ) ;
241
302
242
- const config = plugin . getConfig ( ) ;
243
- const parent = AwsLambdaInstrumentation . _determineParent (
244
- event ,
245
- context ,
246
- config . eventContextExtractor || AwsLambdaInstrumentation . _defaultEventContextExtractor ,
247
- ) ;
248
-
249
- const name = context . functionName ;
250
- const span = plugin . tracer . startSpan (
251
- name ,
252
- {
253
- kind : SpanKind . SERVER ,
254
- attributes : {
255
- [ SEMATTRS_FAAS_EXECUTION ] : context . awsRequestId ,
256
- [ SEMRESATTRS_FAAS_ID ] : context . invokedFunctionArn ,
257
- [ SEMRESATTRS_CLOUD_ACCOUNT_ID ] : AwsLambdaInstrumentation . _extractAccountId ( context . invokedFunctionArn ) ,
258
- [ ATTR_FAAS_COLDSTART ] : requestIsColdStart ,
259
- ...AwsLambdaInstrumentation . _extractOtherEventFields ( event ) ,
260
- } ,
261
- } ,
262
- parent ,
263
- ) ;
303
+ const parent = plugin . _determineParent ( event , context ) ;
264
304
265
- const { requestHook } = config ;
266
- if ( requestHook ) {
267
- safeExecuteInTheMiddle (
268
- ( ) => requestHook ( span , { event, context } ) ,
269
- e => {
270
- if ( e ) diag . error ( 'aws-lambda instrumentation: requestHook error' , e ) ;
271
- } ,
272
- true ,
273
- ) ;
274
- }
305
+ const span = plugin . _createSpanForRequest ( event , context , requestIsColdStart , parent ) ;
306
+ plugin . _applyRequestHook ( span , event , context ) ;
275
307
276
308
return otelContext . with ( trace . setSpan ( parent , span ) , ( ) => {
277
309
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
@@ -289,23 +321,80 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
289
321
}
290
322
} ,
291
323
) as Promise < { } > | undefined ;
292
- if ( typeof maybePromise ?. then === 'function' ) {
293
- return maybePromise . then (
294
- value => {
295
- plugin . _applyResponseHook ( span , null , value ) ;
296
- return new Promise ( resolve => plugin . _endSpan ( span , undefined , ( ) => resolve ( value ) ) ) ;
297
- } ,
298
- ( err : Error | string ) => {
299
- plugin . _applyResponseHook ( span , err ) ;
300
- return new Promise ( ( resolve , reject ) => plugin . _endSpan ( span , err , ( ) => reject ( err ) ) ) ;
301
- } ,
302
- ) ;
303
- }
304
- return maybePromise ;
324
+
325
+ return plugin . _handlePromiseResult ( span , maybePromise ) ;
305
326
} ) ;
306
327
} ;
307
328
}
308
329
330
+ private _createSpanForRequest ( event : any , context : Context , requestIsColdStart : boolean , parent : OtelContext ) : Span {
331
+ const name = context . functionName ;
332
+ return this . tracer . startSpan (
333
+ name ,
334
+ {
335
+ kind : SpanKind . SERVER ,
336
+ attributes : {
337
+ [ SEMATTRS_FAAS_EXECUTION ] : context . awsRequestId ,
338
+ [ SEMRESATTRS_FAAS_ID ] : context . invokedFunctionArn ,
339
+ [ SEMRESATTRS_CLOUD_ACCOUNT_ID ] : AwsLambdaInstrumentation . _extractAccountId ( context . invokedFunctionArn ) ,
340
+ [ ATTR_FAAS_COLDSTART ] : requestIsColdStart ,
341
+ ...AwsLambdaInstrumentation . _extractOtherEventFields ( event ) ,
342
+ } ,
343
+ } ,
344
+ parent ,
345
+ ) ;
346
+ }
347
+
348
+ private _applyRequestHook ( span : Span , event : any , context : Context ) : void {
349
+ const { requestHook } = this . getConfig ( ) ;
350
+ if ( requestHook ) {
351
+ safeExecuteInTheMiddle (
352
+ ( ) => requestHook ( span , { event, context } ) ,
353
+ e => {
354
+ if ( e ) diag . error ( 'aws-lambda instrumentation: requestHook error' , e ) ;
355
+ } ,
356
+ true ,
357
+ ) ;
358
+ }
359
+ }
360
+
361
+ private _handlePromiseResult ( span : Span , maybePromise : Promise < { } > | undefined ) : Promise < { } > | undefined {
362
+ if ( typeof maybePromise ?. then === 'function' ) {
363
+ return maybePromise . then (
364
+ value => {
365
+ this . _applyResponseHook ( span , null , value ) ;
366
+ return new Promise ( resolve => this . _endSpan ( span , undefined , ( ) => resolve ( value ) ) ) ;
367
+ } ,
368
+ ( err : Error | string ) => {
369
+ this . _applyResponseHook ( span , err ) ;
370
+ return new Promise ( ( resolve , reject ) => this . _endSpan ( span , err , ( ) => reject ( err ) ) ) ;
371
+ } ,
372
+ ) ;
373
+ }
374
+
375
+ // Handle synchronous return values by ending the span and applying response hook
376
+ this . _applyResponseHook ( span , null , maybePromise ) ;
377
+ this . _endSpan ( span , undefined , ( ) => { } ) ;
378
+ return maybePromise ;
379
+ }
380
+
381
+ private _determineParent ( event : any , context : Context ) : OtelContext {
382
+ const config = this . getConfig ( ) ;
383
+ return AwsLambdaInstrumentation . _determineParent (
384
+ event ,
385
+ context ,
386
+ config . eventContextExtractor || AwsLambdaInstrumentation . _defaultEventContextExtractor ,
387
+ ) ;
388
+ }
389
+
390
+ private _isStreamingHandler < TEvent , TResult > (
391
+ handler : Handler < TEvent , TResult > | StreamifyHandler < TEvent , TResult > ,
392
+ ) : handler is StreamifyHandler < TEvent , TResult > {
393
+ return (
394
+ ( handler as unknown as Record < symbol , unknown > ) [ AWS_HANDLER_STREAMING_SYMBOL ] === AWS_HANDLER_STREAMING_RESPONSE
395
+ ) ;
396
+ }
397
+
309
398
/**
310
399
*
311
400
*/
0 commit comments