Skip to content

Commit ae3feb9

Browse files
author
Timothy Mothra
authored
[otlp] OTLP Exporter Custom serializer - (Part 2) Histogram and Exponential Histogram (open-telemetry#5962)
1 parent d113ecf commit ae3feb9

File tree

4 files changed

+226
-30
lines changed

4 files changed

+226
-30
lines changed

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
140140
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Metric_Unit, metric.Unit);
141141
}
142142

143+
var aggregationValue = metric.Temporality == AggregationTemporality.Cumulative
144+
? ProtobufOtlpMetricFieldNumberConstants.Aggregation_Temporality_Cumulative
145+
: ProtobufOtlpMetricFieldNumberConstants.Aggregation_Temporality_Delta;
146+
143147
switch (metric.MetricType)
144148
{
145149
case MetricType.LongSum:
@@ -150,7 +154,7 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
150154
writePosition += ReserveSizeForLength;
151155

152156
writePosition = ProtobufSerializer.WriteBoolWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Is_Monotonic, metric.MetricType == MetricType.LongSum);
153-
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, metric.Temporality == AggregationTemporality.Cumulative ? 2 : 1);
157+
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, aggregationValue);
154158

155159
foreach (ref readonly var metricPoint in metric.GetMetricPoints())
156160
{
@@ -170,7 +174,7 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
170174
writePosition += ReserveSizeForLength;
171175

172176
writePosition = ProtobufSerializer.WriteBoolWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Is_Monotonic, metric.MetricType == MetricType.DoubleSum);
173-
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, metric.Temporality == AggregationTemporality.Cumulative ? 2 : 1);
177+
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, aggregationValue);
174178

175179
foreach (ref readonly var metricPoint in metric.GetMetricPoints())
176180
{
@@ -216,11 +220,134 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
216220

217221
case MetricType.Histogram:
218222
{
223+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Metric_Data_Histogram, ProtobufWireType.LEN);
224+
int metricTypeLengthPosition = writePosition;
225+
writePosition += ReserveSizeForLength;
226+
227+
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Histogram_Aggregation_Temporality, aggregationValue);
228+
229+
foreach (ref readonly var metricPoint in metric.GetMetricPoints())
230+
{
231+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Histogram_Data_Points, ProtobufWireType.LEN);
232+
int dataPointLengthPosition = writePosition;
233+
writePosition += ReserveSizeForLength;
234+
235+
var startTime = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds();
236+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Start_Time_Unix_Nano, startTime);
237+
238+
var endTime = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds();
239+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Time_Unix_Nano, endTime);
240+
241+
foreach (var tag in metricPoint.Tags)
242+
{
243+
writePosition = WriteTag(buffer, writePosition, tag, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Attributes);
244+
}
245+
246+
var count = (ulong)metricPoint.GetHistogramCount();
247+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Count, count);
248+
249+
var sum = metricPoint.GetHistogramSum();
250+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Sum, sum);
251+
252+
if (metricPoint.TryGetHistogramMinMaxValues(out double min, out double max))
253+
{
254+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Min, min);
255+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Max, max);
256+
}
257+
258+
foreach (var histogramMeasurement in metricPoint.GetHistogramBuckets())
259+
{
260+
var bucketCount = (ulong)histogramMeasurement.BucketCount;
261+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Bucket_Counts, bucketCount);
262+
263+
if (histogramMeasurement.ExplicitBound != double.PositiveInfinity)
264+
{
265+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Explicit_Bounds, histogramMeasurement.ExplicitBound);
266+
}
267+
}
268+
269+
if (metricPoint.TryGetExemplars(out var exemplars))
270+
{
271+
foreach (ref readonly var exemplar in exemplars)
272+
{
273+
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Exemplars);
274+
}
275+
}
276+
277+
ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
278+
}
279+
280+
ProtobufSerializer.WriteReservedLength(buffer, metricTypeLengthPosition, writePosition - (metricTypeLengthPosition + ReserveSizeForLength));
219281
break;
220282
}
221283

