Skip to content

Commit d45060b

Browse files
[otlp] OTLP Exporter Custom serializer - Spans (open-telemetry#5928)
Co-authored-by: Mikel Blanchard <[email protected]>
1 parent 8de335c commit d45060b

File tree

3 files changed

+396
-31
lines changed

3 files changed

+396
-31
lines changed

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTagWriter.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public override OtlpTagWriterArrayState BeginWriteArray()
111111

112112
public override void WriteNullValue(ref OtlpTagWriterArrayState state)
113113
{
114+
state.WritePosition = ProtobufSerializer.WriteTagAndLength(state.Buffer, state.WritePosition, 0, ProtobufOtlpFieldNumberConstants.ArrayValue_Value, ProtobufWireType.LEN);
114115
}
115116

116117
public override void WriteIntegralValue(ref OtlpTagWriterArrayState state, long value)
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Diagnostics;
5+
using OpenTelemetry.Trace;
6+
7+
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
8+
9+
internal static class ProtobufOtlpTraceSerializer
10+
{
11+
private const int ReserveSizeForLength = 4;
12+
private const string UnsetStatusCodeTagValue = "UNSET";
13+
private const string OkStatusCodeTagValue = "OK";
14+
private const string ErrorStatusCodeTagValue = "ERROR";
15+
private const int TraceIdSize = 16;
16+
private const int SpanIdSize = 8;
17+
18+
internal static int WriteSpan(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Activity activity)
19+
{
20+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.ScopeSpans_Span, ProtobufWireType.LEN);
21+
int spanLengthPosition = writePosition;
22+
writePosition += ReserveSizeForLength;
23+
24+
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, TraceIdSize, ProtobufOtlpFieldNumberConstants.Span_Trace_Id, ProtobufWireType.LEN);
25+
writePosition = WriteTraceId(buffer, writePosition, activity.TraceId);
26+
27+
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, SpanIdSize, ProtobufOtlpFieldNumberConstants.Span_Span_Id, ProtobufWireType.LEN);
28+
writePosition = WriteSpanId(buffer, writePosition, activity.SpanId);
29+
30+
if (activity.TraceStateString != null)
31+
{
32+
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Trace_State, activity.TraceStateString);
33+
}
34+
35+
if (activity.ParentSpanId != default)
36+
{
37+
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, SpanIdSize, ProtobufOtlpFieldNumberConstants.Span_Parent_Span_Id, ProtobufWireType.LEN);
38+
writePosition = WriteSpanId(buffer, writePosition, activity.ParentSpanId);
39+
}
40+
41+
writePosition = WriteTraceFlags(buffer, writePosition, activity.ActivityTraceFlags, activity.HasRemoteParent, ProtobufOtlpFieldNumberConstants.Span_Flags);
42+
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Name, activity.DisplayName);
43+
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Kind, (int)activity.Kind + 1);
44+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Start_Time_Unix_Nano, (ulong)activity.StartTimeUtc.ToUnixTimeNanoseconds());
45+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_End_Time_Unix_Nano, (ulong)(activity.StartTimeUtc.ToUnixTimeNanoseconds() + activity.Duration.ToNanoseconds()));
46+
47+
(writePosition, StatusCode? statusCode, string? statusMessage) = WriteActivityTags(buffer, writePosition, sdkLimitOptions, activity);
48+
writePosition = WriteSpanEvents(buffer, writePosition, sdkLimitOptions, activity);
49+
writePosition = WriteSpanLinks(buffer, writePosition, sdkLimitOptions, activity);
50+
writePosition = WriteSpanStatus(buffer, writePosition, activity, statusCode, statusMessage);
51+
ProtobufSerializer.WriteReservedLength(buffer, spanLengthPosition, writePosition - (spanLengthPosition + ReserveSizeForLength));
52+
53+
return writePosition;
54+
}
55+
56+
internal static int WriteTraceId(byte[] buffer, int position, ActivityTraceId activityTraceId)
57+
{
58+
var traceBytes = new Span<byte>(buffer, position, TraceIdSize);
59+
activityTraceId.CopyTo(traceBytes);
60+
return position + TraceIdSize;
61+
}
62+
63+
internal static int WriteSpanId(byte[] buffer, int position, ActivitySpanId activitySpanId)
64+
{
65+
var spanIdBytes = new Span<byte>(buffer, position, SpanIdSize);
66+
activitySpanId.CopyTo(spanIdBytes);
67+
return position + SpanIdSize;
68+
}
69+
70+
internal static int WriteTraceFlags(byte[] buffer, int position, ActivityTraceFlags activityTraceFlags, bool hasRemoteParent, int fieldNumber)
71+
{
72+
uint spanFlags = (uint)activityTraceFlags & (byte)0x000000FF;
73+
74+
spanFlags |= 0x00000100;
75+
if (hasRemoteParent)
76+
{
77+
spanFlags |= 0x00000200;
78+
}
79+
80+
position = ProtobufSerializer.WriteFixed32WithTag(buffer, position, fieldNumber, spanFlags);
81+
82+
return position;
83+
}
84+
85+
internal static (int Position, StatusCode? StatusCode, string? StatusMessage) WriteActivityTags(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Activity activity)
86+
{
87+
StatusCode? statusCode = null;
88+
string? statusMessage = null;
89+
int maxAttributeCount = sdkLimitOptions.SpanAttributeCountLimit ?? int.MaxValue;
90+
int maxAttributeValueLength = sdkLimitOptions.AttributeValueLengthLimit ?? int.MaxValue;
91+
int attributeCount = 0;
92+
int droppedAttributeCount = 0;
93+
94+
ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState = new ProtobufOtlpTagWriter.OtlpTagWriterState
95+
{
96+
Buffer = buffer,
97+
WritePosition = writePosition,
98+
};
99+
100+
foreach (ref readonly var tag in activity.EnumerateTagObjects())
101+
{
102+
switch (tag.Key)
103+
{
104+
case "otel.status_code":
105+
106+
statusCode = tag.Value switch
107+
{
108+
/*
109+
* Note: Order here does matter for perf. Unset is
110+
* first because assumption is most spans will be
111+
* Unset, then Error. Ok is not set by the SDK.
112+
*/
113+
not null when UnsetStatusCodeTagValue.Equals(tag.Value as string, StringComparison.OrdinalIgnoreCase) => StatusCode.Unset,
114+
not null when ErrorStatusCodeTagValue.Equals(tag.Value as string, StringComparison.OrdinalIgnoreCase) => StatusCode.Error,
115+
not null when OkStatusCodeTagValue.Equals(tag.Value as string, StringComparison.OrdinalIgnoreCase) => StatusCode.Ok,
116+
_ => null,
117+
};
118+
continue;
119+
case "otel.status_description":
120+
statusMessage = tag.Value as string;
121+
continue;
122+
}
123+
124+
if (attributeCount < maxAttributeCount)
125+
{
126+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpFieldNumberConstants.Span_Attributes, ProtobufWireType.LEN);
127+
int spanAttributesLengthPosition = otlpTagWriterState.WritePosition;
128+
otlpTagWriterState.WritePosition += ReserveSizeForLength;
129+
130+
ProtobufOtlpTagWriter.Instance.TryWriteTag(ref otlpTagWriterState, tag.Key, tag.Value, maxAttributeValueLength);
131+
132+
ProtobufSerializer.WriteReservedLength(buffer, spanAttributesLengthPosition, otlpTagWriterState.WritePosition - (spanAttributesLengthPosition + 4));
133+
attributeCount++;
134+
}
135+
else
136+
{
137+
droppedAttributeCount++;
138+
}
139+
}
140+
141+
if (droppedAttributeCount > 0)
142+
{
143+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(buffer, otlpTagWriterState.WritePosition, ProtobufOtlpFieldNumberConstants.Span_Dropped_Attributes_Count, ProtobufWireType.VARINT);
144+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteVarInt32(buffer, otlpTagWriterState.WritePosition, (uint)droppedAttributeCount);
145+
}
146+
147+
return (otlpTagWriterState.WritePosition, statusCode, statusMessage);
148+
}
149+
150+
internal static int WriteSpanEvents(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Activity activity)
151+
{
152+
int maxEventCountLimit = sdkLimitOptions.SpanEventCountLimit ?? int.MaxValue;
153+
int eventCount = 0;
154+
int droppedEventCount = 0;
155+
foreach (ref readonly var evnt in activity.EnumerateEvents())
156+
{
157+
if (eventCount < maxEventCountLimit)
158+
{
159+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Events, ProtobufWireType.LEN);
160+
int spanEventsLengthPosition = writePosition;
161+
writePosition += ReserveSizeForLength; // Reserve 4 bytes for length
162+
163+
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Event_Name, evnt.Name);
164+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Event_Time_Unix_Nano, (ulong)evnt.Timestamp.ToUnixTimeNanoseconds());
165+
writePosition = WriteEventAttributes(ref buffer, writePosition, sdkLimitOptions, evnt);
166+
167+
ProtobufSerializer.WriteReservedLength(buffer, spanEventsLengthPosition, writePosition - (spanEventsLengthPosition + ReserveSizeForLength));
168+
eventCount++;
169+
}
170+
else
171+
{
172+
droppedEventCount++;
173+
}
174+
}
175+
176+
if (droppedEventCount > 0)
177+
{
178+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Dropped_Events_Count, ProtobufWireType.VARINT);
179+
writePosition = ProtobufSerializer.WriteVarInt32(buffer, writePosition, (uint)droppedEventCount);
180+
}
181+
182+
return writePosition;
183+
}
184+
185+
internal static int WriteEventAttributes(ref byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ActivityEvent evnt)
186+
{
187+
int maxAttributeCount = sdkLimitOptions.SpanEventAttributeCountLimit ?? int.MaxValue;
188+
int maxAttributeValueLength = sdkLimitOptions.AttributeValueLengthLimit ?? int.MaxValue;
189+
int attributeCount = 0;
190+
int droppedAttributeCount = 0;
191+
192+
ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState = new ProtobufOtlpTagWriter.OtlpTagWriterState
193+
{
194+
Buffer = buffer,
195+
WritePosition = writePosition,
196+
};
197+
198+
foreach (ref readonly var tag in evnt.EnumerateTagObjects())
199+
{
200+
if (attributeCount < maxAttributeCount)
201+
{
202+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpFieldNumberConstants.Event_Attributes, ProtobufWireType.LEN);
203+
int eventAttributesLengthPosition = otlpTagWriterState.WritePosition;
204+
otlpTagWriterState.WritePosition += ReserveSizeForLength;
205+
ProtobufOtlpTagWriter.Instance.TryWriteTag(ref otlpTagWriterState, tag.Key, tag.Value, maxAttributeValueLength);
206+
ProtobufSerializer.WriteReservedLength(buffer, eventAttributesLengthPosition, otlpTagWriterState.WritePosition - (eventAttributesLengthPosition + ReserveSizeForLength));
207+
attributeCount++;
208+
}
209+
else
210+
{
211+
droppedAttributeCount++;
212+
}
213+
}
214+
215+
if (droppedAttributeCount > 0)
216+
{
217+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(buffer, otlpTagWriterState.WritePosition, ProtobufOtlpFieldNumberConstants.Event_Dropped_Attributes_Count, ProtobufWireType.VARINT);
218+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteVarInt32(buffer, otlpTagWriterState.WritePosition, (uint)droppedAttributeCount);
219+
}
220+
221+
return otlpTagWriterState.WritePosition;
222+
}
223+
224+
internal static int WriteSpanLinks(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Activity activity)
225+
{
226+
int maxLinksCount = sdkLimitOptions.SpanLinkCountLimit ?? int.MaxValue;
227+
int linkCount = 0;
228+
int droppedLinkCount = 0;
229+
230+
foreach (ref readonly var link in activity.EnumerateLinks())
231+
{
232+
if (linkCount < maxLinksCount)
233+
{
234+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Links, ProtobufWireType.LEN);
235+
int spanLinksLengthPosition = writePosition;
236+
writePosition += ReserveSizeForLength; // Reserve 4 bytes for length
237+
238+
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, TraceIdSize, ProtobufOtlpFieldNumberConstants.Link_Trace_Id, ProtobufWireType.LEN);
239+
writePosition = WriteTraceId(buffer, writePosition, link.Context.TraceId);
240+
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, SpanIdSize, ProtobufOtlpFieldNumberConstants.Link_Span_Id, ProtobufWireType.LEN);
241+
writePosition = WriteSpanId(buffer, writePosition, link.Context.SpanId);
242+
if (link.Context.TraceState != null)
243+
{
244+
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Trace_State, link.Context.TraceState);
245+
}
246+
247+
writePosition = WriteLinkAttributes(buffer, writePosition, sdkLimitOptions, link);
248+
writePosition = WriteTraceFlags(buffer, writePosition, link.Context.TraceFlags, link.Context.IsRemote, ProtobufOtlpFieldNumberConstants.Link_Flags);
249+
250+
ProtobufSerializer.WriteReservedLength(buffer, spanLinksLengthPosition, writePosition - (spanLinksLengthPosition + ReserveSizeForLength));
251+
linkCount++;
252+
}
253+
else
254+
{
255+
droppedLinkCount++;
256+
}
257+
}
258+
259+
if (droppedLinkCount > 0)
260+
{
261+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpFieldNumberConstants.Span_Dropped_Links_Count, ProtobufWireType.VARINT);
262+
writePosition = ProtobufSerializer.WriteVarInt32(buffer, writePosition, (uint)droppedLinkCount);
263+
}
264+
265+
return writePosition;
266+
}
267+
268+
internal static int WriteLinkAttributes(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ActivityLink link)
269+
{
270+
int maxAttributeCount = sdkLimitOptions.SpanLinkAttributeCountLimit ?? int.MaxValue;
271+
int maxAttributeValueLength = sdkLimitOptions.AttributeValueLengthLimit ?? int.MaxValue;
272+
int attributeCount = 0;
273+
int droppedAttributeCount = 0;
274+
275+
ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState = new ProtobufOtlpTagWriter.OtlpTagWriterState
276+
{
277+
Buffer = buffer,
278+
WritePosition = writePosition,
279+
};
280+
281+
foreach (ref readonly var tag in link.EnumerateTagObjects())
282+
{
283+
if (attributeCount < maxAttributeCount)
284+
{
285+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpFieldNumberConstants.Link_Attributes, ProtobufWireType.LEN);
286+
int linkAttributesLengthPosition = otlpTagWriterState.WritePosition;
287+
otlpTagWriterState.WritePosition += ReserveSizeForLength;
288+
ProtobufOtlpTagWriter.Instance.TryWriteTag(ref otlpTagWriterState, tag.Key, tag.Value, maxAttributeValueLength);
289+
ProtobufSerializer.WriteReservedLength(buffer, linkAttributesLengthPosition, otlpTagWriterState.WritePosition - (linkAttributesLengthPosition + ReserveSizeForLength));
290+
attributeCount++;
291+
}
292+
else
293+
{
294+
droppedAttributeCount++;
295+
}
296+
}
297+
298+
if (droppedAttributeCount > 0)
299+
{
300+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(buffer, otlpTagWriterState.WritePosition, ProtobufOtlpFieldNumberConstants.Link_Dropped_Attributes_Count, ProtobufWireType.VARINT);
301+
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteVarInt32(buffer, otlpTagWriterState.WritePosition, (uint)droppedAttributeCount);
302+
}
303+
304+
return otlpTagWriterState.WritePosition;
305+
}
306+
307+
internal static int WriteSpanStatus(byte[] buffer, int position, Activity activity, StatusCode? statusCode, string? statusMessage)
308+
{
309+
if (activity.Status == ActivityStatusCode.Unset && statusCode == null)
310+
{
311+
return position;
312+
}
313+
314+
var useActivity = activity.Status != ActivityStatusCode.Unset;
315+
var isError = useActivity ? activity.Status == ActivityStatusCode.Error : statusCode == StatusCode.Error;
316+
var description = useActivity ? activity.StatusDescription : statusMessage;
317+
318+
if (isError && description != null)
319+
{
320+
var descriptionSpan = description.AsSpan();
321+
var numberOfUtf8CharsInString = ProtobufSerializer.GetNumberOfUtf8CharsInString(descriptionSpan);
322+
position = ProtobufSerializer.WriteTagAndLength(buffer, position, numberOfUtf8CharsInString + 4, ProtobufOtlpFieldNumberConstants.Span_Status, ProtobufWireType.LEN);
323+
position = ProtobufSerializer.WriteStringWithTag(buffer, position, ProtobufOtlpFieldNumberConstants.Status_Message, numberOfUtf8CharsInString, descriptionSpan);
324+
}
325+
else
326+
{
327+
position = ProtobufSerializer.WriteTagAndLength(buffer, position, 2, ProtobufOtlpFieldNumberConstants.Span_Status, ProtobufWireType.LEN);
328+
}
329+
330+
var finalStatusCode = useActivity ? (int)activity.Status : (statusCode != null && statusCode != StatusCode.Unset) ? (int)statusCode! : (int)StatusCode.Unset;
331+
position = ProtobufSerializer.WriteEnumWithTag(buffer, position, ProtobufOtlpFieldNumberConstants.Status_Code, finalStatusCode);
332+
333+
return position;
334+
}
335+
}

0 commit comments

Comments
 (0)