diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts index 0886bbb8..8186d99f 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts @@ -80,6 +80,8 @@ import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor'; import { logs } from '@opentelemetry/api-logs'; import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys'; import { AwsCloudWatchOtlpBatchLogRecordProcessor } from './exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor'; +import { ConsoleEMFExporter } from './exporter/aws/metrics/console-emf-exporter'; +import { EMFExporterBase } from './exporter/aws/metrics/emf-exporter-base'; const AWS_TRACES_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$'; const AWS_LOGS_OTLP_ENDPOINT_PATTERN = '^https://logs\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/logs$'; @@ -382,14 +384,17 @@ export class AwsOpentelemetryConfigurator { } private customizeMetricReader(isEmfEnabled: boolean) { + let exporter: PushMetricExporter | undefined = undefined; + if (isEmfEnabled) { - const emfExporter = createEmfExporter(); - if (emfExporter) { - const periodicExportingMetricReader = new PeriodicExportingMetricReader({ - exporter: emfExporter, - }); - this.metricReader = periodicExportingMetricReader; - } + exporter = createEmfExporter(); + } + + if (exporter) { + const periodicExportingMetricReader = new PeriodicExportingMetricReader({ + exporter: exporter, + }); + this.metricReader = periodicExportingMetricReader; } } @@ -523,8 +528,7 @@ export class AwsLoggerProcessorProvider { return exporters.map(exporter => { if (exporter instanceof ConsoleLogRecordExporter) { return new SimpleLogRecordProcessor(exporter); - } - if (exporter instanceof OTLPAwsLogExporter && isAgentObservabilityEnabled()) { + } else if (exporter instanceof OTLPAwsLogExporter && isAgentObservabilityEnabled()) { return new AwsCloudWatchOtlpBatchLogRecordProcessor(exporter); } return new BatchLogRecordProcessor(exporter); @@ -961,15 +965,17 @@ export function validateAndFetchLogsHeader(): OtlpLogHeaderSetting { const logHeaders = process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS; if (!logHeaders) { - diag.warn( - 'Missing required configuration: The environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS must be set with ' + - `required headers ${AWS_OTLP_LOGS_GROUP_HEADER} and ${AWS_OTLP_LOGS_STREAM_HEADER}. ` + - `Example: OTEL_EXPORTER_OTLP_LOGS_HEADERS="${AWS_OTLP_LOGS_GROUP_HEADER}=my-log-group,${AWS_OTLP_LOGS_STREAM_HEADER}=my-log-stream"` - ); + if (!isLambdaEnvironment()) { + diag.warn( + 'Missing required configuration: The environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS must be set with ' + + `required headers ${AWS_OTLP_LOGS_GROUP_HEADER} and ${AWS_OTLP_LOGS_STREAM_HEADER}. ` + + `Example: OTEL_EXPORTER_OTLP_LOGS_HEADERS="${AWS_OTLP_LOGS_GROUP_HEADER}=my-log-group,${AWS_OTLP_LOGS_STREAM_HEADER}=my-log-stream"` + ); + } return { - logGroup: '', - logStream: '', - namespace: '', + logGroup: undefined, + logStream: undefined, + namespace: undefined, isValid: false, }; } @@ -1030,16 +1036,27 @@ export function checkEmfExporterEnabled(): boolean { return true; } -export function createEmfExporter(): AWSCloudWatchEMFExporter | undefined { - const headersResult = validateAndFetchLogsHeader(); - if (!headersResult.isValid) { - return undefined; +/** + * Create the appropriate EMF exporter based on the environment and configuration. + * + * @returns {EMFExporterBase | undefined} + */ +export function createEmfExporter(): EMFExporterBase | undefined { + let exporter: EMFExporterBase | undefined = undefined; + const otlpLogHeaderSetting = validateAndFetchLogsHeader(); + + if (isLambdaEnvironment() && !otlpLogHeaderSetting.isValid) { + // Lambda without valid logs http headers - use Console EMF exporter + exporter = new ConsoleEMFExporter(otlpLogHeaderSetting.namespace); + } else if (otlpLogHeaderSetting.isValid) { + // Non-Lambda environment - use CloudWatch EMF exporter + // If headersResult.isValid is true, then headersResult.logGroup and headersResult.logStream are guaranteed to be strings + exporter = new AWSCloudWatchEMFExporter( + otlpLogHeaderSetting.namespace, + otlpLogHeaderSetting.logGroup as string, + otlpLogHeaderSetting.logStream as string + ); } - // If headersResult.isValid is true, then headersResult.logGroup and headersResult.logStream are guaranteed to be strings - return new AWSCloudWatchEMFExporter( - headersResult.namespace, - headersResult.logGroup as string, - headersResult.logStream as string - ); + return exporter; } diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/aws-cloudwatch-emf-exporter.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/aws-cloudwatch-emf-exporter.ts index 6edf0fdc..234cb5dd 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/aws-cloudwatch-emf-exporter.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/aws-cloudwatch-emf-exporter.ts @@ -1,84 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Attributes, diag, HrTime } from '@opentelemetry/api'; -import { - Aggregation, - AggregationSelector, - AggregationTemporality, - AggregationTemporalitySelector, - DataPoint, - DataPointType, - ExponentialHistogram, - ExponentialHistogramMetricData, - GaugeMetricData, - Histogram, - HistogramMetricData, - InstrumentType, - PushMetricExporter, - ResourceMetrics, - SumMetricData, -} from '@opentelemetry/sdk-metrics'; -import { Resource } from '@opentelemetry/resources'; -import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import { diag } from '@opentelemetry/api'; +import { AggregationSelector, AggregationTemporalitySelector } from '@opentelemetry/sdk-metrics'; import { CloudWatchLogsClient } from './cloudwatch-logs-client'; import type { LogEvent, CloudWatchLogsClientConfig } from '@aws-sdk/client-cloudwatch-logs'; - -/** - * Intermediate format for metric data before converting to EMF - */ -export interface MetricRecord { - name: string; - unit: string; - description: string; - timestamp: number; - attributes: Attributes; - - // Only one of the following should be defined - sumData?: number; - histogramData?: HistogramMetricRecordData; - expHistogramData?: ExponentialHistogramMetricRecordData; - value?: number; -} - -interface HistogramMetricRecordData { - Count: number; - Sum: number; - Max: number; - Min: number; -} - -interface ExponentialHistogramMetricRecordData { - Values: number[]; - Counts: number[]; - Count: number; - Sum: number; - Max: number; - Min: number; -} - -interface EMFLog { - _aws: _Aws; - [key: `otel.resource.${string}`]: string; - [metricName: string]: any; - Version: string; -} - -interface _Aws { - CloudWatchMetrics: CloudWatchMetric[]; - Timestamp: number; -} - -interface CloudWatchMetric { - Namespace: string; - Dimensions?: string[][]; - Metrics: Metric[]; -} - -interface Metric { - Name: string; - Unit?: string; -} +import { EMFExporterBase } from './emf-exporter-base'; /** * OpenTelemetry metrics exporter for CloudWatch EMF format. @@ -87,57 +14,8 @@ interface Metric { * sent to CloudWatch Logs. CloudWatch Logs automatically extracts the metrics * from the EMF logs. */ -export class AWSCloudWatchEMFExporter implements PushMetricExporter { - private namespace: string; +export class AWSCloudWatchEMFExporter extends EMFExporterBase { private logClient: CloudWatchLogsClient; - private aggregationTemporalitySelector: AggregationTemporalitySelector; - private aggregationSelector: AggregationSelector; - - // CloudWatch EMF supported units - // Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html - private EMF_SUPPORTED_UNITS: Set = new Set([ - 'Seconds', - 'Microseconds', - 'Milliseconds', - 'Bytes', - 'Kilobytes', - 'Megabytes', - 'Gigabytes', - 'Terabytes', - 'Bits', - 'Kilobits', - 'Megabits', - 'Gigabits', - 'Terabits', - 'Percent', - 'Count', - 'Bytes/Second', - 'Kilobytes/Second', - 'Megabytes/Second', - 'Gigabytes/Second', - 'Terabytes/Second', - 'Bits/Second', - 'Kilobits/Second', - 'Megabits/Second', - 'Gigabits/Second', - 'Terabits/Second', - 'Count/Second', - 'None', - ]); - - // OTel to CloudWatch unit mapping - // Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 - private UNIT_MAPPING: Map = new Map( - Object.entries({ - '1': '', - ns: '', - ms: 'Milliseconds', - s: 'Seconds', - us: 'Microseconds', - By: 'Bytes', - bit: 'Bits', - }) - ); constructor( namespace: string = 'default', @@ -147,461 +25,18 @@ export class AWSCloudWatchEMFExporter implements PushMetricExporter { aggregationSelector?: AggregationSelector, cloudwatchLogsConfig: CloudWatchLogsClientConfig = {} ) { - this.namespace = namespace; - - if (aggregationTemporalitySelector) { - this.aggregationTemporalitySelector = aggregationTemporalitySelector; - } else { - this.aggregationTemporalitySelector = (instrumentType: InstrumentType) => { - return AggregationTemporality.DELTA; - }; - } - - if (aggregationSelector) { - this.aggregationSelector = aggregationSelector; - } else { - this.aggregationSelector = (instrumentType: InstrumentType) => { - switch (instrumentType) { - case InstrumentType.HISTOGRAM: { - return Aggregation.ExponentialHistogram(); - } - } - return Aggregation.Default(); - }; - } + super(namespace, aggregationTemporalitySelector, aggregationSelector); this.logClient = new CloudWatchLogsClient(logGroupName, logStreamName, cloudwatchLogsConfig); } - /** - * Get CloudWatch unit from unit in MetricRecord - */ - private getUnit(record: MetricRecord): string | undefined { - const unit = record.unit; - - if (this.EMF_SUPPORTED_UNITS.has(unit)) { - return unit; - } - - return this.UNIT_MAPPING.get(unit); - } - - /** - * Extract dimension names from attributes. - * For now, use all attributes as dimensions for the dimension selection logic. - */ - private getDimensionNames(attributes: Attributes): string[] { - return Object.keys(attributes); - } - - /** - * Create a hashable key from attributes for grouping metrics. - */ - private getAttributesKey(attributes: Attributes): string { - // Sort the attributes to ensure consistent keys - const sortedAttrs = Object.entries(attributes).sort(); - // Create a string representation of the attributes - return sortedAttrs.toString(); - } - - /** - * Normalize an OpenTelemetry timestamp to milliseconds for CloudWatch. - */ - private normalizeTimestamp(hrTime: HrTime): number { - // Convert from second and nanoseconds to milliseconds - const secondsToMillis = hrTime[0] * 1000; - const nanosToMillis = Math.floor(hrTime[1] / 1_000_000); - return secondsToMillis + nanosToMillis; - } - - /** - * Create a base metric record with instrument information. - */ - private createMetricRecord( - metricName: string, - metricUnit: string, - metricDescription: string, - timestamp: number, - attributes: Attributes - ): MetricRecord { - return { - name: metricName, - unit: metricUnit, - description: metricDescription, - timestamp, - attributes, - }; - } - - /** - * Convert a Gauge or Sum metric datapoint to a metric record. - */ - private convertGaugeAndSum(metric: SumMetricData | GaugeMetricData, dataPoint: DataPoint): MetricRecord { - const timestampMs = this.normalizeTimestamp(dataPoint.endTime); - const record = this.createMetricRecord( - metric.descriptor.name, - metric.descriptor.unit, - metric.descriptor.description, - timestampMs, - dataPoint.attributes - ); - record.value = dataPoint.value; - return record; - } - - /** - * Convert a Histogram metric datapoint to a metric record. - * - * https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 - */ - private convertHistogram(metric: HistogramMetricData, dataPoint: DataPoint): MetricRecord { - const timestampMs = this.normalizeTimestamp(dataPoint.endTime); - const record = this.createMetricRecord( - metric.descriptor.name, - metric.descriptor.unit, - metric.descriptor.description, - timestampMs, - dataPoint.attributes - ); - record.histogramData = { - Count: dataPoint.value.count, - Sum: dataPoint.value.sum ?? 0, - Min: dataPoint.value.min ?? 0, - Max: dataPoint.value.max ?? 0, - }; - return record; - } - - /** - * Convert an ExponentialHistogram metric datapoint to a metric record. - * - * This function follows the logic of CalculateDeltaDatapoints in the Go implementation, - * converting exponential buckets to their midpoint values. - * Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 - */ - private convertExpHistogram( - metric: ExponentialHistogramMetricData, - dataPoint: DataPoint - ): MetricRecord { - // Initialize arrays for values and counts - const arrayValues = []; - const arrayCounts = []; - - const scale = dataPoint.value.scale; - const base = Math.pow(2, Math.pow(2, -scale)); - - // Process positive buckets - if (dataPoint.value?.positive?.bucketCounts) { - const positiveOffset = dataPoint.value.positive.offset; - const positiveBucketCounts = dataPoint.value.positive.bucketCounts; - - let bucketBegin = 0; - let bucketEnd = 0; - - for (const [i, count] of positiveBucketCounts.entries()) { - const index = i + positiveOffset; - - if (bucketBegin === 0) { - bucketBegin = Math.pow(base, index); - } else { - bucketBegin = bucketEnd; - } - - bucketEnd = Math.pow(base, index + 1); - // Calculate midpoint value of the bucket - const metricVal = (bucketBegin + bucketEnd) / 2; - - // Only include buckets with positive counts - if (count > 0) { - arrayValues.push(metricVal); - arrayCounts.push(count); - } - } - } - - // Process zero bucket - const zeroCount = dataPoint.value.zeroCount; - if (zeroCount > 0) { - arrayValues.push(0); - arrayCounts.push(zeroCount); - } - - // Process negative buckets - if (dataPoint.value?.negative?.bucketCounts) { - const negativeOffset = dataPoint.value.negative.offset; - const negativeBucketCounts = dataPoint.value.negative.bucketCounts; - - let bucketBegin = 0; - let bucketEnd = 0; - - for (const [i, count] of negativeBucketCounts.entries()) { - const index = i + negativeOffset; - - if (bucketEnd === 0) { - bucketEnd = -Math.pow(base, index); - } else { - bucketEnd = bucketBegin; - } - - bucketBegin = -Math.pow(base, index + 1); - // Calculate midpoint value of the bucket - const metricVal = (bucketBegin + bucketEnd) / 2; - - // Only include buckets with positive counts - if (count > 0) { - arrayValues.push(metricVal); - arrayCounts.push(count); - } - } - } - - const timestampMs = this.normalizeTimestamp(dataPoint.endTime); - const record = this.createMetricRecord( - metric.descriptor.name, - metric.descriptor.unit, - metric.descriptor.description, - timestampMs, - dataPoint.attributes - ); - - // Set the histogram data in the format expected by CloudWatch EMF - record.expHistogramData = { - Values: arrayValues, - Counts: arrayCounts, - Count: dataPoint.value.count, - Sum: dataPoint.value.sum ?? 0, - Max: dataPoint.value.max ?? 0, - Min: dataPoint.value.min ?? 0, - }; - - return record; - } - - /** - * Create EMF log from metric records. - * - * Since metricRecords is already grouped by attributes, this function - * creates a single EMF log for all records. - */ - private createEmfLog( - metricRecords: MetricRecord[], - resource: Resource, - timestamp: number | undefined = undefined - ): EMFLog { - // Start with base structure and latest EMF version schema - // opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 - const emfLog: EMFLog = { - _aws: { - Timestamp: timestamp || Date.now(), - CloudWatchMetrics: [], - }, - Version: '1', - }; - - // Add resource attributes to EMF log but not as dimensions - // OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes - // as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, - // we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. - // And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. - if (resource && resource.attributes) { - for (const [key, value] of Object.entries(resource.attributes)) { - emfLog[`otel.resource.${key}`] = value?.toString() ?? 'undefined'; - } - } - - // Initialize collections for dimensions and metrics - const metricDefinitions: Metric[] = []; - // Collect attributes from all records (they should be the same for all records in the group) - // Only collect once from the first record and apply to all records - const allAttributes: Attributes = metricRecords.length > 0 ? metricRecords[0].attributes : {}; - - // Process each metric record - for (const record of metricRecords) { - const metricName = record.name; - if (!metricName) { - continue; - } - - if (record.expHistogramData) { - // Base2 Exponential Histogram - emfLog[metricName] = record.expHistogramData; - } else if (record.histogramData) { - // Regular Histogram metrics - emfLog[metricName] = record.histogramData; - } else if (record.value !== undefined) { - // Gauge, Sum, and other aggregations - emfLog[metricName] = record.value; - } else { - diag.debug(`Skipping metric ${metricName} as it does not have valid metric value`); - continue; - } - - const metricData: Metric = { - Name: metricName, - }; - - const unit = this.getUnit(record); - if (unit) { - metricData.Unit = unit; - } - metricDefinitions.push(metricData); - } - - const dimensionNames = this.getDimensionNames(allAttributes); - - // Add attribute values to the root of the EMF log - for (const [name, value] of Object.entries(allAttributes)) { - emfLog[name] = value?.toString() ?? 'undefined'; - } - - // Add CloudWatch Metrics if we have metrics, include dimensions only if they exist - if (metricDefinitions.length > 0) { - const cloudWatchMetric: CloudWatchMetric = { - Namespace: this.namespace, - Metrics: metricDefinitions, - }; - - if (dimensionNames.length > 0) { - cloudWatchMetric.Dimensions = [dimensionNames]; - } - - emfLog._aws.CloudWatchMetrics.push(cloudWatchMetric); - } - - return emfLog; - } - - /** - * Group metric record by attributes and timestamp. - * - * @param record The metric record - * @param timestampMs The timestamp in milliseconds - * @returns {[string, number]} Values for the key to group metrics - */ - private groupByAttributesAndTimestamp(record: MetricRecord): [string, number] { - // Create a key for grouping based on attributes - const attrsKey = this.getAttributesKey(record.attributes); - return [attrsKey, record.timestamp]; - } - - /** - * Method to handle safely pushing a MetricRecord into a Map of a Map of a list of MetricRecords - * - * @param groupedMetrics - * @param groupAttribute - * @param groupTimestamp - * @param record - */ - private pushMetricRecordIntoGroupedMetrics( - groupedMetrics: Map>, - groupAttribute: string, - groupTimestamp: number, - record: MetricRecord - ) { - let metricsGroupedByAttribute = groupedMetrics.get(groupAttribute); - if (!metricsGroupedByAttribute) { - metricsGroupedByAttribute = new Map(); - groupedMetrics.set(groupAttribute, metricsGroupedByAttribute); - } - - let metricsGroupedByAttributeAndTimestamp = metricsGroupedByAttribute.get(groupTimestamp); - if (!metricsGroupedByAttributeAndTimestamp) { - metricsGroupedByAttributeAndTimestamp = []; - metricsGroupedByAttribute.set(groupTimestamp, metricsGroupedByAttributeAndTimestamp); - } - metricsGroupedByAttributeAndTimestamp.push(record); - } - - /** - * Export metrics as EMF logs to CloudWatch. - * Groups metrics by attributes and timestamp before creating EMF logs. - * - * @param resourceMetrics Resource Metrics data containing scope metrics - * @param resultCallback callback for when the export has completed - * @returns {Promise} - */ - public async export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void) { - try { - if (!resourceMetrics) { - resultCallback({ code: ExportResultCode.SUCCESS }); - return; - } - - // Process all metrics from resource metrics their scope metrics - // The resource is now part of each resourceMetrics object - const resource = resourceMetrics.resource; - - for (const scopeMetrics of resourceMetrics.scopeMetrics) { - // Map of maps to group metrics by attributes and timestamp - // Keys: (attributesKey, timestampMs) - // Value: list of metric records - const groupedMetrics = new Map>(); - - // Process all metrics in this scope - for (const metric of scopeMetrics.metrics) { - // Convert metrics to a format compatible with createEmfLog - // Process metric.dataPoints for different metric types - if (metric.dataPointType === DataPointType.GAUGE || metric.dataPointType === DataPointType.SUM) { - for (const dataPoint of metric.dataPoints) { - const record = this.convertGaugeAndSum(metric, dataPoint); - const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); - this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); - } - } else if (metric.dataPointType === DataPointType.HISTOGRAM) { - for (const dataPoint of metric.dataPoints) { - const record = this.convertHistogram(metric, dataPoint); - const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); - this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); - } - } else if (metric.dataPointType === DataPointType.EXPONENTIAL_HISTOGRAM) { - for (const dataPoint of metric.dataPoints) { - const record = this.convertExpHistogram(metric, dataPoint); - const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); - this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); - } - } else { - // This else block should never run, all metric types are accounted for above - diag.debug(`Unsupported Metric Type in metric: ${metric}`); - } - } - - // Now process each group separately to create one EMF log per group - for (const [_, metricsRecordsGroupedByTimestamp] of groupedMetrics) { - for (const [timestampMs, metricRecords] of metricsRecordsGroupedByTimestamp) { - if (metricRecords) { - // Create and send EMF log for this batch of metrics - - // Convert to JSON - const logEvent = { - message: JSON.stringify(this.createEmfLog(metricRecords, resource, Number(timestampMs))), - timestamp: timestampMs, - }; - - // Send to CloudWatch Logs - await this.sendLogEvent(logEvent); - } - } - } - } - - resultCallback({ code: ExportResultCode.SUCCESS }); - } catch (e) { - diag.error(`Failed to export metrics: ${e}`); - const exportResult: ExportResult = { code: ExportResultCode.FAILED }; - if (e instanceof Error) { - exportResult.error = e; - } - resultCallback(exportResult); - } - } - /** * Send a log event to CloudWatch Logs using the log client. * * @param logEvent The log event to send * @returns {Promise} */ - private async sendLogEvent(logEvent: Required) { + protected async sendLogEvent(logEvent: Required) { await this.logClient.sendLogEvent(logEvent); } @@ -620,12 +55,4 @@ export class AWSCloudWatchEMFExporter implements PushMetricExporter { await this.forceFlush(); diag.debug('AWSCloudWatchEMFExporter shutdown called'); } - - selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality { - return this.aggregationTemporalitySelector(instrumentType); - } - - selectAggregation(instrumentType: InstrumentType): Aggregation { - return this.aggregationSelector(instrumentType); - } } diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/console-emf-exporter.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/console-emf-exporter.ts new file mode 100644 index 00000000..d4fc5b56 --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/console-emf-exporter.ts @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { AggregationSelector, AggregationTemporalitySelector } from '@opentelemetry/sdk-metrics'; +import type { LogEvent } from '@aws-sdk/client-cloudwatch-logs'; +import { EMFExporterBase } from './emf-exporter-base'; + +/** + * OpenTelemetry metrics exporter for CloudWatch EMF format to console output. + * + * This exporter converts OTel metrics into CloudWatch EMF logs and writes them + * to standard output instead of sending to CloudWatch Logs. This is useful for + * debugging, testing, or when you want to process EMF logs with other tools. + * + * https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + */ +export class ConsoleEMFExporter extends EMFExporterBase { + /** + * Constructor for the Console EMF exporter. + * + * @param namespace CloudWatch namespace for metrics (defaults to "default") + * @param aggregationTemporalitySelector Optional Aggregation temporality selector based on metric instrument types. + * @param aggregationSelector Optional Aggregation selector based on metric instrument types + */ + constructor( + namespace: string = 'default', + aggregationTemporalitySelector?: AggregationTemporalitySelector, + aggregationSelector?: AggregationSelector + ) { + super(namespace, aggregationTemporalitySelector, aggregationSelector); + } + + /** + * This method writes the EMF log message to stdout, making it easy to + * capture and redirect the output for processing or debugging purposes. + * + * @param logEvent The log event containing the message to send + * @returns {Promise} + */ + protected async sendLogEvent(logEvent: Required) { + console.log(logEvent.message); + } + + /** + * Force flush any pending metrics. + * For this exporter, there is nothing to forceFlush. + */ + public async forceFlush(): Promise {} + + /** + * Shutdown the exporter. + * For this exporter, there is nothing to clean-up in order to shutdown. + */ + public async shutdown(): Promise {} +} diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/emf-exporter-base.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/emf-exporter-base.ts new file mode 100644 index 00000000..97723956 --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/aws/metrics/emf-exporter-base.ts @@ -0,0 +1,621 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { Attributes, diag, HrTime } from '@opentelemetry/api'; +import { + Aggregation, + AggregationSelector, + AggregationTemporality, + AggregationTemporalitySelector, + DataPoint, + DataPointType, + ExponentialHistogram, + ExponentialHistogramMetricData, + GaugeMetricData, + Histogram, + HistogramMetricData, + InstrumentType, + PushMetricExporter, + ResourceMetrics, + SumMetricData, +} from '@opentelemetry/sdk-metrics'; +import { Resource } from '@opentelemetry/resources'; +import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import type { LogEvent } from '@aws-sdk/client-cloudwatch-logs'; + +/** + * Intermediate format for metric data before converting to EMF + */ +export interface MetricRecord { + name: string; + unit: string; + description: string; + timestamp: number; + attributes: Attributes; + + // Only one of the following should be defined + sumData?: number; + histogramData?: HistogramMetricRecordData; + expHistogramData?: ExponentialHistogramMetricRecordData; + value?: number; +} + +interface HistogramMetricRecordData { + Count: number; + Sum: number; + Max: number; + Min: number; +} + +interface ExponentialHistogramMetricRecordData { + Values: number[]; + Counts: number[]; + Count: number; + Sum: number; + Max: number; + Min: number; +} + +export interface EMFLog { + _aws: _Aws; + [key: `otel.resource.${string}`]: string; + [metricName: string]: any; + Version: string; +} + +interface _Aws { + CloudWatchMetrics: CloudWatchMetric[]; + Timestamp: number; +} + +interface CloudWatchMetric { + Namespace: string; + Dimensions?: string[][]; + Metrics: Metric[]; +} + +interface Metric { + Name: string; + Unit?: string; +} + +/** + * Base class for OpenTelemetry metrics exporters that convert to CloudWatch EMF format. + * + * This class contains all the common logic for converting OTel metrics into CloudWatch EMF logs. + * Subclasses need to implement the sendLogEvent method to define where the EMF logs are sent. + * + * https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + */ +export abstract class EMFExporterBase implements PushMetricExporter { + private namespace: string; + private aggregationTemporalitySelector: AggregationTemporalitySelector; + private aggregationSelector: AggregationSelector; + + // CloudWatch EMF supported units + // Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html + private EMF_SUPPORTED_UNITS: Set = new Set([ + 'Seconds', + 'Microseconds', + 'Milliseconds', + 'Bytes', + 'Kilobytes', + 'Megabytes', + 'Gigabytes', + 'Terabytes', + 'Bits', + 'Kilobits', + 'Megabits', + 'Gigabits', + 'Terabits', + 'Percent', + 'Count', + 'Bytes/Second', + 'Kilobytes/Second', + 'Megabytes/Second', + 'Gigabytes/Second', + 'Terabytes/Second', + 'Bits/Second', + 'Kilobits/Second', + 'Megabits/Second', + 'Gigabits/Second', + 'Terabits/Second', + 'Count/Second', + 'None', + ]); + + // OTel to CloudWatch unit mapping + // Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 + private UNIT_MAPPING: Map = new Map( + Object.entries({ + '1': '', + ns: '', + ms: 'Milliseconds', + s: 'Seconds', + us: 'Microseconds', + By: 'Bytes', + bit: 'Bits', + }) + ); + + constructor( + namespace: string = 'default', + aggregationTemporalitySelector?: AggregationTemporalitySelector, + aggregationSelector?: AggregationSelector + ) { + this.namespace = namespace; + + if (aggregationTemporalitySelector) { + this.aggregationTemporalitySelector = aggregationTemporalitySelector; + } else { + this.aggregationTemporalitySelector = (instrumentType: InstrumentType) => { + return AggregationTemporality.DELTA; + }; + } + + if (aggregationSelector) { + this.aggregationSelector = aggregationSelector; + } else { + this.aggregationSelector = (instrumentType: InstrumentType) => { + switch (instrumentType) { + case InstrumentType.HISTOGRAM: { + return Aggregation.ExponentialHistogram(); + } + } + return Aggregation.Default(); + }; + } + } + + /** + * Get CloudWatch unit from unit in MetricRecord + */ + private getUnit(record: MetricRecord): string | undefined { + const unit = record.unit; + + if (this.EMF_SUPPORTED_UNITS.has(unit)) { + return unit; + } + + return this.UNIT_MAPPING.get(unit); + } + + /** + * Extract dimension names from attributes. + * For now, use all attributes as dimensions for the dimension selection logic. + */ + private getDimensionNames(attributes: Attributes): string[] { + return Object.keys(attributes); + } + + /** + * Create a hashable key from attributes for grouping metrics. + */ + private getAttributesKey(attributes: Attributes): string { + // Sort the attributes to ensure consistent keys + const sortedAttrs = Object.entries(attributes).sort(); + // Create a string representation of the attributes + return sortedAttrs.toString(); + } + + /** + * Normalize an OpenTelemetry timestamp to milliseconds for CloudWatch. + */ + private normalizeTimestamp(hrTime: HrTime): number { + // Convert from second and nanoseconds to milliseconds + const secondsToMillis = hrTime[0] * 1000; + const nanosToMillis = Math.floor(hrTime[1] / 1_000_000); + return secondsToMillis + nanosToMillis; + } + + /** + * Create a base metric record with instrument information. + */ + private createMetricRecord( + metricName: string, + metricUnit: string, + metricDescription: string, + timestamp: number, + attributes: Attributes + ): MetricRecord { + return { + name: metricName, + unit: metricUnit, + description: metricDescription, + timestamp, + attributes, + }; + } + + /** + * Convert a Gauge or Sum metric datapoint to a metric record. + */ + private convertGaugeAndSum(metric: SumMetricData | GaugeMetricData, dataPoint: DataPoint): MetricRecord { + const timestampMs = this.normalizeTimestamp(dataPoint.endTime); + const record = this.createMetricRecord( + metric.descriptor.name, + metric.descriptor.unit, + metric.descriptor.description, + timestampMs, + dataPoint.attributes + ); + record.value = dataPoint.value; + return record; + } + + /** + * Convert a Histogram metric datapoint to a metric record. + * + * https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 + */ + private convertHistogram(metric: HistogramMetricData, dataPoint: DataPoint): MetricRecord { + const timestampMs = this.normalizeTimestamp(dataPoint.endTime); + const record = this.createMetricRecord( + metric.descriptor.name, + metric.descriptor.unit, + metric.descriptor.description, + timestampMs, + dataPoint.attributes + ); + record.histogramData = { + Count: dataPoint.value.count, + Sum: dataPoint.value.sum ?? 0, + Min: dataPoint.value.min ?? 0, + Max: dataPoint.value.max ?? 0, + }; + return record; + } + + /** + * Convert an ExponentialHistogram metric datapoint to a metric record. + * + * This function follows the logic of CalculateDeltaDatapoints in the Go implementation, + * converting exponential buckets to their midpoint values. + * Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 + */ + private convertExpHistogram( + metric: ExponentialHistogramMetricData, + dataPoint: DataPoint + ): MetricRecord { + // Initialize arrays for values and counts + const arrayValues = []; + const arrayCounts = []; + + const scale = dataPoint.value.scale; + const base = Math.pow(2, Math.pow(2, -scale)); + + // Process positive buckets + if (dataPoint.value?.positive?.bucketCounts) { + const positiveOffset = dataPoint.value.positive.offset; + const positiveBucketCounts = dataPoint.value.positive.bucketCounts; + + let bucketBegin = 0; + let bucketEnd = 0; + + for (const [i, count] of positiveBucketCounts.entries()) { + const index = i + positiveOffset; + + if (bucketBegin === 0) { + bucketBegin = Math.pow(base, index); + } else { + bucketBegin = bucketEnd; + } + + bucketEnd = Math.pow(base, index + 1); + // Calculate midpoint value of the bucket + const metricVal = (bucketBegin + bucketEnd) / 2; + + // Only include buckets with positive counts + if (count > 0) { + arrayValues.push(metricVal); + arrayCounts.push(count); + } + } + } + + // Process zero bucket + const zeroCount = dataPoint.value.zeroCount; + if (zeroCount > 0) { + arrayValues.push(0); + arrayCounts.push(zeroCount); + } + + // Process negative buckets + if (dataPoint.value?.negative?.bucketCounts) { + const negativeOffset = dataPoint.value.negative.offset; + const negativeBucketCounts = dataPoint.value.negative.bucketCounts; + + let bucketBegin = 0; + let bucketEnd = 0; + + for (const [i, count] of negativeBucketCounts.entries()) { + const index = i + negativeOffset; + + if (bucketEnd === 0) { + bucketEnd = -Math.pow(base, index); + } else { + bucketEnd = bucketBegin; + } + + bucketBegin = -Math.pow(base, index + 1); + // Calculate midpoint value of the bucket + const metricVal = (bucketBegin + bucketEnd) / 2; + + // Only include buckets with positive counts + if (count > 0) { + arrayValues.push(metricVal); + arrayCounts.push(count); + } + } + } + + const timestampMs = this.normalizeTimestamp(dataPoint.endTime); + const record = this.createMetricRecord( + metric.descriptor.name, + metric.descriptor.unit, + metric.descriptor.description, + timestampMs, + dataPoint.attributes + ); + + // Set the histogram data in the format expected by CloudWatch EMF + record.expHistogramData = { + Values: arrayValues, + Counts: arrayCounts, + Count: dataPoint.value.count, + Sum: dataPoint.value.sum ?? 0, + Max: dataPoint.value.max ?? 0, + Min: dataPoint.value.min ?? 0, + }; + + return record; + } + + /** + * Create EMF log from metric records. + * + * Since metricRecords is already grouped by attributes, this function + * creates a single EMF log for all records. + */ + private createEmfLog( + metricRecords: MetricRecord[], + resource: Resource, + timestamp: number | undefined = undefined + ): EMFLog { + // Start with base structure and latest EMF version schema + // opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 + const emfLog: EMFLog = { + _aws: { + Timestamp: timestamp || Date.now(), + CloudWatchMetrics: [], + }, + Version: '1', + }; + + // Add resource attributes to EMF log but not as dimensions + // OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes + // as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, + // we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. + // And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. + if (resource && resource.attributes) { + for (const [key, value] of Object.entries(resource.attributes)) { + emfLog[`otel.resource.${key}`] = value?.toString() ?? 'undefined'; + } + } + + // Initialize collections for dimensions and metrics + const metricDefinitions: Metric[] = []; + // Collect attributes from all records (they should be the same for all records in the group) + // Only collect once from the first record and apply to all records + const allAttributes: Attributes = metricRecords.length > 0 ? metricRecords[0].attributes : {}; + + // Process each metric record + for (const record of metricRecords) { + const metricName = record.name; + if (!metricName) { + continue; + } + + if (record.expHistogramData) { + // Base2 Exponential Histogram + emfLog[metricName] = record.expHistogramData; + } else if (record.histogramData) { + // Regular Histogram metrics + emfLog[metricName] = record.histogramData; + } else if (record.value !== undefined) { + // Gauge, Sum, and other aggregations + emfLog[metricName] = record.value; + } else { + diag.debug(`Skipping metric ${metricName} as it does not have valid metric value`); + continue; + } + + const metricData: Metric = { + Name: metricName, + }; + + const unit = this.getUnit(record); + if (unit) { + metricData.Unit = unit; + } + metricDefinitions.push(metricData); + } + + const dimensionNames = this.getDimensionNames(allAttributes); + + // Add attribute values to the root of the EMF log + for (const [name, value] of Object.entries(allAttributes)) { + emfLog[name] = value?.toString() ?? 'undefined'; + } + + // Add CloudWatch Metrics if we have metrics, include dimensions only if they exist + if (metricDefinitions.length > 0) { + const cloudWatchMetric: CloudWatchMetric = { + Namespace: this.namespace, + Metrics: metricDefinitions, + }; + + if (dimensionNames.length > 0) { + cloudWatchMetric.Dimensions = [dimensionNames]; + } + + emfLog._aws.CloudWatchMetrics.push(cloudWatchMetric); + } + + return emfLog; + } + + /** + * Group metric record by attributes and timestamp. + * + * @param record The metric record + * @param timestampMs The timestamp in milliseconds + * @returns {[string, number]} Values for the key to group metrics + */ + private groupByAttributesAndTimestamp(record: MetricRecord): [string, number] { + // Create a key for grouping based on attributes + const attrsKey = this.getAttributesKey(record.attributes); + return [attrsKey, record.timestamp]; + } + + /** + * Method to handle safely pushing a MetricRecord into a Map of a Map of a list of MetricRecords + * + * @param groupedMetrics + * @param groupAttribute + * @param groupTimestamp + * @param record + */ + private pushMetricRecordIntoGroupedMetrics( + groupedMetrics: Map>, + groupAttribute: string, + groupTimestamp: number, + record: MetricRecord + ) { + let metricsGroupedByAttribute = groupedMetrics.get(groupAttribute); + if (!metricsGroupedByAttribute) { + metricsGroupedByAttribute = new Map(); + groupedMetrics.set(groupAttribute, metricsGroupedByAttribute); + } + + let metricsGroupedByAttributeAndTimestamp = metricsGroupedByAttribute.get(groupTimestamp); + if (!metricsGroupedByAttributeAndTimestamp) { + metricsGroupedByAttributeAndTimestamp = []; + metricsGroupedByAttribute.set(groupTimestamp, metricsGroupedByAttributeAndTimestamp); + } + metricsGroupedByAttributeAndTimestamp.push(record); + } + + /** + * Export metrics as EMF logs. + * Groups metrics by attributes and timestamp before creating EMF logs. + * + * @param resourceMetrics Resource Metrics data containing scope metrics + * @param resultCallback callback for when the export has completed + * @returns {Promise} + */ + public async export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void) { + try { + if (!resourceMetrics) { + resultCallback({ code: ExportResultCode.SUCCESS }); + return; + } + + // Process all metrics from resource metrics their scope metrics + // The resource is now part of each resourceMetrics object + const resource = resourceMetrics.resource; + + for (const scopeMetrics of resourceMetrics.scopeMetrics) { + // Map of maps to group metrics by attributes and timestamp + // Keys: (attributesKey, timestampMs) + // Value: list of metric records + const groupedMetrics = new Map>(); + + // Process all metrics in this scope + for (const metric of scopeMetrics.metrics) { + // Convert metrics to a format compatible with createEmfLog + // Process metric.dataPoints for different metric types + if (metric.dataPointType === DataPointType.GAUGE || metric.dataPointType === DataPointType.SUM) { + for (const dataPoint of metric.dataPoints) { + const record = this.convertGaugeAndSum(metric, dataPoint); + const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); + this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); + } + } else if (metric.dataPointType === DataPointType.HISTOGRAM) { + for (const dataPoint of metric.dataPoints) { + const record = this.convertHistogram(metric, dataPoint); + const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); + this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); + } + } else if (metric.dataPointType === DataPointType.EXPONENTIAL_HISTOGRAM) { + for (const dataPoint of metric.dataPoints) { + const record = this.convertExpHistogram(metric, dataPoint); + const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); + this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); + } + } else { + // This else block should never run, all metric types are accounted for above + diag.debug(`Unsupported Metric Type in metric: ${metric}`); + } + } + + // Now process each group separately to create one EMF log per group + for (const [_, metricsRecordsGroupedByTimestamp] of groupedMetrics) { + for (const [timestampMs, metricRecords] of metricsRecordsGroupedByTimestamp) { + if (metricRecords) { + // Create and send EMF log for this batch of metrics + + // Convert to JSON + const logEvent = { + message: JSON.stringify(this.createEmfLog(metricRecords, resource, Number(timestampMs))), + timestamp: timestampMs, + }; + + // Send log events to the destination + await this.sendLogEvent(logEvent); + } + } + } + } + + resultCallback({ code: ExportResultCode.SUCCESS }); + } catch (e) { + diag.error(`Failed to export metrics: ${e}`); + const exportResult: ExportResult = { code: ExportResultCode.FAILED }; + if (e instanceof Error) { + exportResult.error = e; + } + resultCallback(exportResult); + } + } + + /** + * Send a log event to the destination (CloudWatch Logs, console, etc.). + * This method must be implemented by subclasses to define where the EMF logs are sent. + * + * @param logEvent The log event to send + * @returns {Promise} + */ + protected abstract sendLogEvent(logEvent: Required): Promise; + + /** + * Force flush any pending metrics. + * This method must be implemented by subclasses to forceFlush pending metrics. + */ + abstract forceFlush(): Promise; + + /** + * Shutdown the exporter. + * This method must be implemented by subclasses to gracefully shutdown + * and clean-up the exporter. + */ + abstract shutdown(): Promise; + + selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality { + return this.aggregationTemporalitySelector(instrumentType); + } + + selectAggregation(instrumentType: InstrumentType): Aggregation { + return this.aggregationSelector(instrumentType); + } +} diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts index b55175f8..9a336d74 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts @@ -34,7 +34,7 @@ import { } from './aws/services/bedrock'; import { SecretsManagerServiceExtension } from './aws/services/secretsmanager'; import { StepFunctionsServiceExtension } from './aws/services/step-functions'; -import { AwsLambdaInstrumentation } from '@opentelemetry/instrumentation-aws-lambda'; +import type { AwsLambdaInstrumentation } from '@opentelemetry/instrumentation-aws-lambda'; import type { Command as AwsV3Command } from '@aws-sdk/types'; export const traceContextEnvironmentKey = '_X_AMZN_TRACE_ID'; @@ -280,13 +280,20 @@ function patchAwsLambdaInstrumentation(instrumentation: Instrumentation): void { span.end(); const flushers = []; - if ((this as any)._traceForceFlusher) { - flushers.push((this as any)._traceForceFlusher()); + if (this['_traceForceFlusher']) { + flushers.push(this['_traceForceFlusher']()); } else { diag.error( 'Spans may not be exported for the lambda function because we are not force flushing before callback.' ); } + if (this['_metricForceFlusher']) { + flushers.push(this['_metricForceFlusher']()); + } else { + diag.debug( + 'Metrics may not be exported for the lambda function because we are not force flushing before callback.' + ); + } Promise.all(flushers).then(callback, callback); }; diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/register.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/register.ts index f24d8d2f..cb7e0bf9 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/register.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/register.ts @@ -13,7 +13,7 @@ if (process.env.OTEL_TRACES_SAMPLER === 'xray') { useXraySampler = true; } -import { diag, DiagConsoleLogger, trace } from '@opentelemetry/api'; +import { diag, DiagConsoleLogger, metrics, trace } from '@opentelemetry/api'; import { getNodeAutoInstrumentations, InstrumentationConfigMap } from '@opentelemetry/auto-instrumentations-node'; import { Instrumentation } from '@opentelemetry/instrumentation'; import * as opentelemetry from '@opentelemetry/sdk-node'; @@ -136,6 +136,7 @@ try { diag.info('Setting TraceProvider for instrumentations at the end of initialization'); for (const instrumentation of instrumentations) { instrumentation.setTracerProvider(trace.getTracerProvider()); + instrumentation.setMeterProvider(metrics.getMeterProvider()); } diag.debug(`Environment variable OTEL_PROPAGATORS is set to '${process.env.OTEL_PROPAGATORS}'`); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts index 93c7fbd8..dfd8f9e1 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts @@ -64,6 +64,7 @@ import { OTLPAwsLogExporter } from '../src/exporter/otlp/aws/logs/otlp-aws-log-e import { OTLPAwsSpanExporter } from '../src/exporter/otlp/aws/traces/otlp-aws-span-exporter'; import { AwsCloudWatchOtlpBatchLogRecordProcessor } from '../src/exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor'; import { TRACE_PARENT_HEADER } from '@opentelemetry/core'; +import { ConsoleEMFExporter } from '../src/exporter/aws/metrics/console-emf-exporter'; // Tests AwsOpenTelemetryConfigurator after running Environment Variable setup in register.ts describe('AwsOpenTelemetryConfiguratorTest', () => { @@ -1307,14 +1308,6 @@ describe('AwsOpenTelemetryConfiguratorTest', () => { expect(process.env.OTEL_METRICS_EXPORTER).toEqual('first,third'); }); - it('testCreateEmfExporter', async () => { - process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = - 'x-aws-log-group=/test/log/group/name,x-aws-log-stream=test_log_stream_name,x-aws-metric-namespace=TEST_NAMESPACE'; - const exporter = createEmfExporter(); - expect(exporter).toBeInstanceOf(AWSCloudWatchEMFExporter); - delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS; - }); - it('testIsAwsOtlpEndpoint', () => { expect(isAwsOtlpEndpoint('https://xray.us-east-1.amazonaws.com/v1/traces', 'xray')).toBeTruthy(); expect(isAwsOtlpEndpoint('https://lambda.us-east-1.amazonaws.com/v1/traces', 'xray')).toBeFalsy(); @@ -1339,9 +1332,9 @@ describe('AwsOpenTelemetryConfiguratorTest', () => { headerSettings = validateAndFetchLogsHeader(); expect(headerSettings).toEqual({ isValid: false, - logGroup: '', - logStream: '', - namespace: '', + logGroup: undefined, + logStream: undefined, + namespace: undefined, }); }); @@ -1396,4 +1389,56 @@ describe('AwsOpenTelemetryConfiguratorTest', () => { delete process.env.AGENT_OBSERVABILITY_ENABLED; }); + + describe('Test createEmfExporter', () => { + beforeEach(() => { + delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS; + delete process.env.AWS_LAMBDA_FUNCTION_NAME; + }); + + afterEach(() => { + sinon.restore(); + }); + + it('Test createEmfExporter in Lambda with valid headers', () => { + process.env.AWS_LAMBDA_FUNCTION_NAME = 'lambdaFunctionName'; + process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = + 'x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace'; + + const result = createEmfExporter(); + expect(result).toBeInstanceOf(AWSCloudWatchEMFExporter); + }); + + it('creates Console EMF exporter in Lambda with invalid headers', () => { + process.env.AWS_LAMBDA_FUNCTION_NAME = 'lambdaFunctionName'; + process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'x-aws-metric-namespace=test-namespace'; + + const result = createEmfExporter(); + expect(result).toBeInstanceOf(ConsoleEMFExporter); + }); + + it('creates Console EMF exporter in Lambda with invalid headers and no namespace defaults to "default" namespace', () => { + process.env.AWS_LAMBDA_FUNCTION_NAME = 'lambdaFunctionName'; + process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'nothing=here'; + + const result = createEmfExporter(); + expect(result).toBeInstanceOf(ConsoleEMFExporter); + expect(result!['namespace']).toEqual('default'); + }); + + it('creates CloudWatch EMF exporter in non-Lambda environment', () => { + process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = + 'x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace'; + + const result = createEmfExporter(); + expect(result).toBeInstanceOf(AWSCloudWatchEMFExporter); + }); + + it('returns undefined when CloudWatch EMF exporter creation fails in non-Lambda environment due to invalid headers', () => { + process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'x-aws-log-stream=test-stream'; + + const result = createEmfExporter(); + expect(result).toBeUndefined(); + }); + }); }); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts index 10260e75..23ec223d 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts @@ -30,14 +30,12 @@ import { import { ExportResultCode } from '@opentelemetry/core'; import { expect } from 'expect'; import * as sinon from 'sinon'; -import { - AWSCloudWatchEMFExporter, - MetricRecord, -} from '../../../../src/exporter/aws/metrics/aws-cloudwatch-emf-exporter'; +import { MetricRecord } from '../../../../src/exporter/aws/metrics/emf-exporter-base'; import { Resource } from '@opentelemetry/resources'; import { LogEventBatch } from '../../../../src/exporter/aws/metrics/cloudwatch-logs-client'; +import { AWSCloudWatchEMFExporter } from '../../../../src/exporter/aws/metrics/aws-cloudwatch-emf-exporter'; -describe('TestAwsCloudWatchEmfExporter', () => { +describe('TestAWSCloudWatchEMFExporter', () => { let exporter: AWSCloudWatchEMFExporter; const { CloudWatchLogs } = require('@aws-sdk/client-cloudwatch-logs'); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/console-emf-exporter-test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/console-emf-exporter-test.ts new file mode 100644 index 00000000..2bff569d --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/console-emf-exporter-test.ts @@ -0,0 +1,79 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { expect } from 'expect'; +import * as sinon from 'sinon'; +import { ConsoleEMFExporter } from '../../../../src/exporter/aws/metrics/console-emf-exporter'; +import { EMFLog } from '../../../../src/exporter/aws/metrics/emf-exporter-base'; + +describe('TestConsoleEMFExporter', () => { + let exporter: ConsoleEMFExporter; + + beforeEach(() => { + /* Set up test fixtures. */ + exporter = new ConsoleEMFExporter(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('TestNamespaceInitialization', () => { + /* Test exporter initialization with different namespace scenarios. */ + // Test default namespace + const defaultExporter = new ConsoleEMFExporter(); + expect(defaultExporter['namespace']).toEqual('default'); + + // Test custom namespace + const customExporter = new ConsoleEMFExporter('CustomNamespace'); + expect(customExporter['namespace']).toEqual('CustomNamespace'); + + // Test undefined namespace (should default to 'default') + const undefinedNamespaceExporter = new ConsoleEMFExporter(undefined); + expect(undefinedNamespaceExporter['namespace']).toEqual('default'); + }); + + it('TestSendLogEvent', () => { + /* Test that log events are properly sent to console output. */ + // Create a simple log event with EMF-formatted message + const testMessage: EMFLog = { + _aws: { + Timestamp: 1640995200000, + CloudWatchMetrics: [ + { + Namespace: 'TestNamespace', + Dimensions: [['Service']], + Metrics: [ + { + Name: 'TestMetric', + Unit: 'Count', + }, + ], + }, + ], + }, + Service: 'test-service', + TestMetric: 42, + Version: '1', + }; + + const logEvent = { + message: JSON.stringify(testMessage), + timestamp: 1640995200000, + }; + + // Spy on console.log + const consoleLogSpy = sinon.spy(console, 'log'); + + // Call the method + exporter['sendLogEvent'](logEvent); + + // Verify the message was printed to console.log + expect(consoleLogSpy.calledOnce).toBeTruthy(); + expect(consoleLogSpy.calledWith(logEvent.message)).toBeTruthy(); + + // Verify the content of the logged message + const loggedMessage = JSON.parse(consoleLogSpy.firstCall.args[0]); + expect(loggedMessage).toEqual(testMessage); + }); +}); diff --git a/lambda-layer/packages/layer/scripts/otel-instrument b/lambda-layer/packages/layer/scripts/otel-instrument index 582cd20a..1e3c54c9 100644 --- a/lambda-layer/packages/layer/scripts/otel-instrument +++ b/lambda-layer/packages/layer/scripts/otel-instrument @@ -76,8 +76,9 @@ if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true"; fi +# - Enable emf exporter by default if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="none"; + export OTEL_METRICS_EXPORTER="awsemf"; fi # - Append Lambda Resource Attributes to OTel Resource Attribute List