Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,15 @@
"rimraf": "5.0.5",
"sinon": "15.2.0",
"ts-mocha": "10.0.0",
"typescript": "4.4.4"
"typescript": "4.9.5"
},
"dependencies": {
"@aws-sdk/client-cloudwatch-logs": "3.621.0",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/auto-configuration-propagators": "0.3.2",
"@opentelemetry/auto-instrumentations-node": "0.56.0",
"@opentelemetry/api-events": "0.57.1",
"@opentelemetry/baggage-span-processor": "0.3.1",
"@opentelemetry/sdk-events": "0.57.1",
"@opentelemetry/sdk-logs": "0.57.1",
"@opentelemetry/core": "1.30.1",
"@opentelemetry/exporter-metrics-otlp-grpc": "0.57.1",
"@opentelemetry/exporter-metrics-otlp-http": "0.57.1",
Expand All @@ -119,10 +118,11 @@
"@opentelemetry/propagator-aws-xray": "1.26.2",
"@opentelemetry/resource-detector-aws": "1.12.0",
"@opentelemetry/resources": "1.30.1",
"@opentelemetry/sdk-events": "0.57.1",
"@opentelemetry/sdk-logs": "0.57.1",
"@opentelemetry/sdk-metrics": "1.30.1",
"@opentelemetry/sdk-node": "0.57.1",
"@opentelemetry/sdk-trace-base": "1.30.1",
"@opentelemetry/sdk-logs": "0.57.1",
"@opentelemetry/semantic-conventions": "1.28.0"
},
"files": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,15 @@ import { OTLPUdpSpanExporter } from './otlp-udp-exporter';
import { AwsXRayRemoteSampler } from './sampler/aws-xray-remote-sampler';
// This file is generated via `npm run compile`
import { LIB_VERSION } from './version';
import { AWSCloudWatchEMFExporter } from './exporter/aws/metrics/aws-cloudwatch-emf-exporter';
import { OTLPAwsLogExporter } from './exporter/otlp/aws/logs/otlp-aws-log-exporter';

import { isAgentObservabilityEnabled } from './utils';
import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor';
import { logs } from '@opentelemetry/api-logs';

const AWS_TRACES_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$';
const AWS_LOGS_OTLP_ENDPOINT_PATTERN = '^https://logs\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/logs$';

const AWS_OTLP_LOGS_GROUP_HEADER = 'x-aws-log-group';
const AWS_OTLP_LOGS_STREAM_HEADER = 'x-aws-log-stream';

const APPLICATION_SIGNALS_ENABLED_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_ENABLED';
const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT';
const METRIC_EXPORT_INTERVAL_CONFIG: string = 'OTEL_METRIC_EXPORT_INTERVAL';
Expand All @@ -99,6 +96,17 @@ const FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = 'T1U';
const LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10;
export const LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT: string = 'LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT';

const AWS_OTLP_LOGS_GROUP_HEADER = 'x-aws-log-group';
const AWS_OTLP_LOGS_STREAM_HEADER = 'x-aws-log-stream';
const AWS_EMF_METRICS_NAMESPACE = 'x-aws-metric-namespace';

interface OtlpLogHeaderSetting {
logGroup?: string;
logStream?: string;
namespace?: string;
isValid: boolean;
}

