Skip to content

Commit be82099

Browse files
[otlp] Expand array buffer / add tests to existing base buffer (open-telemetry#6013)
Co-authored-by: Mikel Blanchard <[email protected]>
1 parent 145e7ad commit be82099

File tree

16 files changed

+525
-42
lines changed

16 files changed

+525
-42
lines changed

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ public void GrpcRetryDelayParsingFailed(string grpcStatusDetailsHeader, string e
220220
this.WriteEvent(24, grpcStatusDetailsHeader, exception);
221221
}
222222

223+
[Event(25, Message = "The array tag buffer exceeded the maximum allowed size. The array tag value was replaced with 'TRUNCATED'", Level = EventLevel.Warning)]
224+
public void ArrayBufferExceededMaxSize()
225+
{
226+
this.WriteEvent(25);
227+
}
228+
223229
void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value)
224230
{
225231
this.InvalidConfigurationValue(key, value);

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogSerializer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ internal static class ProtobufOtlpLogSerializer
2020
[ThreadStatic]
2121
private static SerializationState? threadSerializationState;
2222

23-
internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch<LogRecord> logRecordBatch)
23+
internal static int WriteLogsData(ref byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch<LogRecord> logRecordBatch)
2424
{
2525
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpLogFieldNumberConstants.LogsData_Resource_Logs, ProtobufWireType.LEN);
2626
int logsDataLengthPosition = writePosition;
@@ -47,27 +47,27 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti
4747
logRecords.Add(logRecord);
4848
}
4949

50-
writePosition = TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
50+
writePosition = TryWriteResourceLogs(ref buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
5151
ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength));
5252
ReturnLogRecordListToPool();
5353

5454
return writePosition;
5555
}
5656

57-
internal static int TryWriteResourceLogs(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, Dictionary<string, List<LogRecord>> scopeLogs)
57+
internal static int TryWriteResourceLogs(ref byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, Dictionary<string, List<LogRecord>> scopeLogs)
5858
{
5959
try
6060
{
6161
writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
6262
}
63-
catch (IndexOutOfRangeException)
63+
catch (Exception ex) when (ex is IndexOutOfRangeException || ex is ArgumentException)
6464
{
6565
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Logs))
6666
{
6767
throw;
6868
}
6969

70-
return TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
70+
return TryWriteResourceLogs(ref buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
7171
}
7272

7373
return writePosition;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal static class ProtobufOtlpMetricSerializer
1717

1818
private delegate int WriteExemplarFunc(byte[] buffer, int writePosition, in Exemplar exemplar);
1919

20-
internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources.Resource? resource, in Batch<Metric> batch)
20+
internal static int WriteMetricsData(ref byte[] buffer, int writePosition, Resources.Resource? resource, in Batch<Metric> batch)
2121
{
2222
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.MetricsData_Resource_Metrics, ProtobufWireType.LEN);
2323
int mericsDataLengthPosition = writePosition;
@@ -35,27 +35,27 @@ internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources
3535
metrics.Add(metric);
3636
}
3737

38-
writePosition = TryWriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
38+
writePosition = TryWriteResourceMetrics(ref buffer, writePosition, resource, ScopeMetricsList);
3939
ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength));
4040
ReturnMetricListToPool();
4141

4242
return writePosition;
4343
}
4444

45-
internal static int TryWriteResourceMetrics(byte[] buffer, int writePosition, Resources.Resource? resource, Dictionary<string, List<Metric>> scopeMetrics)
45+
internal static int TryWriteResourceMetrics(ref byte[] buffer, int writePosition, Resources.Resource? resource, Dictionary<string, List<Metric>> scopeMetrics)
4646
{
4747
try
4848
{
4949
writePosition = WriteResourceMetrics(buffer, writePosition, resource, scopeMetrics);
5050
}
51-
catch (IndexOutOfRangeException)
51+
catch (Exception ex) when (ex is IndexOutOfRangeException || ex is ArgumentException)
5252
{
5353
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Metrics))
5454
{
5555
throw;
5656
}
5757

58-
return TryWriteResourceMetrics(buffer, writePosition, resource, scopeMetrics);
58+
return TryWriteResourceMetrics(ref buffer, writePosition, resource, scopeMetrics);
5959
}
6060

6161
return writePosition;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTagWriter.cs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
using System.Diagnostics;
45
using OpenTelemetry.Internal;
56

