Skip to content

Commit 1e7397e

Browse files
[otlp] Add Trace Exporter to transmit custom serialized data. (open-telemetry#5969)
Co-authored-by: Mikel Blanchard <[email protected]>
1 parent b201d70 commit 1e7397e

16 files changed

+786
-9
lines changed

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExperimentalOptions.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ internal sealed class ExperimentalOptions
1717

1818
public const string OtlpDiskRetryDirectoryPathEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_DISK_RETRY_DIRECTORY_PATH";
1919

20+
public const string OtlpUseCustomSerializer = "OTEL_DOTNET_EXPERIMENTAL_USE_CUSTOM_PROTOBUF_SERIALIZER";
21+
2022
public ExperimentalOptions()
2123
: this(new ConfigurationBuilder().AddEnvironmentVariables().Build())
2224
{
@@ -29,6 +31,11 @@ public ExperimentalOptions(IConfiguration configuration)
2931
this.EmitLogEventAttributes = emitLogEventAttributes;
3032
}
3133

34+
if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, OtlpUseCustomSerializer, out var useCustomSerializer))
35+
{
36+
this.UseCustomProtobufSerializer = useCustomSerializer;
37+
}
38+
3239
if (configuration.TryGetStringValue(OtlpRetryEnvVar, out var retryPolicy) && retryPolicy != null)
3340
{
3441
if (retryPolicy.Equals("in_memory", StringComparison.OrdinalIgnoreCase))
@@ -78,4 +85,9 @@ public ExperimentalOptions(IConfiguration configuration)
7885
/// Gets the path on disk where the telemetry will be stored for retries at a later point.
7986
/// </summary>
8087
public string? DiskRetryDirectoryPath { get; }
88+
89+
/// <summary>
90+
/// Gets a value indicating whether custom serializer should be used for OTLP export.
91+
/// </summary>
92+
public bool UseCustomProtobufSerializer { get; }
8193
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
5+
6+
/// <summary>Export client interface.</summary>
7+
internal interface IProtobufExportClient
8+
{
9+
/// <summary>
10+
/// Method for sending export request to the server.
11+
/// </summary>
12+
/// <param name="buffer">The request body to send to the server.</param>
13+
/// <param name="contentLength">length of the content.</param>
14+
/// <param name="deadlineUtc">The deadline time in utc for export request to finish.</param>
15+
/// <param name="cancellationToken">An optional token for canceling the call.</param>
16+
/// <returns><see cref="ExportClientResponse"/>.</returns>
17+
ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default);
18+
19+
/// <summary>
20+
/// Method for shutting down the export client.
21+
/// </summary>
22+
/// <param name="timeoutMilliseconds">
23+
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
24+
/// wait indefinitely.
25+
/// </param>
26+
/// <returns>
27+
/// Returns <c>true</c> if shutdown succeeded; otherwise, <c>false</c>.
28+
/// </returns>
29+
bool Shutdown(int timeoutMilliseconds);
30+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#if NETFRAMEWORK
5+
using System.Net.Http;
6+
#endif
7+
using System.Net.Http.Headers;
8+
using Grpc.Core;
9+
using OpenTelemetry.Internal;
10+
11+
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
12+
13+
/// <summary>Base class for sending OTLP export request over gRPC.</summary>
14+
internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient
15+
{
16+
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
17+
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc");
18+
private static readonly Version Http2RequestVersion = new(2, 0);
19+
20+
public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
21+
{
22+
Guard.ThrowIfNull(options);
23+
Guard.ThrowIfNull(httpClient);
24+
Guard.ThrowIfNull(signalPath);
25+
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);
26+
27+
Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
28+
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
29+
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
30+
this.HttpClient = httpClient;
31+
}
32+
33+
internal HttpClient HttpClient { get; }
34+
35+
internal Uri Endpoint { get; set; }
36+
37+
internal IReadOnlyDictionary<string, string> Headers { get; }
38+
39+
internal int TimeoutMilliseconds { get; }
40+
41+
/// <inheritdoc/>
42+
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
43+
{
44+
try
45+
{
46+
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);
47+
48+
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);
49+
50+
try
51+
{
52+
httpResponse.EnsureSuccessStatusCode();
53+
}
54+
catch (HttpRequestException ex)
55+
{
56+
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex);
57+
}
58+
59+
// TODO: Hande retries & failures.
60+
return SuccessExportResponse;
61+
}
62+
catch (RpcException ex)
63+
{
64+
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);
65+
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex);
66+
}
67+
}
68+
69+
public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
70+
{
71+
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
72+
request.Version = Http2RequestVersion;
73+
74+
#if NET6_0_OR_GREATER
75+
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
76+
#endif
77+
78+
foreach (var header in this.Headers)
79+
{
80+
request.Headers.Add(header.Key, header.Value);
81+
}
82+
83+
// TODO: Support compression.
84+
85+
request.Content = new ByteArrayContent(buffer, 0, contentLength);
86+
request.Content.Headers.ContentType = MediaHeaderValue;
87+
88+
return request;
89+
}
90+
91+
public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
92+
{
93+
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
94+
}
95+
96+
/// <inheritdoc/>
97+
public bool Shutdown(int timeoutMilliseconds)
98+
{
99+
this.HttpClient.CancelPendingRequests();
100+
return true;
101+
}
102+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#if NETFRAMEWORK
5+
using System.Net.Http;
6+
#endif
7+
using System.Net.Http.Headers;
8+
using OpenTelemetry.Internal;
9+
10+
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
11+
12+
/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
13+
internal sealed class ProtobufOtlpHttpExportClient : IProtobufExportClient
14+
{
15+
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
16+
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
17+
#if NET
18+
private readonly bool synchronousSendSupportedByCurrentPlatform;
19+
#endif
20+
21+
internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
22+
{
23+
Guard.ThrowIfNull(options);
24+
Guard.ThrowIfNull(httpClient);
25+
Guard.ThrowIfNull(signalPath);
26+
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);
27+
28+
Uri exporterEndpoint = options.AppendSignalPathToEndpoint
29+
? options.Endpoint.AppendPathIfNotPresent(signalPath)
30+
: options.Endpoint;
31+
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
32+
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
33+
this.HttpClient = httpClient;
34+
35+
#if NET
36+
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
37+
this.synchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
38+
&& !OperatingSystem.IsIOS()
39+
&& !OperatingSystem.IsTvOS()
40+
&& !OperatingSystem.IsBrowser();
41+
#endif
42+
}
43+
44+
internal HttpClient HttpClient { get; }
45+
46+
internal Uri Endpoint { get; set; }
47+
48+
internal IReadOnlyDictionary<string, string> Headers { get; }
49+
50+
/// <inheritdoc/>
51+
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
52+
{
53+
try
54+
{
55+
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);
56+
57+
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);
58+
59+
try
60+
{
61+
httpResponse.EnsureSuccessStatusCode();
62+
}
63+
catch (HttpRequestException ex)
64+
{
65+
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex);
66+
}
67+
68+
return SuccessExportResponse;
69+
}
70+
catch (HttpRequestException ex)
71+
{
72+
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);
73+
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex);
74+
}
75+
}
76+
77+
/// <inheritdoc/>
78+
public bool Shutdown(int timeoutMilliseconds)
79+
{
80+
this.HttpClient.CancelPendingRequests();
81+
return true;
82+
}
83+
84+
public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength)
85+
{
86+
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
87+
88+
foreach (var header in this.Headers)
89+
{
90+
request.Headers.Add(header.Key, header.Value);
91+
}
92+
93+
var content = new ByteArrayContent(exportRequest, 0, contentLength);
94+
content.Headers.ContentType = MediaHeaderValue;
95+
request.Content = content;
96+
97+
return request;
98+
}
99+
100+
public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
101+
{
102+
#if NET
103+
return this.synchronousSendSupportedByCurrentPlatform
104+
? this.HttpClient.Send(request, cancellationToken)
105+
: this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
106+
#else
107+
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
108+
#endif
109+
}
110+
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceFieldNumberConstants.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer
55