/**
* Aws Application Signals Config Provider creates a configuration object that can be provided to
* the OTel NodeJS SDK for Auto Instrumentation with Application Signals Functionality.
Expand All @@ -120,6 +128,7 @@ export class AwsOpentelemetryConfigurator {
private spanProcessors: SpanProcessor[];
private logRecordProcessors: LogRecordProcessor[];
private propagator: TextMapPropagator;
private metricReader: PeriodicExportingMetricReader | undefined;

/**
* The constructor will setup the AwsOpentelemetryConfigurator object to be able to provide a
Expand Down Expand Up @@ -204,6 +213,9 @@ export class AwsOpentelemetryConfigurator {
this.spanProcessors = awsSpanProcessorProvider.getSpanProcessors();
this.logRecordProcessors = AwsLoggerProcessorProvider.getlogRecordProcessors();
AwsOpentelemetryConfigurator.customizeSpanProcessors(this.spanProcessors, this.resource);

const isEmfEnabled = checkEmfExporterEnabled();
this.customizeMetricReader(isEmfEnabled);
}

private customizeVersions(autoResource: Resource): Resource {
Expand Down Expand Up @@ -236,6 +248,10 @@ export class AwsOpentelemetryConfigurator {
textMapPropagator: this.propagator,
};

if (this.metricReader) {
config.metricReader = this.metricReader;
}

return config;
}

Expand All @@ -248,6 +264,20 @@ export class AwsOpentelemetryConfigurator {
return isApplicationSignalsEnabled.toLowerCase() === 'true';
}

static geMetricExportInterval(): number {
let exportIntervalMillis: number = Number(process.env[METRIC_EXPORT_INTERVAL_CONFIG]);
diag.debug(`AWS Application Signals Metrics export interval: ${exportIntervalMillis}`);

// Cap export interval to 60 seconds. This is currently required for metrics-trace correlation to work correctly.
if (isNaN(exportIntervalMillis) || exportIntervalMillis.valueOf() > DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS) {
exportIntervalMillis = DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS;

diag.info(`AWS Application Signals metrics export interval capped to ${exportIntervalMillis}`);
}

return exportIntervalMillis;
}

static exportUnsampledSpanForAgentObservability(spanProcessors: SpanProcessor[], resource: Resource): void {
if (!isAgentObservabilityEnabled()) {
return;
Expand Down Expand Up @@ -296,22 +326,13 @@ export class AwsOpentelemetryConfigurator {

diag.info('AWS Application Signals enabled.');

let exportIntervalMillis: number = Number(process.env[METRIC_EXPORT_INTERVAL_CONFIG]);
diag.debug(`AWS Application Signals Metrics export interval: ${exportIntervalMillis}`);

if (isNaN(exportIntervalMillis) || exportIntervalMillis.valueOf() > DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS) {
exportIntervalMillis = DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS;

diag.info(`AWS Application Signals metrics export interval capped to ${exportIntervalMillis}`);
}

spanProcessors.push(AttributePropagatingSpanProcessorBuilder.create().build());

const applicationSignalsMetricExporter: PushMetricExporter =
ApplicationSignalsExporterProvider.Instance.createExporter();
const periodicExportingMetricReader: PeriodicExportingMetricReader = new PeriodicExportingMetricReader({
exporter: applicationSignalsMetricExporter,
exportIntervalMillis: exportIntervalMillis,
exportIntervalMillis: AwsOpentelemetryConfigurator.geMetricExportInterval(),
});

// Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda
Expand Down Expand Up @@ -347,6 +368,18 @@ export class AwsOpentelemetryConfigurator {
}
}

private customizeMetricReader(isEmfEnabled: boolean) {
if (isEmfEnabled) {
const emfExporter = createEmfExporter();
if (emfExporter) {
const periodicExportingMetricReader = new PeriodicExportingMetricReader({
exporter: emfExporter,
});
this.metricReader = periodicExportingMetricReader;
}
}
}

static customizeSampler(sampler: Sampler): Sampler {
if (AwsOpentelemetryConfigurator.isApplicationSignalsEnabled()) {
return AlwaysRecordSampler.create(sampler);
Expand Down Expand Up @@ -517,7 +550,7 @@ export class AwsLoggerProcessorProvider {
if (
otlpExporterLogsEndpoint &&
isAwsOtlpEndpoint(otlpExporterLogsEndpoint, 'logs') &&
validateLogsHeaders()
validateAndFetchLogsHeader().isValid
) {
diag.debug('Detected CloudWatch Logs OTLP endpoint. Switching exporter to OTLPAwsLogExporter');
exporters.push(
Expand All @@ -538,7 +571,7 @@ export class AwsLoggerProcessorProvider {
if (
otlpExporterLogsEndpoint &&
isAwsOtlpEndpoint(otlpExporterLogsEndpoint, 'logs') &&
validateLogsHeaders()
validateAndFetchLogsHeader().isValid
) {
diag.debug('Detected CloudWatch Logs OTLP endpoint. Switching exporter to OTLPAwsLogExporter');
exporters.push(
Expand Down Expand Up @@ -857,6 +890,8 @@ function getSamplerProbabilityFromEnv(environment: Required<ENVIRONMENT>): numbe
return probability;
}

// END The OpenTelemetry Authors code

function getSpanExportBatchSize() {
if (isLambdaEnvironment()) {
return LAMBDA_SPAN_EXPORT_BATCH_SIZE;
Expand All @@ -880,8 +915,7 @@ function getXrayDaemonEndpoint() {
/**
* Determines if the given endpoint is either the AWS OTLP Traces or Logs endpoint.
*/

