Skip to content

Commit bef2eb7

Browse files
committed
feat(otlp-transformer): add custom logs protobuf serializer
1 parent fcafab5 commit bef2eb7

File tree

16 files changed

+2031
-11
lines changed

16 files changed

+2031
-11
lines changed

experimental/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2
8383
* feat(opentelemetry-sdk-node): set instrumentation and propagators for experimental start [#6148](https://github.com/open-telemetry/opentelemetry-js/pull/6148) @maryliag
8484
* refactor(configuration): set console exporter as empty object [#6164](https://github.com/open-telemetry/opentelemetry-js/pull/6164) @maryliag
8585
* feat(instrumentation-http, instrumentation-fetch, instrumentation-xml-http-request): support "QUERY" as a known HTTP method
86+
* feat(otlp-transformer): add custom protobuf logs serializer [#6228](https://github.com/open-telemetry/opentelemetry-js/pull/6228) @pichlermarc
8687

8788
### :bug: Bug Fixes
8889

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
build
22
src/generated
3+
test/generated
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
src/generated/*
2+
test/generated/*
23
!src/generated/.gitkeep
4+
!test/generated/.gitkeep
35
!src/logs
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import type { Attributes, HrTime } from '@opentelemetry/api';
17+
import type { AnyValue, LogAttributes } from '@opentelemetry/api-logs';
18+
import type { IProtobufWriter } from './i-protobuf-writer';
19+
20+
/**
21+
* Write HrTime [seconds, nanoseconds] directly as fixed64 to the serializer.
22+
* Converts to nanoseconds and writes as 64-bit little-endian integer without allocations.
23+
*
24+
* HrTime represents: total_nanos = seconds * 1_000_000_000 + nanoseconds
25+
* We need to split this into low (bits 0-31) and high (bits 32-63).
26+
*
27+
* @param serializer - The protobuf writer
28+
* @param hrTime - HrTime tuple [seconds, nanoseconds]
29+
*/
30+
export function writeHrTimeAsFixed64(
31+
serializer: IProtobufWriter,
32+
hrTime: HrTime
33+
): void {
34+
const seconds = hrTime[0];
35+
const nanos = hrTime[1];
36+
37+
// Calculate total nanoseconds split into 32-bit parts
38+
// We use the fact that 1e9 < 2^30, so multiplication is safe for reasonable timestamps
39+
40+
// For the low 32 bits: (seconds * 1e9 + nanos) & 0xFFFFFFFF
41+
// For the high 32 bits: floor((seconds * 1e9 + nanos) / 2^32)
42+
43+
// Calculate seconds * 1e9 split into parts
44+
const secNanos = seconds * 1e9;
45+
const secNanosLow = secNanos >>> 0; // Low 32 bits of seconds * 1e9
46+
const secNanosHigh = (secNanos / 0x100000000) >>> 0; // High bits from seconds * 1e9
47+
48+
// Add nanoseconds to the low part
49+
const totalLow = (secNanosLow + nanos) >>> 0;
50+
51+
// Check for overflow from low to high (carry bit)
52+
const carry = secNanosLow + nanos >= 0x100000000 ? 1 : 0;
53+
const totalHigh = (secNanosHigh + carry) >>> 0;
54+
55+
serializer.writeFixed64(totalLow, totalHigh);
56+
}
57+
58+
/**
59+
* Write Attributes directly to protobuf as repeated KeyValue
60+
*/
61+
export function writeAttributes(
62+
writer: IProtobufWriter,
63+
attributes: Attributes | LogAttributes,
64+
fieldNumber: number
65+
): void {
66+
for (const key in attributes) {
67+
if (!Object.prototype.hasOwnProperty.call(attributes, key)) {
68+
continue;
69+
}
70+
const value = attributes[key];
71+
writer.writeTag(fieldNumber, 2);
72+
const kvStart = writer.startLengthDelimited();
73+
const startPos = writer.pos;
74+
writeKeyValue(writer, key, value);
75+
writer.finishLengthDelimited(kvStart, writer.pos - startPos);
76+
}
77+
}
78+
79+
/**
80+
* Write a KeyValue pair directly to protobuf
81+
*/
82+
export function writeKeyValue(
83+
writer: IProtobufWriter,
84+
key: string,
85+
value: AnyValue
86+
): void {
87+
writer.writeTag(1, 2);
88+
writer.writeString(key);
89+
writer.writeTag(2, 2);
90+
const valueStart = writer.startLengthDelimited();
91+
const startPos = writer.pos;
92+
writeAnyValue(writer, value);
93+
writer.finishLengthDelimited(valueStart, writer.pos - startPos);
94+
}
95+
96+
/**
97+
* Write an AnyValue directly from raw attribute value to protobuf
98+
*/
99+
export function writeAnyValue(writer: IProtobufWriter, value: AnyValue): void {
100+
const t = typeof value;
101+
if (t === 'string') {
102+
writer.writeTag(1, 2);
103+
writer.writeString(value as string);
104+
} else if (t === 'boolean') {
105+
writer.writeTag(2, 0);
106+
writer.writeVarint((value as boolean) ? 1 : 0);
107+
} else if (t === 'number') {
108+
// Use isSafeInteger to avoid precision loss with large integers
109+
// Numbers outside the safe integer range should be serialized as doubles
110+
if (!Number.isSafeInteger(value as number)) {
111+
writer.writeTag(4, 1);
112+
writer.writeDouble(value as number);
113+
} else {
114+
writer.writeTag(3, 0);
115+
writer.writeVarint(value as number);
116+
}
117+
} else if (value instanceof Uint8Array) {
118+
writer.writeTag(7, 2);
119+
writer.writeBytes(value);
120+
} else if (Array.isArray(value)) {
121+
writer.writeTag(5, 2);
122+
const arrayStart = writer.startLengthDelimited();
123+
const arrayStartPos = writer.pos;
124+
for (const item of value) {
125+
writer.writeTag(1, 2);
126+
const itemStart = writer.startLengthDelimited();
127+
const itemStartPos = writer.pos;
128+
writeAnyValue(writer, item);
129+
writer.finishLengthDelimited(itemStart, writer.pos - itemStartPos);
130+
}
131+
writer.finishLengthDelimited(arrayStart, writer.pos - arrayStartPos);
132+
} else if (t === 'object' && value != null) {
133+
writer.writeTag(6, 2);
134+
const kvlistStart = writer.startLengthDelimited();
135+
const kvlistStartPos = writer.pos;
136+
const obj = value as Record<string, AnyValue>;
137+
for (const k in obj) {
138+
if (!Object.prototype.hasOwnProperty.call(obj, k)) {
139+
continue;
140+
}
141+
const v = obj[k];
142+
writer.writeTag(1, 2);
143+
const kvStart = writer.startLengthDelimited();
144+
const kvStartPos = writer.pos;
145+
writer.writeTag(1, 2);
146+
writer.writeString(k);
147+
writer.writeTag(2, 2);
148+
const valueStart = writer.startLengthDelimited();
149+
const valueStartPos = writer.pos;
150+
writeAnyValue(writer, v);
151+
writer.finishLengthDelimited(valueStart, writer.pos - valueStartPos);
152+
writer.finishLengthDelimited(kvStart, writer.pos - kvStartPos);
153+
}
154+
writer.finishLengthDelimited(kvlistStart, writer.pos - kvlistStartPos);
155+
}
156+
// Else: unsupported type, write nothing
157+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
export interface IProtobufWriter {
18+
pos: number;
19+
writeTag(fieldNumber: number, wireType: number): void;
20+
writeVarint(value: number): void;
21+
writeFixed32(value: number): void;
22+
writeFixed64(low: number, high: number): void;
23+
writeBytes(bytes: Uint8Array): void;
24+
writeString(str: string): void;
25+
writeDouble(value: number): void;
26+
startLengthDelimited(): number;
27+
finishLengthDelimited(pos: number, length: number): void;
28+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import type { IProtobufWriter } from './i-protobuf-writer';
18+
import { sizeAsVarint } from './utils';
19+
20+
/**
21+
* Calculate UTF-8 byte length without encoding
22+
* @param str valid UTF-16 string
23+
*/
24+
function utf8ByteLength(str: string): number {
25+
// No quick path for ASCII, since we need to loop though all chars anyway.
26+
const len = str.length;
27+
let byteLen = 0;
28+
for (let i = 0; i < len; i++) {
29+
const code = str.charCodeAt(i);
30+
if (code < 0x80) {
31+
byteLen += 1;
32+
} else if (code < 0x800) {
33+
byteLen += 2;
34+
} else if (code < 0xd800 || code >= 0xe000) {
35+
byteLen += 3;
36+
} else {
37+
// Surrogate pair
38+
i++; // Skip the next character
39+
byteLen += 4;
40+
}
41+
}
42+
return byteLen;
43+
}
44+
45+
/**
46+
* Size estimator for protobuf messages.
47+
* Implements the same interface as ProtobufWriter but only counts bytes without allocating a buffer.
48+
* @internal
49+
*/
50+
export class ProtobufSizeEstimator implements IProtobufWriter {
51+
public pos: number = 0;
52+
53+
startLengthDelimited(): number {
54+
return this.pos;
55+
}
56+
57+
finishLengthDelimited(_: number, length: number): void {
58+
this.pos += sizeAsVarint(length);
59+
}
60+
61+
writeVarint(value: number): void {
62+
this.pos += sizeAsVarint(value);
63+
}
64+
65+
writeFixed32(_value: number): void {
66+
this.pos += 4;
67+
}
68+
69+
writeFixed64(_low: number, _high: number): void {
70+
this.pos += 8;
71+
}
72+
73+
writeBytes(bytes: Uint8Array): void {
74+
this.pos += sizeAsVarint(bytes.length);
75+
this.pos += bytes.length;
76+
}
77+
78+
writeTag(fieldNumber: number, wireType: number): void {
79+
this.writeVarint((fieldNumber << 3) | wireType);
80+
}
81+
82+
writeDouble(_value: number): void {
83+
this.pos += 8;
84+
}
85+
86+
writeString(str: string): void {
87+
const byteLen = utf8ByteLength(str);
88+
this.pos += sizeAsVarint(byteLen);
89+
this.pos += byteLen;
90+
}
91+
}

0 commit comments

Comments
 (0)