Commit 6d182ad
Add AwsCloudWatchOtlpBatchLogRecordProcessor for handling Large Log Objects (#209)
*Description of changes:* JS version of aws-observability/aws-otel-python-instrumentation#402.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent a02218c commit 6d182ad
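For context, a minimal sketch of how the new processor could be wired up manually, outside the auto-configurator changed below. This is illustrative only: the LoggerProvider.addLogRecordProcessor call and the OTLPAwsLogExporter constructor argument are assumptions about the surrounding APIs, not part of this commit.

// Sketch (assumed APIs): attach the size-aware batch processor to a LoggerProvider.
import { LoggerProvider } from '@opentelemetry/sdk-logs';
import { OTLPAwsLogExporter } from './exporter/otlp/aws/logs/otlp-aws-log-exporter';
import { AwsCloudWatchOtlpBatchLogRecordProcessor } from './exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor';

// Assumption: the exporter accepts the CloudWatch Logs OTLP endpoint as its first argument.
const exporter = new OTLPAwsLogExporter('https://logs.us-east-1.amazonaws.com/v1/logs');

const provider = new LoggerProvider();
// Assumption: an sdk-logs version that exposes addLogRecordProcessor.
provider.addLogRecordProcessor(new AwsCloudWatchOtlpBatchLogRecordProcessor(exporter));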

4 files changed: 515 additions & 2 deletions


aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts

Lines changed: 5 additions & 2 deletions
@@ -79,6 +79,7 @@ import { isAgentObservabilityEnabled } from './utils';
 import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor';
 import { logs } from '@opentelemetry/api-logs';
 import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys';
+import { AwsCloudWatchOtlpBatchLogRecordProcessor } from './exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor';

 const AWS_TRACES_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$';
 const AWS_LOGS_OTLP_ENDPOINT_PATTERN = '^https://logs\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/logs$';
@@ -522,9 +523,11 @@ export class AwsLoggerProcessorProvider {
     return exporters.map(exporter => {
       if (exporter instanceof ConsoleLogRecordExporter) {
         return new SimpleLogRecordProcessor(exporter);
-      } else {
-        return new BatchLogRecordProcessor(exporter);
       }
+      if (exporter instanceof OTLPAwsLogExporter && isAgentObservabilityEnabled()) {
+        return new AwsCloudWatchOtlpBatchLogRecordProcessor(exporter);
+      }
+      return new BatchLogRecordProcessor(exporter);
     });
   }

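The new branch in getlogRecordProcessors only fires when agent observability is enabled and the configured exporter is the AWS OTLP logs exporter; otherwise the plain BatchLogRecordProcessor is kept. A sketch of the environment the updated tests set up before asking the provider for processors (endpoint and header values are illustrative):

// Illustrative setup mirroring the test changes below; values are placeholders.
import { AwsLoggerProcessorProvider } from './aws-opentelemetry-configurator';

process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
process.env.OTEL_LOGS_EXPORTER = 'otlp';
process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = 'https://logs.us-east-1.amazonaws.com/v1/logs';
process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'x-aws-log-group=my-group,x-aws-log-stream=my-stream';

const processors = AwsLoggerProcessorProvider.getlogRecordProcessors();
// Expected, per the test below: one AwsCloudWatchOtlpBatchLogRecordProcessor wrapping an OTLPAwsLogExporter.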
aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor.ts

Lines changed: 223 additions & 0 deletions
@@ -0,0 +1,223 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { LogRecord, BufferConfig, BatchLogRecordProcessor } from '@opentelemetry/sdk-logs';
import { AnyValue } from '@opentelemetry/api-logs';
import { callWithTimeout } from '@opentelemetry/core';
import { OTLPAwsLogExporter } from './otlp-aws-log-exporter';

/*
 * OTel log events include fixed metadata attributes, so the estimated metadata size
 * can be calculated, on a best-effort basis, as:
 * service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
 * common attributes (255 chars) +
 * scope + flags + traceId + spanId + numeric/timestamp fields + ...
 * Example log structure:
 * {
 *   "resource": {
 *     "attributes": {
 *       "aws.local.service": "example-service123",
 *       "telemetry.sdk.language": "python",
 *       "service.name": "my-application",
 *       "cloud.resource_id": "example-resource",
 *       "aws.log.group.names": "example-log-group",
 *       "aws.ai.agent.type": "default",
 *       "telemetry.sdk.version": "1.x.x",
 *       "telemetry.auto.version": "0.x.x",
 *       "telemetry.sdk.name": "opentelemetry"
 *     }
 *   },
 *   "scope": {"name": "example.instrumentation.library"},
 *   "timeUnixNano": 1234567890123456789,
 *   "observedTimeUnixNano": 1234567890987654321,
 *   "severityNumber": 9,
 *   "body": {...},
 *   "attributes": {...},
 *   "flags": 1,
 *   "traceId": "abcd1234efgh5678ijkl9012mnop3456",
 *   "spanId": "1234abcd5678efgh"
 * }
 * 2000 might be a bit of an overestimate, but it is better to overestimate the size of the log
 * and suffer a small performance impact with batching than it is to underestimate and risk
 * a large log being dropped when sent to the AWS OTLP endpoint.
 */
export const BASE_LOG_BUFFER_BYTE_SIZE: number = 2000;

// Maximum uncompressed/unserialized bytes per request -
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
export const MAX_LOG_REQUEST_BYTE_SIZE: number = 1048576;

/**
 * Custom implementation of BatchLogRecordProcessor that manages log record batching
 * with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.
 *
 * This processor still exports all logs up to MAX_LOG_REQUEST_BYTE_SIZE, but rather than doing exactly
 * one export, we estimate log sizes and do multiple batch exports,
 * where each exported batch has an additional constraint:
 *
 * If the batch to be exported would have a data size of > 1 MB:
 * The batch is split into multiple exports of sub-batches of data size <= 1 MB.
 *
 * A unique case is if a sub-batch has a data size > 1 MB; then the sub-batch will contain exactly 1 log.
 */
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
export class AwsCloudWatchOtlpBatchLogRecordProcessor extends BatchLogRecordProcessor {
  constructor(exporter: OTLPAwsLogExporter, config?: BufferConfig) {
    super(exporter, config);
  }

  /**
   * Explicitly overrides the upstream _flushOneBatch method to add AWS CloudWatch size-based batching.
   * Returns a list of promised export requests where each request is estimated to be at or under
   * the 1 MB limit for the CloudWatch Logs OTLP endpoint.
   *
   * The estimated data size of exported batches will typically be <= 1 MB, except for the case below:
   * if the estimated data size of an exported batch is ever > 1 MB, then the batch size is guaranteed to be 1.
   */
  override _flushOneBatch(): Promise<void> {
    this['_clearTimer']();

    if (this['_finishedLogRecords'].length === 0) {
      return Promise.resolve();
    }

    const logsToExport: LogRecord[] = this['_finishedLogRecords'].splice(0, this['_maxExportBatchSize']);
    let batch: LogRecord[] = [];
    let batchDataSize = 0;
    const exportPromises: Promise<void>[] = [];

    for (const logData of logsToExport) {
      const logSize = AwsCloudWatchOtlpBatchLogRecordProcessor.estimateLogSize(logData);

      if (batch.length > 0 && batchDataSize + logSize > MAX_LOG_REQUEST_BYTE_SIZE) {
        exportPromises.push(callWithTimeout(this['_export'](batch), this['_exportTimeoutMillis']));
        batchDataSize = 0;
        batch = [];
      }

      batchDataSize += logSize;
      batch.push(logData);
    }

    if (batch.length > 0) {
      exportPromises.push(callWithTimeout(this['_export'](batch), this['_exportTimeoutMillis']));
    }
    // Explicitly returns Promise<void> because of upstream's method signature for this function
    return Promise.all(exportPromises)
      .then(() => {})
      .catch();
  }

  /**
   * Estimates the size in bytes of a log by calculating the size of its body and its attributes
   * and adding a buffer amount to account for other log metadata information.
   * Processes complex log structures up to the specified depth limit.
   * Includes cycle detection to avoid processing the same log content more than once.
   * If the depth limit of the log structure is exceeded, returns the truncated calculation
   * of everything up to that point.
   *
   * We set the depth to 3 as this is the minimum required depth to estimate our consolidated Gen AI log events:
   *
   * Example structure:
   * {
   *   "output": {
   *     "messages": [
   *       {
   *         "content": "Hello, World!",
   *         "role": "assistant"
   *       }
   *     ]
   *   },
   *   "input": {
   *     "messages": [
   *       {
   *         "content": "Say Hello, World!",
   *         "role": "user"
   *       }
   *     ]
   *   }
   * }
   *
   * @param log - The Log object to calculate the size for
   * @param maxDepth - Maximum depth to traverse in nested structures (default: 3)
   * @returns The estimated size of the log object in bytes
   */
  private static estimateLogSize(log: LogRecord, maxDepth: number = 3): number {
    // Queue contains tuples of [logContent, depth] where:
    // - logContent is the current piece of log data being processed
    // - depth tracks how many levels deep we've traversed to reach this content
    // - body starts at depth 0 since it's an AnyValue object
    // - attributes start at depth -1 since they are a map of string keys to AnyValue - when traversed, we
    //   start processing their keys at depth 0
    let queue: Array<[AnyValue, number]> = [
      [log.body, 0],
      [log.attributes, -1],
    ];

    // Track visited complex log contents to avoid calculating the same one more than once
    const visited = new Set<object>();

    let size: number = BASE_LOG_BUFFER_BYTE_SIZE;

    while (queue.length > 0) {
      const newQueue: Array<[AnyValue, number]> = [];

      for (const [nextVal, currentDepth] of queue) {
        // Small optimization: can stop calculating the size once it reaches the 1 MB limit
        if (size >= MAX_LOG_REQUEST_BYTE_SIZE) {
          return size;
        }

        if (nextVal == null) {
          continue;
        }

        if (typeof nextVal === 'number' || typeof nextVal === 'boolean' || typeof nextVal === 'string') {
          size += this.estimateUtf8Size(nextVal.toString());
          continue;
        }

        if (nextVal instanceof Uint8Array) {
          size += nextVal.byteLength;
          continue;
        }

        // nextVal must be Array or AnyValueMap
        if (currentDepth <= maxDepth && !visited.has(nextVal)) {
          visited.add(nextVal);

          if (Array.isArray(nextVal)) {
            for (const content of nextVal) {
              newQueue.push([content, currentDepth + 1]);
            }
            continue;
          }
          if (typeof nextVal === 'object') {
            for (const key in nextVal) {
              size += AwsCloudWatchOtlpBatchLogRecordProcessor.estimateUtf8Size(key);
              newQueue.push([nextVal[key], currentDepth + 1]);
            }
          }
        }
      }
      queue = newQueue;
    }
    return size;
  }

  private static estimateUtf8Size(s: string): number {
    let asciiCount = 0;
    let nonAsciiCount = 0;

    for (const char of s) {
      if (char.charCodeAt(0) < 128) {
        asciiCount += 1;
      } else {
        nonAsciiCount += 1;
      }
    }

    return asciiCount + nonAsciiCount * 4;
  }
}
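To make the batching behaviour concrete, here is a small illustration of how _flushOneBatch splits a flush when the estimated sizes exceed the 1 MB request limit. The sizes are made up; in the processor they come from estimateLogSize (body + attributes + the 2000-byte metadata buffer):

// Illustration only (not part of the commit): the same split logic applied to hypothetical sizes.
const MAX = 1048576; // MAX_LOG_REQUEST_BYTE_SIZE
const estimatedSizes = [400_000, 400_000, 400_000, 1_200_000]; // bytes, hypothetical
const batches: number[][] = [];
let current: number[] = [];
let currentSize = 0;

for (const size of estimatedSizes) {
  // Close the current batch before it would exceed the limit, but never emit an empty batch.
  if (current.length > 0 && currentSize + size > MAX) {
    batches.push(current);
    current = [];
    currentSize = 0;
  }
  currentSize += size;
  current.push(size);
}
if (current.length > 0) {
  batches.push(current);
}

// batches => [[400000, 400000], [400000], [1200000]]
// The oversized 1.2 MB log is still exported, but in a sub-batch containing exactly one record.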

aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts

Lines changed: 24 additions & 0 deletions
@@ -62,6 +62,7 @@ import {
 } from '@opentelemetry/sdk-logs';
 import { OTLPAwsLogExporter } from '../src/exporter/otlp/aws/logs/otlp-aws-log-exporter';
 import { OTLPAwsSpanExporter } from '../src/exporter/otlp/aws/traces/otlp-aws-span-exporter';
+import { AwsCloudWatchOtlpBatchLogRecordProcessor } from '../src/exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor';

 // Tests AwsOpenTelemetryConfigurator after running Environment Variable setup in register.ts
 describe('AwsOpenTelemetryConfiguratorTest', () => {
@@ -736,10 +737,19 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {
     expect(config.logRecordProcessors?.length).toEqual(1);
     expect((config.logRecordProcessors as any)[0]._exporter).toBeInstanceOf(OTLPAwsLogExporter);

+    process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
+
+    // Test Agent Observability for AWS OTLP logs endpoint uses OTLPAwsLogExporter and AwsCloudWatchOtlpBatchLogRecordProcessor
+    config = new AwsOpentelemetryConfigurator([]).configure();
+    expect(config.logRecordProcessors?.length).toEqual(1);
+    expect(config.logRecordProcessors![0]).toBeInstanceOf(AwsCloudWatchOtlpBatchLogRecordProcessor);
+    expect((config.logRecordProcessors as any)[0]._exporter).toBeInstanceOf(OTLPAwsLogExporter);
+
     // Cleanup
     delete process.env.OTEL_LOGS_EXPORTER;
     delete process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT;
     delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS;
+    delete process.env.AGENT_OBSERVABILITY_ENABLED;
   });

   it('ResourceDetectorInputValidationTest', () => {
@@ -901,7 +911,21 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {
     expect(logRecordProcessors).toHaveLength(1);
     expect(logRecordProcessors[0]).toBeInstanceOf(SimpleLogRecordProcessor);

+    process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
+    process.env.OTEL_LOGS_EXPORTER = 'otlp';
+    process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = 'https://logs.us-east-1.amazonaws.com/v1/logs';
+    process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'x-aws-log-group=my-group,x-aws-log-stream=my-stream';
+
+    logRecordProcessors = AwsLoggerProcessorProvider.getlogRecordProcessors();
+
+    expect(logRecordProcessors).toHaveLength(1);
+    expect(logRecordProcessors[0]).toBeInstanceOf(AwsCloudWatchOtlpBatchLogRecordProcessor);
+
+    delete process.env.OTEL_LOGS_EXPORTER;
+    delete process.env.AGENT_OBSERVABILITY_ENABLED;
     delete process.env.OTEL_LOGS_EXPORTER;
+    delete process.env.OTEL_EXPORTER_OTLP_LOGS_ENDPOINT;
+    delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS;
   });

   it('configureLogExportersFromEnv', () => {
