@@ -79,6 +79,7 @@ import { isAgentObservabilityEnabled } from './utils';
import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor';
import { logs } from '@opentelemetry/api-logs';
import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys';
import { AwsCloudWatchOtlpBatchLogRecordProcessor } from './exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor';

const AWS_TRACES_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$';
const AWS_LOGS_OTLP_ENDPOINT_PATTERN = '^https://logs\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/logs$';
@@ -522,9 +523,11 @@ export class AwsLoggerProcessorProvider {
return exporters.map(exporter => {
if (exporter instanceof ConsoleLogRecordExporter) {
return new SimpleLogRecordProcessor(exporter);
} else {
return new BatchLogRecordProcessor(exporter);
}
if (exporter instanceof OTLPAwsLogExporter && isAgentObservabilityEnabled()) {
return new AwsCloudWatchOtlpBatchLogRecordProcessor(exporter);
}
return new BatchLogRecordProcessor(exporter);
});
}

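Because the rendered diff interleaves the removed else branch with the newly added checks, the resolved body of the mapping after this change reads roughly as follows (a sketch assembled from the hunk above; the surrounding AwsLoggerProcessorProvider class and its imports are assumed):

```typescript
// Sketch of the resolved exporter-to-processor mapping (not the literal file contents).
// A ConsoleLogRecordExporter keeps its SimpleLogRecordProcessor; an OTLPAwsLogExporter
// gets the size-aware CloudWatch batch processor only when agent observability is enabled;
// everything else falls back to the standard BatchLogRecordProcessor.
return exporters.map(exporter => {
  if (exporter instanceof ConsoleLogRecordExporter) {
    return new SimpleLogRecordProcessor(exporter);
  }
  if (exporter instanceof OTLPAwsLogExporter && isAgentObservabilityEnabled()) {
    return new AwsCloudWatchOtlpBatchLogRecordProcessor(exporter);
  }
  return new BatchLogRecordProcessor(exporter);
});
```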
@@ -0,0 +1,208 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { LogRecord, BufferConfig, BatchLogRecordProcessor } from '@opentelemetry/sdk-logs';
import { AnyValue, AnyValueMap } from '@opentelemetry/api-logs';
import { callWithTimeout } from '@opentelemetry/core';
import { OTLPAwsLogExporter } from './otlp-aws-log-exporter';

/*
* OTel log events include fixed metadata attributes, so the metadata size can be
* estimated, on a best-effort basis, as:
* service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
* common attributes (255 chars) +
* scope + flags + traceId + spanId + numeric/timestamp fields + ...
* Example log structure:
* {
* "resource": {
* "attributes": {
* "aws.local.service": "example-service123",
* "telemetry.sdk.language": "python",
* "service.name": "my-application",
* "cloud.resource_id": "example-resource",
* "aws.log.group.names": "example-log-group",
* "aws.ai.agent.type": "default",
* "telemetry.sdk.version": "1.x.x",
* "telemetry.auto.version": "0.x.x",
* "telemetry.sdk.name": "opentelemetry"
* }
* },
* "scope": {"name": "example.instrumentation.library"},
* "timeUnixNano": 1234567890123456789,
* "observedTimeUnixNano": 1234567890987654321,
* "severityNumber": 9,
* "body": {...},
* "attributes": {...},
* "flags": 1,
* "traceId": "abcd1234efgh5678ijkl9012mnop3456",
* "spanId": "1234abcd5678efgh"
* }
* 2000 may be a slight overestimate, but it is better to overestimate a log's size and
* accept a small batching performance cost than to underestimate it and risk a large log
* being dropped when sent to the AWS OTLP endpoint.
*/
export const BASE_LOG_BUFFER_BYTE_SIZE: number = 2000;

// Maximum uncompressed/unserialized bytes per request -
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
export const MAX_LOG_REQUEST_BYTE_SIZE: number = 1048576;

/**
* Custom implementation of BatchLogRecordProcessor that manages log record batching
* with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.
*
* This processor still exports all logs up to MAX_LOG_REQUEST_BYTE_SIZE, but rather than doing exactly
* one export, it estimates log sizes and performs multiple batch exports,
* where each exported batch has an additional constraint:
*
* If the batch to be exported will have a data size of > 1 MB:
* The batch will be split into multiple exports of sub-batches of data size <= 1 MB.
*
* The one special case: if a single log's estimated size is > 1 MB, its sub-batch will contain exactly that one log.
*
*/
export class AwsCloudWatchOtlpBatchLogRecordProcessor extends BatchLogRecordProcessor {
constructor(exporter: OTLPAwsLogExporter, config?: BufferConfig) {
super(exporter, config);
(this as any)._flushOneBatch = () => this._flushSizeLimitedBatch();
}

/**
* Explicitly overrides upstream _flushOneBatch method to add AWS CloudWatch size-based batching.
* Splits the pending logs into export requests whose estimated size is at or under
* the 1 MB limit of the CloudWatch Logs OTLP endpoint, and returns a promise that
* resolves when all of those exports complete.
*
* The estimated data size of an exported batch will typically be <= 1 MB; the only exception
* is a single log estimated at > 1 MB, which is exported in a batch of exactly one.
*/
private _flushSizeLimitedBatch(): Promise<void> {
const processor = this as any;

processor._clearTimer();

if (processor._finishedLogRecords.length === 0) {
return Promise.resolve();
}

const logsToExport: LogRecord[] = processor._finishedLogRecords.splice(0, processor._maxExportBatchSize);
let batch: LogRecord[] = [];
let batchDataSize = 0;
const exportPromises: Promise<void>[] = [];

for (let i = 0; i < logsToExport.length; i += 1) {
const logData = logsToExport[i];
const logSize = AwsCloudWatchOtlpBatchLogRecordProcessor.estimateLogSize(logData);

if (batch.length > 0 && batchDataSize + logSize > MAX_LOG_REQUEST_BYTE_SIZE) {
exportPromises.push(callWithTimeout(processor._export(batch), processor._exportTimeoutMillis));
batchDataSize = 0;
batch = [];
}

batchDataSize += logSize;
batch.push(logData);
}

if (batch.length > 0) {
exportPromises.push(callWithTimeout(processor._export(batch), processor._exportTimeoutMillis));
}

return new Promise((resolve, reject) => {
Promise.all(exportPromises)
.then(() => resolve())
.catch(reject);
});
}

/**
* Estimates the size in bytes of a log by calculating the size of its body and its attributes
* and adding a buffer amount to account for other log metadata information.
* Will process complex log structures up to the specified depth limit.
* Includes cycle detection to prevent processing the log content more than once.
* If the depth limit of the log structure is exceeded, returns the size calculated
* up to that point.
*
* @param log - The Log object to calculate size for
* @param depth - Maximum depth to traverse in nested structures (default: 3)
* @returns The estimated size of the log object in bytes
*/
private static estimateLogSize(log: LogRecord, depth: number = 3): number {
// Queue contains tuples of [log_content, depth] where:
// - log_content is the current piece of log data being processed
// - depth tracks how many levels deep we've traversed to reach this content
// - body starts at depth 0 since it's an AnyValue object
// - attributes starts at depth -1 since it's an AnyValueMap; when traversed, we start
//   processing its keys and values at depth 0
let queue: Array<[AnyValue, number]> = [
[log.body, 0],
[log.attributes, -1],
];

// Track visited complex log contents to avoid calculating the same one more than once
const visited = new Set<object>();

let size: number = BASE_LOG_BUFFER_BYTE_SIZE;

while (queue.length > 0) {
const newQueue: Array<[AnyValue, number]> = [];

for (const [nextVal, currentDepth] of queue) {
// Small optimization, can stop calculating the size once it reaches the 1 MB limit
if (size >= MAX_LOG_REQUEST_BYTE_SIZE) {
return size;
}

if (nextVal === null || nextVal === undefined) {
continue;
}

if (typeof nextVal === 'number' || typeof nextVal === 'boolean' || typeof nextVal === 'string') {
size += this.estimateUtf8Size(nextVal.toString());
continue;
}

if (nextVal instanceof Uint8Array) {
size += nextVal.byteLength;
continue;
}

// nextVal must be Array or AnyValueMap
if (currentDepth <= depth) {
if (visited.has(nextVal)) {
continue;
}
visited.add(nextVal);

if (Array.isArray(nextVal)) {
for (const content of nextVal) {
newQueue.push([content, currentDepth + 1]);
}
continue;
}
// It's an AnyValueMap
const map = nextVal as AnyValueMap;
for (const key in map) {
size += AwsCloudWatchOtlpBatchLogRecordProcessor.estimateUtf8Size(key);
newQueue.push([map[key], currentDepth + 1]);
}
}
}
queue = newQueue;
}
return size;
}

private static estimateUtf8Size(s: string): number {
let asciiCount = 0;
let nonAsciiCount = 0;

for (const char of s) {
if (char.charCodeAt(0) < 128) {
asciiCount += 1;
} else {
nonAsciiCount += 1;
}
}

return asciiCount + nonAsciiCount * 4;
}
}
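To make the size heuristics concrete, the sketch below is illustrative only: it re-implements the private UTF-8 helper rather than calling it, and the per-log sizes are assumptions chosen for the arithmetic, not values taken from this PR.

```typescript
// Standalone illustration of the heuristics above; not the exported API of this file.

// UTF-8 size heuristic: 1 byte per ASCII character, 4 bytes for anything else.
function estimateUtf8Size(s: string): number {
  let size = 0;
  for (const char of s) {
    size += char.charCodeAt(0) < 128 ? 1 : 4;
  }
  return size;
}

estimateUtf8Size('hello'); // 5  (all ASCII)
estimateUtf8Size('héllo'); // 8  (4 ASCII characters + 1 non-ASCII counted as 4 bytes)

// Per-log estimate: body/attribute content plus the fixed 2000-byte metadata buffer.
// Assume each log carries roughly 250 KB of body text (an assumed figure for illustration):
const estimatedLogSize = 250_000 + 2_000; // ~252 KB

// With MAX_LOG_REQUEST_BYTE_SIZE = 1048576, at most 4 such logs fit in one sub-batch,
// so a flush of 10 logs is split into ceil(10 / 4) = 3 export calls. A single log
// estimated above 1 MB would be exported alone in its own sub-batch.
const logsPerExport = Math.floor(1_048_576 / estimatedLogSize); // 4
const exportCalls = Math.ceil(10 / logsPerExport); // 3
console.log(logsPerExport, exportCalls);
```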
@@ -62,6 +62,7 @@ import {
} from '@opentelemetry/sdk-logs';
import { OTLPAwsLogExporter } from '../src/exporter/otlp/aws/logs/otlp-aws-log-exporter';
import { OTLPAwsSpanExporter } from '../src/exporter/otlp/aws/traces/otlp-aws-span-exporter';
import { AwsCloudWatchOtlpBatchLogRecordProcessor } from '../src/exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor';

// Tests AwsOpenTelemetryConfigurator after running Environment Variable setup in register.ts
describe('AwsOpenTelemetryConfiguratorTest', () => {
@@ -736,10 +737,19 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {
expect(config.logRecordProcessors?.length).toEqual(1);
expect((config.logRecordProcessors as any)[0]._exporter).toBeInstanceOf(OTLPAwsLogExporter);

process.env.AGENT_OBSERVABILITY_ENABLED = 'true';

// Test Agent Observability for AWS OTLP logs endpoint uses OTLPAwsLogExporter and AwsCloudWatchOtlpBatchLogRecordProcessor
config = new AwsOpentelemetryConfigurator([]).configure();
expect(config.logRecordProcessors?.length).toEqual(1);
expect(config.logRecordProcessors![0]).toBeInstanceOf(AwsCloudWatchOtlpBatchLogRecordProcessor);
expect((config.logRecordProcessors as any)[0]._exporter).toBeInstanceOf(OTLPAwsLogExporter);

// Cleanup
delete process.env.OTEL_LOGS_EXPORTER;
delete process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT;
delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS;
delete process.env.AGENT_OBSERVABILITY_ENABLED;
});

it('ResourceDetectorInputValidationTest', () => {
@@ -901,7 +911,21 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {
expect(logRecordProcessors).toHaveLength(1);
expect(logRecordProcessors[0]).toBeInstanceOf(SimpleLogRecordProcessor);

process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
process.env.OTEL_LOGS_EXPORTER = 'otlp';
process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = 'https://logs.us-east-1.amazonaws.com/v1/logs';
process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'x-aws-log-group=my-group,x-aws-log-stream=my-stream';

logRecordProcessors = AwsLoggerProcessorProvider.getlogRecordProcessors();

expect(logRecordProcessors).toHaveLength(1);
expect(logRecordProcessors[0]).toBeInstanceOf(AwsCloudWatchOtlpBatchLogRecordProcessor);

delete process.env.AGENT_OBSERVABILITY_ENABLED;
delete process.env.OTEL_LOGS_EXPORTER;
delete process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT;
delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS;
});
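For reference, the configuration this test exercises corresponds roughly to the application-side setup below (a sketch; the region, log group, and stream names are placeholders taken from the test, not values mandated by the PR):

```typescript
// Hypothetical setup mirroring the environment variables used in the test above.
// With these set before the logger provider is configured, getlogRecordProcessors()
// is expected to return a single AwsCloudWatchOtlpBatchLogRecordProcessor that wraps
// an OTLPAwsLogExporter.
process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
process.env.OTEL_LOGS_EXPORTER = 'otlp';
process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = 'https://logs.us-east-1.amazonaws.com/v1/logs';
process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'x-aws-log-group=my-group,x-aws-log-stream=my-stream';

const processors = AwsLoggerProcessorProvider.getlogRecordProcessors();
console.log(processors[0] instanceof AwsCloudWatchOtlpBatchLogRecordProcessor); // true
```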

it('configureLogExportersFromEnv', () => {