Skip to content

Commit 7eeddf5

Browse files
authored
[otlp] Refactor shared protobuf otlp export client code into a base class (open-telemetry#6001)
1 parent 88d2ad6 commit 7eeddf5

File tree

3 files changed

+118
-127
lines changed

3 files changed

+118
-127
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#if NETFRAMEWORK
5+
using System.Net.Http;
6+
#endif
7+
using System.Net.Http.Headers;
8+
using OpenTelemetry.Internal;
9+
10+
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
11+
12+
internal abstract class ProtobufOtlpExportClient : IProtobufExportClient
13+
{
14+
private static readonly Version Http2RequestVersion = new(2, 0);
15+
16+
#if NET
17+
private static readonly bool SynchronousSendSupportedByCurrentPlatform;
18+
19+
static ProtobufOtlpExportClient()
20+
{
21+
#if NET
22+
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
23+
SynchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
24+
&& !OperatingSystem.IsIOS()
25+
&& !OperatingSystem.IsTvOS()
26+
&& !OperatingSystem.IsBrowser();
27+
#endif
28+
}
29+
#endif
30+
31+
protected ProtobufOtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
32+
{
33+
Guard.ThrowIfNull(options);
34+
Guard.ThrowIfNull(httpClient);
35+
Guard.ThrowIfNull(signalPath);
36+
37+
Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
38+
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
39+
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
40+
this.HttpClient = httpClient;
41+
}
42+
43+
internal HttpClient HttpClient { get; }
44+
45+
internal Uri Endpoint { get; }
46+
47+
internal IReadOnlyDictionary<string, string> Headers { get; }
48+
49+
internal abstract MediaTypeHeaderValue MediaTypeHeader { get; }
50+
51+
internal virtual bool RequireHttp2 => false;
52+
53+
public abstract ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default);
54+
55+
/// <inheritdoc/>
56+
public bool Shutdown(int timeoutMilliseconds)
57+
{
58+
this.HttpClient.CancelPendingRequests();
59+
return true;
60+
}
61+
62+
protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
63+
{
64+
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
65+
66+
if (this.RequireHttp2)
67+
{
68+
request.Version = Http2RequestVersion;
69+
70+
#if NET6_0_OR_GREATER
71+
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
72+
#endif
73+
}
74+
75+
foreach (var header in this.Headers)
76+
{
77+
request.Headers.Add(header.Key, header.Value);
78+
}
79+
80+
// TODO: Support compression.
81+
82+
request.Content = new ByteArrayContent(buffer, 0, contentLength);
83+
request.Content.Headers.ContentType = this.MediaTypeHeader;
84+
85+
return request;
86+
}
87+
88+
protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
89+
{
90+
#if NET
91+
// Note: SendAsync must be used with HTTP/2 because synchronous send is
92+
// not supported.
93+
return this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform
94+
? this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult()
95+
: this.HttpClient.Send(request, cancellationToken);
96+
#else
97+
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
98+
#endif
99+
}
100+
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs

Lines changed: 14 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,15 @@
66
#endif
77
using System.Net.Http.Headers;
88
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc;
9-
using OpenTelemetry.Internal;
109

1110
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
1211

1312
/// <summary>Base class for sending OTLP export request over gRPC.</summary>
14-
internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient
13+
internal sealed class ProtobufOtlpGrpcExportClient : ProtobufOtlpExportClient
1514
{
1615
public const string GrpcStatusDetailsHeader = "grpc-status-details-bin";
1716
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
1817
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc");
19-
private static readonly Version Http2RequestVersion = new(2, 0);
2018

2119
private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrpcResponse
2220
= new(
@@ -27,49 +25,34 @@ private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrp
2725
grpcStatusDetailsHeader: null);
2826

2927
public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
28+
: base(options, httpClient, signalPath)
3029
{
31-
Guard.ThrowIfNull(options);
32-
Guard.ThrowIfNull(httpClient);
33-
Guard.ThrowIfNull(signalPath);
34-
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);
35-
36-
Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
37-
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
38-
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
39-
this.HttpClient = httpClient;
4030
}
4131

42-
internal HttpClient HttpClient { get; }
32+
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;
4333

44-
internal Uri Endpoint { get; set; }
45-
46-
internal IReadOnlyDictionary<string, string> Headers { get; }
47-
48-
internal int TimeoutMilliseconds { get; }
34+
internal override bool RequireHttp2 => true;
4935

5036
/// <inheritdoc/>
51-
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
37+
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
5238
{
5339
try
5440
{
5541
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);
5642
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);
5743

58-
try
59-
{
60-
httpResponse.EnsureSuccessStatusCode();
61-
}
62-
catch (HttpRequestException)
63-
{
64-
throw;
65-
}
44+
httpResponse.EnsureSuccessStatusCode();
6645

6746
var trailingHeaders = httpResponse.TrailingHeaders();
6847
Status status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders);
6948