222284
case MetricType.ExponentialHistogram:
223285
{
286+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Metric_Data_Exponential_Histogram, ProtobufWireType.LEN);
287+
int metricTypeLengthPosition = writePosition;
288+
writePosition += ReserveSizeForLength;
289+
290+
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogram_Aggregation_Temporality, aggregationValue);
291+
292+
foreach (ref readonly var metricPoint in metric.GetMetricPoints())
293+
{
294+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogram_Data_Points, ProtobufWireType.LEN);
295+
int dataPointLengthPosition = writePosition;
296+
writePosition += ReserveSizeForLength;
297+
298+
var startTime = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds();
299+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Start_Time_Unix_Nano, startTime);
300+
301+
var endTime = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds();
302+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Time_Unix_Nano, endTime);
303+
304+
foreach (var tag in metricPoint.Tags)
305+
{
306+
writePosition = WriteTag(buffer, writePosition, tag, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Attributes);
307+
}
308+
309+
var sum = metricPoint.GetHistogramSum();
310+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Sum, sum);
311+
312+
var count = (ulong)metricPoint.GetHistogramCount();
313+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Count, count);
314+
315+
if (metricPoint.TryGetHistogramMinMaxValues(out double min, out double max))
316+
{
317+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Min, min);
318+
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Max, max);
319+
}
320+
321+
var exponentialHistogramData = metricPoint.GetExponentialHistogramData();
322+
323+
writePosition = ProtobufSerializer.WriteSInt32WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Scale, exponentialHistogramData.Scale);
324+
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Zero_Count, (ulong)exponentialHistogramData.ZeroCount);
325+
326+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Positive, ProtobufWireType.LEN);
327+
int positiveBucketsLengthPosition = writePosition;
328+
writePosition += ReserveSizeForLength;
329+
330+
writePosition = ProtobufSerializer.WriteSInt32WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Buckets_Offset, exponentialHistogramData.PositiveBuckets.Offset);
331+
332+
foreach (var bucketCount in exponentialHistogramData.PositiveBuckets)
333+
{
334+
writePosition = ProtobufSerializer.WriteInt64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Buckets_Bucket_Counts, (ulong)bucketCount);
335+
}
336+
337+
ProtobufSerializer.WriteReservedLength(buffer, positiveBucketsLengthPosition, writePosition - (positiveBucketsLengthPosition + ReserveSizeForLength));
338+
339+
if (metricPoint.TryGetExemplars(out var exemplars))
340+
{
341+
foreach (ref readonly var exemplar in exemplars)
342+
{
343+
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Exemplars);
344+
}
345+
}
346+
347+
ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
348+
}
349+
350+
ProtobufSerializer.WriteReservedLength(buffer, metricTypeLengthPosition, writePosition - (metricTypeLengthPosition + ReserveSizeForLength));
224351
break;
225352
}
226353
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufSerializer.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,17 @@ internal static int WriteFixed64WithTag(byte[] buffer, int writePosition, int fi
154154
return writePosition;
155155
}
156156

157+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
158+
internal static int WriteSInt32WithTag(byte[] buffer, int writePosition, int fieldNumber, int value)
159+
{
160+
writePosition = WriteTag(buffer, writePosition, fieldNumber, ProtobufWireType.VARINT);
161+
162+
// https://protobuf.dev/programming-guides/encoding/#signed-ints
163+
writePosition = WriteVarInt32(buffer, writePosition, (uint)((value << 1) ^ (value >> 31)));
164+
165+
return writePosition;
166+
}
167+
157168
[MethodImpl(MethodImplOptions.AggressiveInlining)]
158169
internal static int WriteVarInt32(byte[] buffer, int writePosition, uint value)
159170
{

test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/Serializer/ProtobufSerializerTests.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,24 @@ public void WriteDoubleWithTag_WritesCorrectly()
172172
Assert.Equal(0x40, buffer[8]);
173173
}
174174

175+
[Fact]
176+
public void WriteSignedInt32_WritesCorrectly()
177+
{
178+
byte[] buffer = new byte[10];
179+
int position = ProtobufSerializer.WriteSInt32WithTag(buffer, 0, 1, 300);
180+
Assert.Equal(3, position);
181+
Assert.Equal(8, buffer[0]); // Tag
182+
Assert.Equal(0xD8, buffer[1]);
183+
Assert.Equal(0x04, buffer[2]);
184+
185+
buffer = new byte[10];
186+
position = ProtobufSerializer.WriteSInt32WithTag(buffer, 0, 1, -300);
187+
Assert.Equal(3, position);
188+
Assert.Equal(8, buffer[0]); // Tag
189+
Assert.Equal(0xD7, buffer[1]);
190+
Assert.Equal(0x04, buffer[2]);
191+
}
192+
175193
[Fact]
176194
public void WriteVarInt32_WritesCorrectly()
177195
{

0 commit comments

Comments
 (0)