@@ -73,18 +73,15 @@ import { OTLPUdpSpanExporter } from './otlp-udp-exporter';
73
73
import { AwsXRayRemoteSampler } from './sampler/aws-xray-remote-sampler' ;
74
74
// This file is generated via `npm run compile`
75
75
import { LIB_VERSION } from './version' ;
76
+ import { AWSCloudWatchEMFExporter } from './exporter/aws/metrics/aws-cloudwatch-emf-exporter' ;
76
77
import { OTLPAwsLogExporter } from './exporter/otlp/aws/logs/otlp-aws-log-exporter' ;
77
-
78
78
import { isAgentObservabilityEnabled } from './utils' ;
79
79
import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor' ;
80
80
import { logs } from '@opentelemetry/api-logs' ;
81
81
82
82
const AWS_TRACES_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$' ;
83
83
const AWS_LOGS_OTLP_ENDPOINT_PATTERN = '^https://logs\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/logs$' ;
84
84
85
- const AWS_OTLP_LOGS_GROUP_HEADER = 'x-aws-log-group' ;
86
- const AWS_OTLP_LOGS_STREAM_HEADER = 'x-aws-log-stream' ;
87
-
88
85
const APPLICATION_SIGNALS_ENABLED_CONFIG : string = 'OTEL_AWS_APPLICATION_SIGNALS_ENABLED' ;
89
86
const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG : string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT' ;
90
87
const METRIC_EXPORT_INTERVAL_CONFIG : string = 'OTEL_METRIC_EXPORT_INTERVAL' ;
@@ -99,6 +96,17 @@ const FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = 'T1U';
99
96
const LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10 ;
100
97
export const LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT : string = 'LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT' ;
101
98
99
+ const AWS_OTLP_LOGS_GROUP_HEADER = 'x-aws-log-group' ;
100
+ const AWS_OTLP_LOGS_STREAM_HEADER = 'x-aws-log-stream' ;
101
+ const AWS_EMF_METRICS_NAMESPACE = 'x-aws-metric-namespace' ;
102
+
103
+ interface OtlpLogHeaderSetting {
104
+ logGroup ?: string ;
105
+ logStream ?: string ;
106
+ namespace ?: string ;
107
+ isValid : boolean ;
108
+ }
109
+
102
110
/**
103
111
* Aws Application Signals Config Provider creates a configuration object that can be provided to
104
112
* the OTel NodeJS SDK for Auto Instrumentation with Application Signals Functionality.
@@ -120,6 +128,7 @@ export class AwsOpentelemetryConfigurator {
120
128
private spanProcessors : SpanProcessor [ ] ;
121
129
private logRecordProcessors : LogRecordProcessor [ ] ;
122
130
private propagator : TextMapPropagator ;
131
+ private metricReader : PeriodicExportingMetricReader | undefined ;
123
132
124
133
/**
125
134
* The constructor will setup the AwsOpentelemetryConfigurator object to be able to provide a
@@ -204,6 +213,9 @@ export class AwsOpentelemetryConfigurator {
204
213
this . spanProcessors = awsSpanProcessorProvider . getSpanProcessors ( ) ;
205
214
this . logRecordProcessors = AwsLoggerProcessorProvider . getlogRecordProcessors ( ) ;
206
215
AwsOpentelemetryConfigurator . customizeSpanProcessors ( this . spanProcessors , this . resource ) ;
216
+
217
+ const isEmfEnabled = checkEmfExporterEnabled ( ) ;
218
+ this . customizeMetricReader ( isEmfEnabled ) ;
207
219
}
208
220
209
221
private customizeVersions ( autoResource : Resource ) : Resource {
@@ -236,6 +248,10 @@ export class AwsOpentelemetryConfigurator {
236
248
textMapPropagator : this . propagator ,
237
249
} ;
238
250
251
+ if ( this . metricReader ) {
252
+ config . metricReader = this . metricReader ;
253
+ }
254
+
239
255
return config ;
240
256
}
241
257
@@ -248,6 +264,20 @@ export class AwsOpentelemetryConfigurator {
248
264
return isApplicationSignalsEnabled . toLowerCase ( ) === 'true' ;
249
265
}
250
266
267
+ static geMetricExportInterval ( ) : number {
268
+ let exportIntervalMillis : number = Number ( process . env [ METRIC_EXPORT_INTERVAL_CONFIG ] ) ;
269
+ diag . debug ( `AWS Application Signals Metrics export interval: ${ exportIntervalMillis } ` ) ;
270
+
271
+ // Cap export interval to 60 seconds. This is currently required for metrics-trace correlation to work correctly.
272
+ if ( isNaN ( exportIntervalMillis ) || exportIntervalMillis . valueOf ( ) > DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS ) {
273
+ exportIntervalMillis = DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS ;
274
+
275
+ diag . info ( `AWS Application Signals metrics export interval capped to ${ exportIntervalMillis } ` ) ;
276
+ }
277
+
278
+ return exportIntervalMillis ;
279
+ }
280
+
251
281
static exportUnsampledSpanForAgentObservability ( spanProcessors : SpanProcessor [ ] , resource : Resource ) : void {
252
282
if ( ! isAgentObservabilityEnabled ( ) ) {
253
283
return ;
@@ -296,22 +326,13 @@ export class AwsOpentelemetryConfigurator {
296
326
297
327
diag . info ( 'AWS Application Signals enabled.' ) ;
298
328
299
- let exportIntervalMillis : number = Number ( process . env [ METRIC_EXPORT_INTERVAL_CONFIG ] ) ;
300
- diag . debug ( `AWS Application Signals Metrics export interval: ${ exportIntervalMillis } ` ) ;
301
-
302
- if ( isNaN ( exportIntervalMillis ) || exportIntervalMillis . valueOf ( ) > DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS ) {
303
- exportIntervalMillis = DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS ;
304
-
305
- diag . info ( `AWS Application Signals metrics export interval capped to ${ exportIntervalMillis } ` ) ;
306
- }
307
-
308
329
spanProcessors . push ( AttributePropagatingSpanProcessorBuilder . create ( ) . build ( ) ) ;
309
330
310
331
const applicationSignalsMetricExporter : PushMetricExporter =
311
332
ApplicationSignalsExporterProvider . Instance . createExporter ( ) ;
312
333
const periodicExportingMetricReader : PeriodicExportingMetricReader = new PeriodicExportingMetricReader ( {
313
334
exporter : applicationSignalsMetricExporter ,
314
- exportIntervalMillis : exportIntervalMillis ,
335
+ exportIntervalMillis : AwsOpentelemetryConfigurator . geMetricExportInterval ( ) ,
315
336
} ) ;
316
337
317
338
// Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda
@@ -347,6 +368,18 @@ export class AwsOpentelemetryConfigurator {
347
368
}
348
369
}
349
370
371
+ private customizeMetricReader ( isEmfEnabled : boolean ) {
372
+ if ( isEmfEnabled ) {
373
+ const emfExporter = createEmfExporter ( ) ;
374
+ if ( emfExporter ) {
375
+ const periodicExportingMetricReader = new PeriodicExportingMetricReader ( {
376
+ exporter : emfExporter ,
377
+ } ) ;
378
+ this . metricReader = periodicExportingMetricReader ;
379
+ }
380
+ }
381
+ }
382
+
350
383
static customizeSampler ( sampler : Sampler ) : Sampler {
351
384
if ( AwsOpentelemetryConfigurator . isApplicationSignalsEnabled ( ) ) {
352
385
return AlwaysRecordSampler . create ( sampler ) ;
@@ -517,7 +550,7 @@ export class AwsLoggerProcessorProvider {
517
550
if (
518
551
otlpExporterLogsEndpoint &&
519
552
isAwsOtlpEndpoint ( otlpExporterLogsEndpoint , 'logs' ) &&
520
- validateLogsHeaders ( )
553
+ validateAndFetchLogsHeader ( ) . isValid
521
554
) {
522
555
diag . debug ( 'Detected CloudWatch Logs OTLP endpoint. Switching exporter to OTLPAwsLogExporter' ) ;
523
556
exporters . push (
@@ -538,7 +571,7 @@ export class AwsLoggerProcessorProvider {
538
571
if (
539
572
otlpExporterLogsEndpoint &&
540
573
isAwsOtlpEndpoint ( otlpExporterLogsEndpoint , 'logs' ) &&
541
- validateLogsHeaders ( )
574
+ validateAndFetchLogsHeader ( ) . isValid
542
575
) {
543
576
diag . debug ( 'Detected CloudWatch Logs OTLP endpoint. Switching exporter to OTLPAwsLogExporter' ) ;
544
577
exporters . push (
@@ -857,6 +890,8 @@ function getSamplerProbabilityFromEnv(environment: Required<ENVIRONMENT>): numbe
857
890
return probability ;
858
891
}
859
892
893
+ // END The OpenTelemetry Authors code
894
+
860
895
function getSpanExportBatchSize ( ) {
861
896
if ( isLambdaEnvironment ( ) ) {
862
897
return LAMBDA_SPAN_EXPORT_BATCH_SIZE ;
@@ -880,8 +915,7 @@ function getXrayDaemonEndpoint() {
880
915
/**
881
916
* Determines if the given endpoint is either the AWS OTLP Traces or Logs endpoint.
882
917
*/
883
-
884
- function isAwsOtlpEndpoint ( otlpEndpoint : string , service : string ) : boolean {
918
+ export function isAwsOtlpEndpoint ( otlpEndpoint : string , service : string ) : boolean {
885
919
let pattern = '' ;
886
920
if ( service === 'xray' ) {
887
921
pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN ;
@@ -898,40 +932,98 @@ function isAwsOtlpEndpoint(otlpEndpoint: string, service: string): boolean {
898
932
* Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
899
933
* AWS OTLP Logs endpoint.
900
934
*/
901
- function validateLogsHeaders ( ) {
902
- const logsHeaders = process . env [ ' OTEL_EXPORTER_OTLP_LOGS_HEADERS' ] ;
935
+ export function validateAndFetchLogsHeader ( ) : OtlpLogHeaderSetting {
936
+ const logHeaders = process . env . OTEL_EXPORTER_OTLP_LOGS_HEADERS ;
903
937
904
- if ( ! logsHeaders ) {
938
+ if ( ! logHeaders ) {
905
939
diag . warn (
906
940
'Missing required configuration: The environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS must be set with ' +
907
941
`required headers ${ AWS_OTLP_LOGS_GROUP_HEADER } and ${ AWS_OTLP_LOGS_STREAM_HEADER } . ` +
908
942
`Example: OTEL_EXPORTER_OTLP_LOGS_HEADERS="${ AWS_OTLP_LOGS_GROUP_HEADER } =my-log-group,${ AWS_OTLP_LOGS_STREAM_HEADER } =my-log-stream"`
909
943
) ;
910
- return false ;
944
+ return {
945
+ logGroup : '' ,
946
+ logStream : '' ,
947
+ namespace : '' ,
948
+ isValid : false ,
949
+ } ;
911
950
}
912
951
913
- let hasLogGroup = false ;
914
- let hasLogStream = false ;
952
+ let logGroup : string | undefined = undefined ;
953
+ let logStream : string | undefined = undefined ;
954
+ let namespace : string | undefined = undefined ;
955
+ let filteredLogHeadersCount : number = 0 ;
956
+
957
+ for ( const pair of logHeaders . split ( ',' ) ) {
958
+ const splitIndex = pair . indexOf ( '=' ) ;
959
+ if ( splitIndex > - 1 ) {
960
+ const key = pair . substring ( 0 , splitIndex ) ;
961
+ const value = pair . substring ( splitIndex + 1 ) ;
915
962
916
- for ( const pair of logsHeaders . split ( ',' ) ) {
917
- if ( pair . includes ( '=' ) ) {
918
- const [ key , value ] = pair . split ( '=' , 2 ) ;
919
963
if ( key === AWS_OTLP_LOGS_GROUP_HEADER && value ) {
920
- hasLogGroup = true ;
964
+ logGroup = value ;
965
+ filteredLogHeadersCount ++ ;
921
966
} else if ( key === AWS_OTLP_LOGS_STREAM_HEADER && value ) {
922
- hasLogStream = true ;
967
+ logStream = value ;
968
+ filteredLogHeadersCount ++ ;
969
+ } else if ( key === AWS_EMF_METRICS_NAMESPACE && value ) {
970
+ namespace = value ;
923
971
}
924
972
}
925
973
}
926
974
927
- if ( ! hasLogGroup || ! hasLogStream ) {
975
+ const isValid = filteredLogHeadersCount === 2 && ! ! logGroup && ! ! logStream ;
976
+ if ( ! isValid ) {
928
977
diag . warn (
929
978
'Incomplete configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS ' +
930
979
`to have values for ${ AWS_OTLP_LOGS_GROUP_HEADER } and ${ AWS_OTLP_LOGS_STREAM_HEADER } `
931
980
) ;
981
+ }
982
+
983
+ return {
984
+ logGroup : logGroup ,
985
+ logStream : logStream ,
986
+ namespace : namespace ,
987
+ isValid : isValid ,
988
+ } ;
989
+ }
990
+
991
+ export function checkEmfExporterEnabled ( ) : boolean {
992
+ const exporterValue = process . env . OTEL_METRICS_EXPORTER ;
993
+ if ( exporterValue === undefined ) {
994
+ return false ;
995
+ }
996
+
997
+ const exporters = exporterValue . split ( ',' ) . map ( exporter => exporter . trim ( ) ) ;
998
+
999
+ const index = exporters . indexOf ( 'awsemf' ) ;
1000
+ if ( index === - 1 ) {
932
1001
return false ;
933
1002
}
1003
+
1004
+ exporters . splice ( index , 1 ) ;
1005
+
1006
+ const newValue = exporters ? exporters . join ( ',' ) : undefined ;
1007
+
1008
+ if ( typeof newValue === 'string' && newValue !== '' ) {
1009
+ process . env . OTEL_METRICS_EXPORTER = newValue ;
1010
+ } else {
1011
+ delete process . env . OTEL_METRICS_EXPORTER ;
1012
+ }
1013
+
934
1014
return true ;
935
1015
}
936
1016
937
- // END The OpenTelemetry Authors code
1017
+ export function createEmfExporter ( ) : AWSCloudWatchEMFExporter | undefined {
1018
+ const headersResult = validateAndFetchLogsHeader ( ) ;
1019
+ if ( ! headersResult . isValid ) {
1020
+ return undefined ;
1021
+ }
1022
+
1023
+ // If headersResult.isValid is true, then headersResult.logGroup and headersResult.logStream are guaranteed to be strings
1024
+ return new AWSCloudWatchEMFExporter (
1025
+ headersResult . namespace ,
1026
+ headersResult . logGroup as string ,
1027
+ headersResult . logStream as string
1028
+ ) ;
1029
+ }
0 commit comments