Commit e1ff129

add utf-8 estimation
1 parent 7424857 commit e1ff129

2 files changed: +212 -4 lines changed
aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor.ts

Lines changed: 208 additions & 0 deletions

@@ -0,0 +1,208 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { LogRecord, BufferConfig, BatchLogRecordProcessor } from '@opentelemetry/sdk-logs';
import { AnyValue, AnyValueMap } from '@opentelemetry/api-logs';
import { callWithTimeout } from '@opentelemetry/core';
import { OTLPAwsLogExporter } from './otlp-aws-log-exporter';

/*
 * OTel log events include fixed metadata attributes, so the metadata size can be
 * estimated, on a best-effort basis, as:
 * service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
 * common attributes (255 chars) +
 * scope + flags + traceId + spanId + numeric/timestamp fields + ...
 * Example log structure:
 * {
 *   "resource": {
 *     "attributes": {
 *       "aws.local.service": "example-service123",
 *       "telemetry.sdk.language": "python",
 *       "service.name": "my-application",
 *       "cloud.resource_id": "example-resource",
 *       "aws.log.group.names": "example-log-group",
 *       "aws.ai.agent.type": "default",
 *       "telemetry.sdk.version": "1.x.x",
 *       "telemetry.auto.version": "0.x.x",
 *       "telemetry.sdk.name": "opentelemetry"
 *     }
 *   },
 *   "scope": {"name": "example.instrumentation.library"},
 *   "timeUnixNano": 1234567890123456789,
 *   "observedTimeUnixNano": 1234567890987654321,
 *   "severityNumber": 9,
 *   "body": {...},
 *   "attributes": {...},
 *   "flags": 1,
 *   "traceId": "abcd1234efgh5678ijkl9012mnop3456",
 *   "spanId": "1234abcd5678efgh"
 * }
 * 2000 may be a bit of an overestimate, but it is better to overestimate the size of a log
 * and suffer a small batching performance impact than to underestimate it and risk
 * a large log being dropped when sent to the AWS OTLP endpoint.
 */
export const BASE_LOG_BUFFER_BYTE_SIZE: number = 2000;

// Maximum uncompressed/unserialized bytes / request -
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
export const MAX_LOG_REQUEST_BYTE_SIZE: number = 1048576;
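// (1048576 bytes = 1024 * 1024, i.e. exactly 1 MiB.)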

/**
 * Custom implementation of BatchLogRecordProcessor that manages log record batching
 * with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.
 *
 * This processor still exports all logs up to MAX_LOG_REQUEST_BYTE_SIZE, but rather than performing
 * exactly one export, it estimates log sizes and performs multiple batch exports,
 * where each exported batch satisfies an additional constraint:
 *
 * If the batch to be exported would have a data size > 1 MB,
 * the batch is split into multiple exports of sub-batches of data size <= 1 MB.
 *
 * The unique case is a sub-batch of data size > 1 MB; such a sub-batch contains exactly 1 log.
 */
export class AwsCloudWatchOtlpBatchLogRecordProcessor extends BatchLogRecordProcessor {
  constructor(exporter: OTLPAwsLogExporter, config?: BufferConfig) {
    super(exporter, config);
    // _flushOneBatch is private upstream, so rebind it through an `any` cast.
    (this as any)._flushOneBatch = () => this._flushSizeLimitedBatch();
  }

  /**
   * Explicitly overrides the upstream _flushOneBatch method to add AWS CloudWatch size-based batching.
   * Builds a list of export promises, where each exported batch is estimated to be at or under
   * the 1 MB limit of the CloudWatch Logs OTLP endpoint.
   *
   * The estimated data size of an exported batch will typically be <= 1 MB, except in one case:
   * if the estimated data size of an exported batch is > 1 MB, that batch is guaranteed to contain exactly 1 log.
   */
  private _flushSizeLimitedBatch(): Promise<void> {
    const processor = this as any;

    processor._clearTimer();

    if (processor._finishedLogRecords.length === 0) {
      return Promise.resolve();
    }

    const logsToExport: LogRecord[] = processor._finishedLogRecords.splice(0, processor._maxExportBatchSize);
    let batch: LogRecord[] = [];
    let batchDataSize = 0;
    const exportPromises: Promise<void>[] = [];

    for (let i = 0; i < logsToExport.length; i += 1) {
      const logData = logsToExport[i];
      const logSize = AwsCloudWatchOtlpBatchLogRecordProcessor.estimateLogSize(logData);

      // Close out the current batch if adding this log would push it past the 1 MB limit.
      if (batch.length > 0 && batchDataSize + logSize > MAX_LOG_REQUEST_BYTE_SIZE) {
        exportPromises.push(callWithTimeout(processor._export(batch), processor._exportTimeoutMillis));
        batchDataSize = 0;
        batch = [];
      }

      batchDataSize += logSize;
      batch.push(logData);
    }

    if (batch.length > 0) {
      exportPromises.push(callWithTimeout(processor._export(batch), processor._exportTimeoutMillis));
    }

    return new Promise((resolve, reject) => {
      Promise.all(exportPromises)
        .then(() => resolve())
        .catch(reject);
    });
  }

  /**
   * Estimates the size in bytes of a log by calculating the size of its body and its attributes
   * and adding a buffer amount to account for other log metadata information.
   * Processes complex log structures up to the specified depth limit.
   * Includes cycle detection to avoid processing the same log content more than once.
   * If the depth limit of the log structure is exceeded, returns the size
   * calculated up to that point.
   *
   * @param log - The Log object to calculate size for
   * @param depth - Maximum depth to traverse in nested structures (default: 3)
   * @returns The estimated size of the log object in bytes
   */
  private static estimateLogSize(log: LogRecord, depth: number = 3): number {
    // Queue contains tuples of [logContent, depth] where:
    // - logContent is the current piece of log data being processed
    // - depth tracks how many levels deep we've traversed to reach this content
    // - body starts at depth 0 since it's an AnyValue object
    // - attributes start at depth -1 since they're a map of string keys to AnyValue - when
    //   traversed, their keys are processed at depth 0
    let queue: Array<[AnyValue, number]> = [
      [log.body, 0],
      [log.attributes, -1],
    ];

    // Track visited complex log contents to avoid calculating the same one more than once
    const visited = new Set<object>();

    let size: number = BASE_LOG_BUFFER_BYTE_SIZE;

    while (queue.length > 0) {
      const newQueue: Array<[AnyValue, number]> = [];

      for (const [nextVal, currentDepth] of queue) {
        // Small optimization: we can stop calculating the size once it reaches the 1 MB limit
        if (size >= MAX_LOG_REQUEST_BYTE_SIZE) {
          return size;
        }

        if (nextVal === null || nextVal === undefined) {
          continue;
        }

        if (typeof nextVal === 'number' || typeof nextVal === 'boolean' || typeof nextVal === 'string') {
          size += this.estimateUtf8Size(nextVal.toString());
          continue;
        }

        if (nextVal instanceof Uint8Array) {
          size += nextVal.byteLength;
          continue;
        }

        // nextVal must be an Array or an AnyValueMap
        if (currentDepth <= depth) {
          if (visited.has(nextVal)) {
            continue;
          }
          visited.add(nextVal);

          if (Array.isArray(nextVal)) {
            for (const content of nextVal) {
              newQueue.push([content, currentDepth + 1]);
            }
            continue;
          }
          // It's an AnyValueMap
          const map = nextVal as AnyValueMap;
          for (const key in map) {
            size += AwsCloudWatchOtlpBatchLogRecordProcessor.estimateUtf8Size(key);
            newQueue.push([map[key], currentDepth + 1]);
          }
        }
      }
      queue = newQueue;
    }
    return size;
  }

  // Cheap upper-bound estimate of a string's UTF-8 byte size: 1 byte per ASCII
  // character and 4 bytes (the UTF-8 maximum) per non-ASCII code point.
  private static estimateUtf8Size(s: string): number {
    let asciiCount = 0;
    let nonAsciiCount = 0;

    for (const char of s) {
      if (char.charCodeAt(0) < 128) {
        asciiCount += 1;
      } else {
        nonAsciiCount += 1;
      }
    }

    return asciiCount + nonAsciiCount * 4;
  }
}
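
For reference, a minimal sketch of how this processor might be wired into a logging pipeline. The zero-argument OTLPAwsLogExporter construction and the addLogRecordProcessor registration call are assumptions based on typical @opentelemetry/sdk-logs usage, not details shown in this commit (newer sdk-logs releases pass processors to the LoggerProvider constructor instead):

import { LoggerProvider } from '@opentelemetry/sdk-logs';
import { OTLPAwsLogExporter } from './otlp-aws-log-exporter';
import { AwsCloudWatchOtlpBatchLogRecordProcessor } from './aws-cw-otlp-batch-log-record-processor';

// Hypothetical wiring: adjust the exporter construction to its real signature.
const exporter = new OTLPAwsLogExporter();
const provider = new LoggerProvider();
// Swap in the size-limited processor where the stock BatchLogRecordProcessor would go.
provider.addLogRecordProcessor(new AwsCloudWatchOtlpBatchLogRecordProcessor(exporter));
// Emitted logs are now flushed in batches estimated at <= 1 MB each.
provider.getLogger('example').emit({ body: 'hello' });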

aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/logs/aws-batch-log-record-processor.test.ts

Lines changed: 4 additions & 4 deletions
@@ -6,7 +6,7 @@ import {
   AwsCloudWatchOtlpBatchLogRecordProcessor,
   BASE_LOG_BUFFER_BYTE_SIZE,
   MAX_LOG_REQUEST_BYTE_SIZE,
-} from '../../../../../src/exporter/otlp/aws/logs/aws-batch-log-record-processor';
+} from '../../../../../src/exporter/otlp/aws/logs/aws-cw-otlp-batch-log-record-processor';
 import { OTLPAwsLogExporter } from '../../../../../src/exporter/otlp/aws/logs/otlp-aws-log-exporter';
 import expect from 'expect';
 import { ExportResultCode } from '@opentelemetry/core';
@@ -90,8 +90,8 @@ describe('AwsCloudWatchOtlpBatchLogRecordProcessor', () => {
   });
 
   it('should handle primitive types', () => {
-    const primitives: AnyValue[] = ['test', new Uint8Array([116, 101, 115, 116]), 1, 1.2, true, false, null];
-    const expectedSizes = [4, 4, 1, 3, 4, 5, 0];
+    const primitives: AnyValue[] = ['test', new Uint8Array([116, 101, 115, 116]), 1, 1.2, true, false, null, '深入 Python', 'calfé'];
+    const expectedSizes = [4, 4, 1, 3, 4, 5, 0, 2 * 4 + ' Python'.length, 1 * 4 + 'calf'.length];
 
     primitives.forEach((primitive, index) => {
       const log = generateTestLogData(primitive, 'key', 0, 1, true);
@@ -120,7 +120,7 @@ describe('AwsCloudWatchOtlpBatchLogRecordProcessor', () => {
     });
   });
 
-  describe('_flushOneBatchIntermediary', () => {
+  describe('_flushSizeLimitedBatch', () => {
     let sandbox!: sinon.SinonSandbox;
     let mockExporter: sinon.SinonStubbedInstance<OTLPAwsLogExporter>;
     let processor: any; // Setting it to any instead of AwsCloudWatchOtlpBatchLogRecordProcessor since we need to stub a few of its methods

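As a quick sanity check on the new expected sizes, the estimator's arithmetic for the two added strings works out as follows (hand-computed from estimateUtf8Size above, not part of the commit):

// '深入 Python': 2 non-ASCII chars * 4 + ' Python'.length (7 ASCII chars) = 15
// 'calfé':      1 non-ASCII char  * 4 + 'calf'.length   (4 ASCII chars) = 8
// The true UTF-8 encodings are 13 and 6 bytes; the flat 4-bytes-per-code-point
// rule deliberately overestimates so a log's size is never undercounted.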
0 commit comments