67
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
@@ -61,7 +62,6 @@ protected override void WriteStringTag(ref OtlpTagWriterState state, string key,
6162

6263
protected override void WriteArrayTag(ref OtlpTagWriterState state, string key, ref OtlpTagWriterArrayState value)
6364
{
64-
// TODO: Expand OtlpTagWriterArrayState.Buffer on IndexOutOfRangeException.
6565
// Write KeyValue tag
6666
state.WritePosition = ProtobufSerializer.WriteStringWithTag(state.Buffer, state.WritePosition, ProtobufOtlpCommonFieldNumberConstants.KeyValue_Key, key);
6767

@@ -95,18 +95,19 @@ internal struct OtlpTagWriterArrayState
9595
public int WritePosition;
9696
}
9797

98-
private sealed class OtlpArrayTagWriter : ArrayTagWriter<OtlpTagWriterArrayState>
98+
internal sealed class OtlpArrayTagWriter : ArrayTagWriter<OtlpTagWriterArrayState>
9999
{
100100
[ThreadStatic]
101-
private static byte[]? threadBuffer;
101+
internal static byte[]? ThreadBuffer;
102+
private const int MaxBufferSize = 2 * 1024 * 1024;
102103

103104
public override OtlpTagWriterArrayState BeginWriteArray()
104105
{
105-
threadBuffer ??= new byte[2048];
106+
ThreadBuffer ??= new byte[2048];
106107

107108
return new OtlpTagWriterArrayState
108109
{
109-
Buffer = threadBuffer,
110+
Buffer = ThreadBuffer,
110111
WritePosition = 0,
111112
};
112113
}
@@ -149,5 +150,29 @@ public override void WriteStringValue(ref OtlpTagWriterArrayState state, ReadOnl
149150
public override void EndWriteArray(ref OtlpTagWriterArrayState state)
150151
{
151152
}
153+
154+
public override bool TryResize()
155+
{
156+
var buffer = ThreadBuffer;
157+
158+
Debug.Assert(buffer != null, "buffer was null");
159+
160+
if (buffer!.Length >= MaxBufferSize)
161+
{
162+
OpenTelemetryProtocolExporterEventSource.Log.ArrayBufferExceededMaxSize();
163+
return false;
164+
}
165+
166+
try
167+
{
168+
ThreadBuffer = new byte[buffer.Length * 2];
169+
return true;
170+
}
171+
catch (OutOfMemoryException)
172+
{
173+
OpenTelemetryProtocolExporterEventSource.Log.BufferResizeFailedDueToMemory(nameof(OtlpArrayTagWriter));
174+
return false;
175+
}
176+
}
152177
}
153178
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceSerializer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal static class ProtobufOtlpTraceSerializer
1818
private static readonly Stack<List<Activity>> ActivityListPool = [];
1919
private static readonly Dictionary<string, List<Activity>> ScopeTracesList = [];
2020

21-
internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource, in Batch<Activity> batch)
21+
internal static int WriteTraceData(ref byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource, in Batch<Activity> batch)
2222
{
2323
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpTraceFieldNumberConstants.TracesData_Resource_Spans, ProtobufWireType.LEN);
2424
int resourceSpansScopeSpansLengthPosition = writePosition;
@@ -36,20 +36,20 @@ internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOpt
3636
activities.Add(activity);
3737
}
3838

39-
writePosition = TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
39+
writePosition = TryWriteResourceSpans(ref buffer, writePosition, sdkLimitOptions, resource);
4040
ReturnActivityListToPool();
4141
ProtobufSerializer.WriteReservedLength(buffer, resourceSpansScopeSpansLengthPosition, writePosition - (resourceSpansScopeSpansLengthPosition + ReserveSizeForLength));
4242

4343
return writePosition;
4444
}
4545

46-
internal static int TryWriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource)
46+
internal static int TryWriteResourceSpans(ref byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource)
4747
{
4848
try
4949
{
5050
writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
5151
}
52-
catch (IndexOutOfRangeException)
52+
catch (Exception ex) when (ex is IndexOutOfRangeException || ex is ArgumentException)
5353
{
5454
// Attempt to increase the buffer size
5555
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Traces))
@@ -61,7 +61,7 @@ internal static int TryWriteResourceSpans(byte[] buffer, int writePosition, SdkL
6161
// The recursion depth is limited to a maximum of 7 calls, as the buffer size starts at ~732 KB
6262
// and doubles until it reaches the maximum size of 100 MB. This ensures the recursion remains safe
6363
// and avoids stack overflow.
64-
return TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
64+
return TryWriteResourceSpans(ref buffer, writePosition, sdkLimitOptions, resource);
6565
}
6666

6767
return writePosition;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufSerializer.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,13 @@ internal static int WriteStringWithTag(byte[] buffer, int writePosition, int fie
321321
writePosition = WriteLength(buffer, writePosition, numberOfUtf8CharsInString);
322322

323323
#if NETFRAMEWORK || NETSTANDARD2_0
324+
if (buffer.Length - writePosition < numberOfUtf8CharsInString)
325+
{
326+
// Note: Validate there is enough space in the buffer to hold the
327+
// string otherwise throw to trigger a resize of the buffer.
328+
throw new IndexOutOfRangeException();
329+
}
330+
324331
unsafe
325332
{
326333
fixed (char* strPtr = &GetNonNullPinnableReference(value))

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
7272

7373
try
7474
{
75-
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);
75+
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(ref this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);
7676

7777
if (this.startWritePosition == GrpcStartWritePosition)
7878
{

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public override ExportResult Export(in Batch<Metric> metrics)
6565

6666
try
6767
{
68-
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics);
68+
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(ref this.buffer, this.startWritePosition, this.Resource, metrics);
6969

7070
if (this.startWritePosition == GrpcStartWritePosition)
7171
{

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
6868

6969
try
7070
{
71-
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);
71+
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(ref this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);
7272

7373
if (this.startWritePosition == GrpcStartWritePosition)
7474
{

src/Shared/TagWriter/ArrayTagWriter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ internal abstract class ArrayTagWriter<TArrayState>
1919
public abstract void WriteStringValue(ref TArrayState state, ReadOnlySpan<char> value);
2020

2121
public abstract void EndWriteArray(ref TArrayState state);
22+
23+
public virtual bool TryResize() => false;
2224
}

0 commit comments

Comments
 (0)