7049
if (status.Detail.Equals(Status.NoReplyDetailMessage))
7150
{
51+
#if NET
52+
using var responseStream = httpResponse.Content.ReadAsStream(cancellationToken);
53+
#else
7254
using var responseStream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
55+
#endif
7356
int firstByte = responseStream.ReadByte();
7457

7558
if (firstByte == -1)
@@ -170,45 +153,11 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
170153
}
171154
}
172155

173-
public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
174-
{
175-
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
176-
request.Version = Http2RequestVersion;
177-
178-
#if NET6_0_OR_GREATER
179-
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
180-
#endif
181-
182-
foreach (var header in this.Headers)
183-
{
184-
request.Headers.Add(header.Key, header.Value);
185-
}
186-
187-
// TODO: Support compression.
188-
189-
request.Content = new ByteArrayContent(buffer, 0, contentLength);
190-
request.Content.Headers.ContentType = MediaHeaderValue;
191-
192-
return request;
193-
}
194-
195-
public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
196-
{
197-
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
198-
}
199-
200-
/// <inheritdoc/>
201-
public bool Shutdown(int timeoutMilliseconds)
202-
{
203-
this.HttpClient.CancelPendingRequests();
204-
return true;
205-
}
206-
207156
private static bool IsTransientNetworkError(HttpRequestException ex)
208157
{
209-
return ex.InnerException is System.Net.Sockets.SocketException socketEx &&
210-
(socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut ||
211-
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset ||
212-
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
158+
return ex.InnerException is System.Net.Sockets.SocketException socketEx
159+
&& (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut
160+
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset
161+
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
213162
}
214163
}

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs

Lines changed: 4 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,48 +5,24 @@
55
using System.Net.Http;
66
#endif
77
using System.Net.Http.Headers;
8-
using OpenTelemetry.Internal;
98

109
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
1110

1211
/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
13-
internal sealed class ProtobufOtlpHttpExportClient : IProtobufExportClient
12+
internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient
1413
{
1514
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
1615
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
17-
#if NET
18-
private readonly bool synchronousSendSupportedByCurrentPlatform;
19-
#endif
2016

2117
internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
18+
: base(options, httpClient, signalPath)
2219
{
23-
Guard.ThrowIfNull(options);
24-
Guard.ThrowIfNull(httpClient);
25-
Guard.ThrowIfNull(signalPath);
26-
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);
27-
28-
Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
29-
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
30-
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
31-
this.HttpClient = httpClient;
32-
33-
#if NET
34-
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
35-
this.synchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
36-
&& !OperatingSystem.IsIOS()
37-
&& !OperatingSystem.IsTvOS()
38-
&& !OperatingSystem.IsBrowser();
39-
#endif
4020
}
4121

42-
internal HttpClient HttpClient { get; }
43-
44-
internal Uri Endpoint { get; set; }
45-
46-
internal IReadOnlyDictionary<string, string> Headers { get; }
22+
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;
4723

4824
/// <inheritdoc/>
49-
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
25+
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
5026
{
5127
try
5228
{
@@ -71,38 +47,4 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
7147
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex);
7248
}
7349
}
74-
75-
/// <inheritdoc/>
76-
public bool Shutdown(int timeoutMilliseconds)
77-
{
78-
this.HttpClient.CancelPendingRequests();
79-
return true;
80-
}
81-
82-
public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength)
83-
{
84-
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
85-
86-
foreach (var header in this.Headers)
87-
{
88-
request.Headers.Add(header.Key, header.Value);
89-
}
90-
91-
var content = new ByteArrayContent(exportRequest, 0, contentLength);
92-
content.Headers.ContentType = MediaHeaderValue;
93-
request.Content = content;
94-
95-
return request;
96-
}
97-
98-
public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
99-
{
100-
#if NET
101-
return this.synchronousSendSupportedByCurrentPlatform
102-
? this.HttpClient.Send(request, cancellationToken)
103-
: this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
104-
#else
105-
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
106-
#endif
107-
}
10850
}

0 commit comments

Comments
 (0)