66
internal static class ProtobufOtlpTraceFieldNumberConstants
77
{
8-
// Resource spans
98
#pragma warning disable SA1310 // Field names should not contain underscore
9+
10+
// Traces data
11+
internal const int TracesData_Resource_Spans = 1;
12+
13+
// Resource spans
1014
internal const int ResourceSpans_Resource = 1;
1115
internal const int ResourceSpans_Scope_Spans = 2;
1216
internal const int ResourceSpans_Schema_Url = 3;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceSerializer.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@ internal static class ProtobufOtlpTraceSerializer
2020

2121
internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource, in Batch<Activity> batch)
2222
{
23+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpTraceFieldNumberConstants.TracesData_Resource_Spans, ProtobufWireType.LEN);
24+
int resourceSpansScopeSpansLengthPosition = writePosition;
25+
writePosition += ReserveSizeForLength;
26+
2327
foreach (var activity in batch)
2428
{
2529
var sourceName = activity.Source.Name;
2630
if (!ScopeTracesList.TryGetValue(sourceName, out var activities))
2731
{
28-
activities = ActivityListPool.Count > 0 ? ActivityListPool.Pop() : new List<Activity>();
32+
activities = ActivityListPool.Count > 0 ? ActivityListPool.Pop() : [];
2933
ScopeTracesList[sourceName] = activities;
3034
}
3135

@@ -34,6 +38,7 @@ internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOpt
3438

3539
writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource, ScopeTracesList);
3640
ReturnActivityListToPool();
41+
ProtobufSerializer.WriteReservedLength(buffer, resourceSpansScopeSpansLengthPosition, writePosition - (resourceSpansScopeSpansLengthPosition + ReserveSizeForLength));
3742

3843
return writePosition;
3944
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterPersistentStorageTransmissionHandler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds)
5656

5757
protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
5858
{
59-
if (RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out _))
59+
if (RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out _))
6060
{
6161
byte[]? data = null;
6262
if (request is ExportTraceServiceRequest traceRequest)
@@ -158,7 +158,8 @@ private void RetryStoredRequests()
158158
{
159159
var deadlineUtc = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);
160160
var request = this.requestFactory.Invoke(data);
161-
if (this.TryRetryRequest(request, deadlineUtc, out var response) || !RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out var retryInfo))
161+
if (this.TryRetryRequest(request, deadlineUtc, out var response)
162+
|| !RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out var retryInfo))
162163
{
163164
blob.TryDelete();
164165
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterRetryTransmissionHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ internal OtlpExporterRetryTransmissionHandler(IExportClient<TRequest> exportClie
1515
protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
1616
{
1717
var nextRetryDelayMilliseconds = OtlpRetry.InitialBackoffMilliseconds;
18-
while (RetryHelper.ShouldRetryRequest(request, response, nextRetryDelayMilliseconds, out var retryResult))
18+
while (RetryHelper.ShouldRetryRequest(response, nextRetryDelayMilliseconds, out var retryResult))
1919
{
2020
// Note: This delay cannot exceed the configured timeout period for otlp exporter.
2121
// If the backend responds with `RetryAfter` duration that would result in exceeding the configured timeout period

0 commit comments

Comments
 (0)