function isAwsOtlpEndpoint(otlpEndpoint: string, service: string): boolean {
export function isAwsOtlpEndpoint(otlpEndpoint: string, service: string): boolean {
let pattern = '';
if (service === 'xray') {
pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN;
Expand All @@ -898,40 +932,98 @@ function isAwsOtlpEndpoint(otlpEndpoint: string, service: string): boolean {
* Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
* AWS OTLP Logs endpoint.
*/
function validateLogsHeaders() {
const logsHeaders = process.env['OTEL_EXPORTER_OTLP_LOGS_HEADERS'];
export function validateAndFetchLogsHeader(): OtlpLogHeaderSetting {
const logHeaders = process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS;

if (!logsHeaders) {
if (!logHeaders) {
diag.warn(
'Missing required configuration: The environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS must be set with ' +
`required headers ${AWS_OTLP_LOGS_GROUP_HEADER} and ${AWS_OTLP_LOGS_STREAM_HEADER}. ` +
`Example: OTEL_EXPORTER_OTLP_LOGS_HEADERS="${AWS_OTLP_LOGS_GROUP_HEADER}=my-log-group,${AWS_OTLP_LOGS_STREAM_HEADER}=my-log-stream"`
);
return false;
return {
logGroup: '',
logStream: '',
namespace: '',
isValid: false,
};
}

let hasLogGroup = false;
let hasLogStream = false;
let logGroup: string | undefined = undefined;
let logStream: string | undefined = undefined;
let namespace: string | undefined = undefined;
let filteredLogHeadersCount: number = 0;

for (const pair of logHeaders.split(',')) {
const splitIndex = pair.indexOf('=');
if (splitIndex > -1) {
const key = pair.substring(0, splitIndex);
const value = pair.substring(splitIndex + 1);

for (const pair of logsHeaders.split(',')) {
if (pair.includes('=')) {
const [key, value] = pair.split('=', 2);
if (key === AWS_OTLP_LOGS_GROUP_HEADER && value) {
hasLogGroup = true;
logGroup = value;
filteredLogHeadersCount++;
} else if (key === AWS_OTLP_LOGS_STREAM_HEADER && value) {
hasLogStream = true;
logStream = value;
filteredLogHeadersCount++;
} else if (key === AWS_EMF_METRICS_NAMESPACE && value) {
namespace = value;
}
}
}

if (!hasLogGroup || !hasLogStream) {
const isValid = filteredLogHeadersCount === 2 && !!logGroup && !!logStream;
if (!isValid) {
diag.warn(
'Incomplete configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS ' +
`to have values for ${AWS_OTLP_LOGS_GROUP_HEADER} and ${AWS_OTLP_LOGS_STREAM_HEADER}`
);
}

return {
logGroup: logGroup,
logStream: logStream,
namespace: namespace,
isValid: isValid,
};
}

export function checkEmfExporterEnabled(): boolean {
const exporterValue = process.env.OTEL_METRICS_EXPORTER;
if (exporterValue === undefined) {
return false;
}

const exporters = exporterValue.split(',').map(exporter => exporter.trim());

const index = exporters.indexOf('awsemf');
if (index === -1) {
return false;
}

exporters.splice(index, 1);

const newValue = exporters ? exporters.join(',') : undefined;

if (typeof newValue === 'string' && newValue !== '') {
process.env.OTEL_METRICS_EXPORTER = newValue;
} else {
delete process.env.OTEL_METRICS_EXPORTER;
}

return true;
}

// END The OpenTelemetry Authors code
export function createEmfExporter(): AWSCloudWatchEMFExporter | undefined {
const headersResult = validateAndFetchLogsHeader();
if (!headersResult.isValid) {
return undefined;
}

// If headersResult.isValid is true, then headersResult.logGroup and headersResult.logStream are guaranteed to be strings
return new AWSCloudWatchEMFExporter(
headersResult.namespace,
headersResult.logGroup as string,
headersResult.logStream as string
);
}
Loading
Loading