Skip to content

Commit e67d7e5

Browse files
committed
fixup! feat(otlp-transformer): add custom logs protobuf serializer
1 parent 527bb10 commit e67d7e5

File tree

2 files changed

+260
-0
lines changed

2 files changed

+260
-0
lines changed

experimental/packages/otlp-transformer/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ src/generated/*
22
test/generated/*
33
!src/generated/.gitkeep
44
!test/generated/.gitkeep
5+
!src/logs
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import type { ReadableLogRecord } from '@opentelemetry/sdk-logs';
17+
import { GrowingBufferProtobufWriter } from '../../common/protobuf/growing-buffer-protobuf-writer';
18+
import { hexToBinary } from '../../common/hex-to-binary';
19+
import type { Resource } from '@opentelemetry/resources';
20+
import type { InstrumentationScope } from '@opentelemetry/core';
21+
import { SeverityNumber } from '@opentelemetry/api-logs';
22+
import {
23+
writeAnyValue,
24+
writeAttributes,
25+
writeHrTimeAsFixed64,
26+
} from '../../common/protobuf/common-serializer';
27+
import type { IProtobufWriter } from '../../common/protobuf/i-protobuf-writer';
28+
import { ProtobufSizeEstimator } from '../../common/protobuf/protobuf-size-estimator';
29+
30+
/**
31+
* Serialize a single LogRecord directly from ReadableLogRecord
32+
*/
33+
function serializeLogRecord(
34+
writer: IProtobufWriter,
35+
logRecord: ReadableLogRecord
36+
): void {
37+
const logStart = writer.startLengthDelimited();
38+
const logStartPos = writer.pos;
39+
40+
// time_unix_nano (field 1, fixed64)
41+
writer.writeTag(1, 1); // wire type 1 (fixed64)
42+
writeHrTimeAsFixed64(writer, logRecord.hrTime);
43+
44+
// severity_number (field 2, enum/varint) - skip if unspecified
45+
if (
46+
logRecord.severityNumber !== undefined &&
47+
logRecord.severityNumber !== SeverityNumber.UNSPECIFIED
48+
) {
49+
writer.writeTag(2, 0);
50+
writer.writeVarint(logRecord.severityNumber);
51+
}
52+
53+
// severity_text (field 3, string) - skip if empty
54+
if (logRecord.severityText) {
55+
writer.writeTag(3, 2);
56+
writer.writeString(logRecord.severityText);
57+
}
58+
59+
// body (field 5, AnyValue) - skip if undefined
60+
if (logRecord.body !== undefined) {
61+
writer.writeTag(5, 2);
62+
const bodyStart = writer.startLengthDelimited();
63+
const bodyStartPos = writer.pos;
64+
writeAnyValue(writer, logRecord.body);
65+
writer.finishLengthDelimited(bodyStart, writer.pos - bodyStartPos);
66+
}
67+
68+
// attributes (field 6, repeated KeyValue)
69+
if (logRecord.attributes) {
70+
writeAttributes(writer, logRecord.attributes, 6);
71+
}
72+
73+
// dropped_attributes_count (field 7, uint32)
74+
writer.writeTag(7, 0);
75+
writer.writeVarint(logRecord.droppedAttributesCount);
76+
77+
// flags (field 8, fixed32) - skip if 0 or undefined
78+
if (logRecord.spanContext?.traceFlags) {
79+
writer.writeTag(8, 5); // wire type 5 (fixed32)
80+
writer.writeFixed32(logRecord.spanContext.traceFlags);
81+
}
82+
83+
// trace_id (field 9, bytes) - skip if empty
84+
if (logRecord.spanContext?.traceId) {
85+
writer.writeTag(9, 2);
86+
writer.writeBytes(hexToBinary(logRecord.spanContext.traceId));
87+
}
88+
89+
// span_id (field 10, bytes) - skip if empty
90+
if (logRecord.spanContext?.spanId) {
91+
writer.writeTag(10, 2);
92+
writer.writeBytes(hexToBinary(logRecord.spanContext.spanId));
93+
}
94+
95+
// observed_time_unix_nano (field 11, fixed64)
96+
writer.writeTag(11, 1); // wire type 1 (fixed64)
97+
writeHrTimeAsFixed64(writer, logRecord.hrTimeObserved);
98+
99+
// event_name (field 12, string) - skip if empty
100+
if (logRecord.eventName) {
101+
writer.writeTag(12, 2);
102+
writer.writeString(logRecord.eventName);
103+
}
104+
105+
writer.finishLengthDelimited(logStart, writer.pos - logStartPos);
106+
}
107+
108+
/**
109+
* Serialize ScopeLogs directly from SDK types
110+
*/
111+
function serializeScopeLogs(
112+
writer: IProtobufWriter,
113+
scope: InstrumentationScope,
114+
logRecords: ReadableLogRecord[]
115+
): void {
116+
const scopeLogsStart = writer.startLengthDelimited();
117+
const scopeLogsStartPos = writer.pos;
118+
119+
// scope (field 1, InstrumentationScope)
120+
writer.writeTag(1, 2);
121+
const scopeStart = writer.startLengthDelimited();
122+
const scopeStartPos = writer.pos;
123+
124+
// Write InstrumentationScope fields directly
125+
writer.writeTag(1, 2);
126+
writer.writeString(scope.name);
127+
128+
if (scope.version) {
129+
writer.writeTag(2, 2);
130+
writer.writeString(scope.version);
131+
}
132+
133+
writer.finishLengthDelimited(scopeStart, writer.pos - scopeStartPos);
134+
135+
// log_records (field 2, repeated LogRecord)
136+
for (const logRecord of logRecords) {
137+
writer.writeTag(2, 2);
138+
serializeLogRecord(writer, logRecord);
139+
}
140+
141+
// schema_url (field 3, string) - skip if empty
142+
if (scope.schemaUrl) {
143+
writer.writeTag(3, 2);
144+
writer.writeString(scope.schemaUrl);
145+
}
146+
147+
writer.finishLengthDelimited(scopeLogsStart, writer.pos - scopeLogsStartPos);
148+
}
149+
150+
function serializeResource(
151+
writer: IProtobufWriter,
152+
resource: Resource,
153+
fieldNumber: number
154+
) {
155+
writer.writeTag(fieldNumber, 2);
156+
const resourceStart = writer.startLengthDelimited();
157+
const resourceStartPos = writer.pos;
158+
159+
// Write Resource attributes directly
160+
if (resource.attributes) {
161+
writeAttributes(writer, resource.attributes, 1);
162+
}
163+
164+
// dropped_attributes_count (field 2, uint32) - set to 0 as we don't track this
165+
writer.writeTag(2, 0);
166+
writer.writeVarint(0);
167+
168+
writer.finishLengthDelimited(resourceStart, writer.pos - resourceStartPos);
169+
}
170+
171+
/**
172+
* Serialize ResourceLogs directly from SDK Resource type
173+
*/
174+
function serializeResourceLogs(
175+
writer: IProtobufWriter,
176+
resource: Resource,
177+
scopeMap: Map<string, ReadableLogRecord[]>
178+
): void {
179+
const resourceLogsStart = writer.startLengthDelimited();
180+
const resourceLogsStartPos = writer.pos;
181+
182+
// resource (field 1, Resource)
183+
serializeResource(writer, resource, 1);
184+
185+
// scope_logs (field 2, repeated ScopeLogs)
186+
for (const scopeLogs of scopeMap.values()) {
187+
writer.writeTag(2, 2);
188+
const scope = scopeLogs[0].instrumentationScope;
189+
serializeScopeLogs(writer, scope, scopeLogs);
190+
}
191+
192+
// schema_url (field 3, string) - skip if empty
193+
if (resource.schemaUrl) {
194+
writer.writeTag(3, 2);
195+
writer.writeString(resource.schemaUrl);
196+
}
197+
198+
writer.finishLengthDelimited(
199+
resourceLogsStart,
200+
writer.pos - resourceLogsStartPos
201+
);
202+
}
203+
204+
/**
205+
* Group log records by resource and instrumentation scope
206+
*/
207+
function createResourceMap(
208+
logRecords: ReadableLogRecord[]
209+
): Map<Resource, Map<string, ReadableLogRecord[]>> {
210+
const resourceMap: Map<
211+
Resource,
212+
Map<string, ReadableLogRecord[]>
213+
> = new Map();
214+
215+
for (const record of logRecords) {
216+
const resource = record.resource;
217+
const scope = record.instrumentationScope;
218+
219+
let ismMap = resourceMap.get(resource);
220+
if (!ismMap) {
221+
ismMap = new Map();
222+
resourceMap.set(resource, ismMap);
223+
}
224+
225+
const ismKey = `${scope.name}@${scope.version}:${scope.schemaUrl}`;
226+
let records = ismMap.get(ismKey);
227+
if (!records) {
228+
records = [];
229+
ismMap.set(ismKey, records);
230+
}
231+
records.push(record);
232+
}
233+
return resourceMap;
234+
}
235+
236+
/**
237+
* Serialize ExportLogsServiceRequest directly from ReadableLogRecord[]
238+
*/
239+
export function serializeLogsExportRequest(
240+
logRecords: ReadableLogRecord[]
241+
): Uint8Array {
242+
const resourceMap = createResourceMap(logRecords);
243+
244+
// First pass: estimate size
245+
const estimator = new ProtobufSizeEstimator();
246+
for (const [resource, scopeMap] of resourceMap) {
247+
estimator.writeTag(1, 2);
248+
serializeResourceLogs(estimator, resource, scopeMap);
249+
}
250+
251+
// Second pass: write with estimated size
252+
const writer = new GrowingBufferProtobufWriter(estimator.pos);
253+
for (const [resource, scopeMap] of resourceMap) {
254+
writer.writeTag(1, 2);
255+
serializeResourceLogs(writer, resource, scopeMap);
256+
}
257+
258+
return writer.finish();
259+
}

0 commit comments

Comments
 (0)