diff --git a/examples/Console/Program.cs b/examples/Console/Program.cs index a7d0e822f47..3a29544a608 100644 --- a/examples/Console/Program.cs +++ b/examples/Console/Program.cs @@ -125,6 +125,9 @@ internal sealed class OtlpOptions [Option('p', "protocol", HelpText = "Transport protocol used by exporter. Supported values: grpc and http/protobuf.", Default = "grpc")] public string? Protocol { get; set; } + + [Option('c', "compression", HelpText = "Compression algorithm used by exporter. Supported values: none and gzip.", Default = "none")] + public string? Compression { get; set; } } [Verb("logs", HelpText = "Specify the options required to test Logs")] diff --git a/examples/Console/TestOtlpExporter.cs b/examples/Console/TestOtlpExporter.cs index c38ac518a18..eb9402975be 100644 --- a/examples/Console/TestOtlpExporter.cs +++ b/examples/Console/TestOtlpExporter.cs @@ -62,6 +62,10 @@ private static int RunWithActivitySource(OtlpOptions options) } opt.Protocol = otlpExportProtocol.Value; + if (Enum.TryParse(options.Compression, true, out var compression)) + { + opt.Compression = compression; + } System.Console.WriteLine($"OTLP Exporter is using {opt.Protocol} protocol and endpoint {opt.Endpoint}"); }) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/.publicApi/Stable/PublicAPI.Unshipped.txt index e69de29bb2d..9423c204626 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -0,0 +1,5 @@ +OpenTelemetry.Exporter.OtlpExportCompression +OpenTelemetry.Exporter.OtlpExportCompression.Gzip = 1 -> OpenTelemetry.Exporter.OtlpExportCompression +OpenTelemetry.Exporter.OtlpExportCompression.None = 0 -> OpenTelemetry.Exporter.OtlpExportCompression +OpenTelemetry.Exporter.OtlpExporterOptions.Compression.get -> OpenTelemetry.Exporter.OtlpExportCompression +OpenTelemetry.Exporter.OtlpExporterOptions.Compression.set -> void diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md index 82056bf2c5f..faf1c80d95b 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md @@ -7,6 +7,13 @@ Notes](../../RELEASENOTES.md). ## Unreleased +* Added support for gzip compression in the OTLP exporter for HTTP and gRPC. + Compression can be configured using the environment variables `OTEL_EXPORTER_OTLP_COMPRESSION`, + `OTEL_EXPORTER_OTLP_TRACES_COMPRESSION`, `OTEL_EXPORTER_OTLP_METRICS_COMPRESSION`, + `OTEL_EXPORTER_OTLP_LOGS_COMPRESSION`. Setting the respective environment variable + to `gzip` activates compression. + ([#6494](https://github.com/open-telemetry/opentelemetry-dotnet/pull/6494)) + * Fixed an issue in .NET Framework where OTLP export of traces, logs, and metrics using `OtlpExportProtocol.Grpc` did not correctly set the initial write position, resulting in gRPC protocol errors. diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/IOtlpExporterOptions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/IOtlpExporterOptions.cs index d6402bc85a8..a900d9074f0 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/IOtlpExporterOptions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/IOtlpExporterOptions.cs @@ -69,6 +69,15 @@ internal interface IOtlpExporterOptions /// int TimeoutMilliseconds { get; set; } + /// + /// Gets or sets a value indicating how to compress the payload. + /// Currently Gzip is the only supported compression method. + /// Note: Refer to the + /// OpenTelemetry Specification for details />. + /// + OtlpExportCompression Compression { get; set; } + /// /// Gets or sets the factory function called to create the instance that will be used at runtime to diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs index 24fc1551cc9..535918c8e2e 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs @@ -44,10 +44,13 @@ protected OtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, s this.Endpoint = new UriBuilder(exporterEndpoint).Uri; this.Headers = options.GetHeaders>((d, k, v) => d.Add(k, v)); this.HttpClient = httpClient; + this.Compression = options.Compression; } internal HttpClient HttpClient { get; } + internal OtlpExportCompression Compression { get; } + internal Uri Endpoint { get; } internal IReadOnlyDictionary Headers { get; } @@ -56,6 +59,10 @@ protected OtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, s internal virtual bool RequireHttp2 => false; + protected abstract string? ContentEncodingHeaderKey { get; } + + protected abstract string? ContentEncodingHeaderValue { get; } + public abstract ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default); /// @@ -83,14 +90,26 @@ protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength) request.Headers.Add(header.Key, header.Value); } - // TODO: Support compression. + Stream data = new MemoryStream(buffer, 0, contentLength); + + if (this.Compression == OtlpExportCompression.Gzip) + { + data = this.Compress(data); + } - request.Content = new ByteArrayContent(buffer, 0, contentLength); + request.Content = new StreamContent(data); request.Content.Headers.ContentType = this.MediaTypeHeader; + if (this.Compression == OtlpExportCompression.Gzip && this.ContentEncodingHeaderKey != null) + { + request.Content.Headers.Add(this.ContentEncodingHeaderKey, this.ContentEncodingHeaderValue); + } + return request; } + protected abstract Stream Compress(Stream dataStream); + protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) { #if NET diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs index 31b7d0e9bb2..86231364e14 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs @@ -4,6 +4,8 @@ #if NETFRAMEWORK using System.Net.Http; #endif +using System.Buffers.Binary; +using System.IO.Compression; using System.Net.Http.Headers; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc; @@ -13,6 +15,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie internal sealed class OtlpGrpcExportClient : OtlpExportClient { public const string GrpcStatusDetailsHeader = "grpc-status-details-bin"; + private const int GrpcMessageHeaderSize = 5; private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null); private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc"); @@ -33,11 +36,24 @@ public OtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, internal override bool RequireHttp2 => true; + protected override string ContentEncodingHeaderKey => "grpc-encoding"; + + protected override string ContentEncodingHeaderValue => "gzip"; + /// public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) { try { + // A gRPC message consists of 3 parts: + // byte 0 - Compression flag (0 = not compressed, 1 = compressed). + // bytes 1-4 - Message length in big-endian format (length of the serialized data only). + // bytes 5+ - Protobuf-encoded payload. + buffer[0] = this.Compression == OtlpExportCompression.Gzip ? (byte)1 : (byte)0; + Span data = new Span(buffer, 1, 4); + var dataLength = contentLength - GrpcMessageHeaderSize; + BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); + using var httpRequest = this.CreateHttpRequest(buffer, contentLength); // TE is required by some servers, e.g. C Core. @@ -158,6 +174,27 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten } } + protected override Stream Compress(Stream dataStream) + { + var compressedStream = new MemoryStream(); + + compressedStream.WriteByte(1); + compressedStream.Write([0, 0, 0, 0], 0, 4); + + dataStream.Position = GrpcMessageHeaderSize; + + using (var gzipStream = new GZipStream(compressedStream, CompressionLevel.Fastest, leaveOpen: true)) + { + dataStream.CopyTo(gzipStream); + } + + var compressedDataLength = (uint)(compressedStream.Length - GrpcMessageHeaderSize); + BinaryPrimitives.WriteUInt32BigEndian(compressedStream.GetBuffer().AsSpan(1, 4), compressedDataLength); + + compressedStream.Position = 0; + return compressedStream; + } + private static bool IsTransientNetworkError(HttpRequestException ex) { return ex.InnerException is System.Net.Sockets.SocketException socketEx diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs index 0d938a90f89..026e907f4b2 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs @@ -4,6 +4,7 @@ #if NETFRAMEWORK using System.Net.Http; #endif +using System.IO.Compression; using System.Net.Http.Headers; namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; @@ -21,6 +22,10 @@ internal OtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue; + protected override string ContentEncodingHeaderKey => "Content-Encoding"; + + protected override string ContentEncodingHeaderValue => "gzip"; + /// public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) { @@ -47,4 +52,16 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex); } } + + protected override Stream Compress(Stream dataStream) + { + var compressedStream = new MemoryStream(); + using (var gzipStream = new GZipStream(compressedStream, CompressionLevel.Fastest, leaveOpen: true)) + { + dataStream.CopyTo(gzipStream); + } + + compressedStream.Position = 0; + return compressedStream; + } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OtlpSpecConfigDefinitions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OtlpSpecConfigDefinitions.cs index 3bc62218b3f..cf4ab6ecd6f 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OtlpSpecConfigDefinitions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OtlpSpecConfigDefinitions.cs @@ -15,20 +15,24 @@ internal static class OtlpSpecConfigDefinitions public const string DefaultHeadersEnvVarName = "OTEL_EXPORTER_OTLP_HEADERS"; public const string DefaultTimeoutEnvVarName = "OTEL_EXPORTER_OTLP_TIMEOUT"; public const string DefaultProtocolEnvVarName = "OTEL_EXPORTER_OTLP_PROTOCOL"; + public const string DefaultCompressionEnvVarName = "OTEL_EXPORTER_OTLP_COMPRESSION"; public const string LogsEndpointEnvVarName = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"; public const string LogsHeadersEnvVarName = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"; public const string LogsTimeoutEnvVarName = "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT"; public const string LogsProtocolEnvVarName = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL"; + public const string LogsCompressionEnvVarName = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; public const string MetricsEndpointEnvVarName = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"; public const string MetricsHeadersEnvVarName = "OTEL_EXPORTER_OTLP_METRICS_HEADERS"; public const string MetricsTimeoutEnvVarName = "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT"; public const string MetricsProtocolEnvVarName = "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"; public const string MetricsTemporalityPreferenceEnvVarName = "OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE"; + public const string MetricsCompressionEnvVarName = "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION"; public const string TracesEndpointEnvVarName = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"; public const string TracesHeadersEnvVarName = "OTEL_EXPORTER_OTLP_TRACES_HEADERS"; public const string TracesTimeoutEnvVarName = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT"; public const string TracesProtocolEnvVarName = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"; + public const string TracesCompressionEnvVarName = "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION"; } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExportCompression.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExportCompression.cs new file mode 100644 index 00000000000..cdc920c73bd --- /dev/null +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExportCompression.cs @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Exporter; + +/// +/// Supported compression methods for OTLP exporter according to the specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md. +/// +public enum OtlpExportCompression +{ + /// + /// No compression. + /// + None = 0, + + /// + /// Compress with Gzip. + /// + Gzip = 1, +} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptions.cs index 91ebfdbd3e1..c9f8b3b47f9 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptions.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 using System.Diagnostics; + #if NETFRAMEWORK using System.Net.Http; #endif @@ -136,6 +137,9 @@ public OtlpExportProtocol Protocol /// Note: This only applies when exporting traces. public BatchExportProcessorOptions BatchExportProcessorOptions { get; set; } + /// + public OtlpExportCompression Compression { get; set; } = OtlpExportCompression.None; + /// public Func HttpClientFactory { @@ -179,7 +183,8 @@ internal void ApplyConfigurationUsingSpecificationEnvVars( bool appendSignalPathToEndpoint, string protocolEnvVarKey, string headersEnvVarKey, - string timeoutEnvVarKey) + string timeoutEnvVarKey, + string compressPayloadEnvVarKey) { if (configuration.TryGetUriValue(OpenTelemetryProtocolExporterEventSource.Log, endpointEnvVarKey, out var endpoint)) { @@ -205,6 +210,18 @@ internal void ApplyConfigurationUsingSpecificationEnvVars( { this.TimeoutMilliseconds = timeout; } + + if (configuration.TryGetStringValue(compressPayloadEnvVarKey, out var compressPayload)) + { + if (Enum.TryParse(compressPayload.Trim(), true, out var compression)) + { + this.Compression = compression; + } + else + { + OpenTelemetryProtocolExporterEventSource.Log.InvalidConfigurationValue(compressPayloadEnvVarKey, compressPayload); + } + } } internal OtlpExporterOptions ApplyDefaults(OtlpExporterOptions defaultExporterOptions) @@ -250,7 +267,8 @@ private void ApplyConfiguration( appendSignalPathToEndpoint: true, OtlpSpecConfigDefinitions.DefaultProtocolEnvVarName, OtlpSpecConfigDefinitions.DefaultHeadersEnvVarName, - OtlpSpecConfigDefinitions.DefaultTimeoutEnvVarName); + OtlpSpecConfigDefinitions.DefaultTimeoutEnvVarName, + OtlpSpecConfigDefinitions.DefaultCompressionEnvVarName); } else if (configurationType == OtlpExporterOptionsConfigurationType.Logs) { @@ -260,7 +278,8 @@ private void ApplyConfiguration( appendSignalPathToEndpoint: false, OtlpSpecConfigDefinitions.LogsProtocolEnvVarName, OtlpSpecConfigDefinitions.LogsHeadersEnvVarName, - OtlpSpecConfigDefinitions.LogsTimeoutEnvVarName); + OtlpSpecConfigDefinitions.LogsTimeoutEnvVarName, + OtlpSpecConfigDefinitions.LogsCompressionEnvVarName); } else if (configurationType == OtlpExporterOptionsConfigurationType.Metrics) { @@ -270,7 +289,8 @@ private void ApplyConfiguration( appendSignalPathToEndpoint: false, OtlpSpecConfigDefinitions.MetricsProtocolEnvVarName, OtlpSpecConfigDefinitions.MetricsHeadersEnvVarName, - OtlpSpecConfigDefinitions.MetricsTimeoutEnvVarName); + OtlpSpecConfigDefinitions.MetricsTimeoutEnvVarName, + OtlpSpecConfigDefinitions.MetricsCompressionEnvVarName); } else if (configurationType == OtlpExporterOptionsConfigurationType.Traces) { @@ -280,7 +300,8 @@ private void ApplyConfiguration( appendSignalPathToEndpoint: false, OtlpSpecConfigDefinitions.TracesProtocolEnvVarName, OtlpSpecConfigDefinitions.TracesHeadersEnvVarName, - OtlpSpecConfigDefinitions.TracesTimeoutEnvVarName); + OtlpSpecConfigDefinitions.TracesTimeoutEnvVarName, + OtlpSpecConfigDefinitions.TracesCompressionEnvVarName); } else { diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporter.cs index dbec08c771b..ec95d6727e1 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporter.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Buffers.Binary; using System.Diagnostics; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; @@ -78,17 +77,6 @@ public override ExportResult Export(in Batch logRecordBatch) { int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(ref this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch); - if (this.startWritePosition == GrpcStartWritePosition) - { - // Grpc payload consists of 3 parts - // byte 0 - Specifying if the payload is compressed. - // 1-4 byte - Specifies the length of payload in big endian format. - // 5 and above - Protobuf serialized data. - Span data = new Span(this.buffer, 1, 4); - var dataLength = writePosition - GrpcStartWritePosition; - BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); - } - if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) { return ExportResult.Failure; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporter.cs index 1ff7dcc5ae5..ac95ca49934 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporter.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Buffers.Binary; using System.Diagnostics; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; @@ -71,17 +70,6 @@ public override ExportResult Export(in Batch metrics) { int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(ref this.buffer, this.startWritePosition, this.Resource, metrics); - if (this.startWritePosition == GrpcStartWritePosition) - { - // Grpc payload consists of 3 parts - // byte 0 - Specifying if the payload is compressed. - // 1-4 byte - Specifies the length of payload in big endian format. - // 5 and above - Protobuf serialized data. - Span data = new Span(this.buffer, 1, 4); - var dataLength = writePosition - GrpcStartWritePosition; - BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); - } - if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) { return ExportResult.Failure; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs index 6f10f51e7b2..92dc0f79bf9 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Buffers.Binary; using System.Diagnostics; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; @@ -74,17 +73,6 @@ public override ExportResult Export(in Batch activityBatch) { int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(ref this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch); - if (this.startWritePosition == GrpcStartWritePosition) - { - // Grpc payload consists of 3 parts - // byte 0 - Specifying if the payload is compressed. - // 1-4 byte - Specifies the length of payload in big endian format. - // 5 and above - Protobuf serialized data. - Span data = new Span(this.buffer, 1, 4); - var dataLength = writePosition - GrpcStartWritePosition; - BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); - } - if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) { return ExportResult.Failure; diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpExporterCompressionTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpExporterCompressionTests.cs new file mode 100644 index 00000000000..fc899835695 --- /dev/null +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpExporterCompressionTests.cs @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Buffers.Binary; +using System.IO.Compression; +using System.Net.Http.Headers; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; +using Xunit; + +#if !NET +using System.Net.Http; +#endif + +namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests.Implementation.ExportClient; + +public class OtlpExporterCompressionTests +{ + [Theory] + [InlineData("")] + [InlineData("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")] + public void SendExportRequest_SendsCorrectContent_Http_NonCompressed(string text) + { + SendExportRequest_Http(OtlpExportCompression.None, text, (requestHeaders, testHttpHandlerContent, buffer) => + { + Assert.DoesNotContain(requestHeaders, h => h.Key == "Content-Encoding"); + Assert.Equal(buffer, testHttpHandlerContent); + }); + } + + [Theory] + [InlineData("")] + [InlineData("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")] + public void SendExportRequest_SendsCorrectContent_Http_Compressed(string text) + { + SendExportRequest_Http(OtlpExportCompression.Gzip, text, (requestHeaders, testHttpHandlerContent, buffer) => + { + Assert.Contains(requestHeaders, h => h.Key == "Content-Encoding" && h.Value.First() == "gzip"); + + Assert.NotNull(testHttpHandlerContent); + var decompressedStream = Decompress(testHttpHandlerContent); + + Assert.Equal(buffer, decompressedStream.ToArray()); + }); + } + + [Theory] + [InlineData("")] + [InlineData("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")] + public void SendExportRequest_SendsCorrectContent_Grpc_NonCompressed(string text) + { + SendExportRequest_Grpc(OtlpExportCompression.None, text, body => body); + } + + [Theory] + [InlineData("")] + [InlineData("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")] + public void SendExportRequest_SendsCorrectContent_Grpc_Compressed(string text) + { + SendExportRequest_Grpc(OtlpExportCompression.Gzip, text, Decompress); + } + + private static void SendExportRequest_Grpc(OtlpExportCompression compression, string text, Func readBody) + { + var payload = System.Text.Encoding.UTF8.GetBytes(text); + + // Arrange + var options = new OtlpExporterOptions + { + Endpoint = new Uri("http://localhost:4317"), + Compression = compression, + }; + + var buffer = new byte[payload.Length + 5]; + Buffer.BlockCopy(payload, 0, buffer, 5, payload.Length); + + using var testGrpcHandler = new TestGrpcMessageHandler(); + using var httpClient = new HttpClient(testGrpcHandler, false); + var exportClient = new OtlpGrpcExportClient(options, httpClient, string.Empty); + + var deadlineUtc = DateTime.UtcNow.AddMilliseconds(httpClient.Timeout.TotalMilliseconds); + + // Act + var result = exportClient.SendExportRequest(buffer, buffer.Length, deadlineUtc); + var httpRequest = testGrpcHandler.HttpRequestMessage; + var requestContent = testGrpcHandler.HttpRequestContent; + + // Assert + Assert.True(result.Success); + Assert.NotNull(httpRequest); + Assert.NotNull(requestContent); + + var compressionFlag = requestContent[0]; + var declaredLength = BinaryPrimitives.ReadUInt32BigEndian(requestContent.AsSpan(1, 4)); + var body = requestContent.AsSpan(5, (int)declaredLength).ToArray(); + + Assert.Equal(compression == OtlpExportCompression.Gzip ? 1 : 0, compressionFlag); + Assert.Equal(body.Length, (int)declaredLength); + + Assert.Equal(payload, readBody(body)); + } + + private static void SendExportRequest_Http(OtlpExportCompression compression, string text, Action assertions) + { + var buffer = System.Text.Encoding.UTF8.GetBytes(text); + + // Arrange + var options = new OtlpExporterOptions + { + Endpoint = new Uri("http://localhost:4317"), + Compression = compression, + }; + + using var testHttpHandler = new TestHttpMessageHandler(); + using var httpClient = new HttpClient(testHttpHandler, false); + var exportClient = new OtlpHttpExportClient(options, httpClient, string.Empty); + + var deadlineUtc = DateTime.UtcNow.AddMilliseconds(httpClient.Timeout.TotalMilliseconds); + + // Act + var result = exportClient.SendExportRequest(buffer, buffer.Length, deadlineUtc); + var httpRequest = testHttpHandler.HttpRequestMessage; + + // Assert + Assert.True(result.Success); + Assert.NotNull(httpRequest); + Assert.Equal(HttpMethod.Post, httpRequest.Method); + Assert.NotNull(httpRequest.Content); + + assertions(httpRequest.Content.Headers, testHttpHandler.HttpRequestContent, buffer); + } + + private static byte[] Decompress(byte[] data) + { + using var compressedStream = new MemoryStream(data); + using var gzipStream = new GZipStream(compressedStream, CompressionMode.Decompress); + using var decompressedStream = new MemoryStream(); + gzipStream.CopyTo(decompressedStream); + return decompressedStream.ToArray(); + } +} diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsTests.cs index 9536d283cf5..ff42f1251ba 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsTests.cs @@ -100,6 +100,7 @@ public void OtlpExporterOptions_InvalidEnvironmentVariableOverride() ["EndpointWithInvalidValue"] = "invalid", ["TimeoutWithInvalidValue"] = "invalid", ["ProtocolWithInvalidValue"] = "invalid", + ["CompressionWithInvalidValue"] = "invalid", }; var configuration = new ConfigurationBuilder() @@ -114,7 +115,8 @@ public void OtlpExporterOptions_InvalidEnvironmentVariableOverride() appendSignalPathToEndpoint: true, "ProtocolWithInvalidValue", "NoopHeaders", - "TimeoutWithInvalidValue"); + "TimeoutWithInvalidValue", + "CompressionWithInvalidValue"); #if NET462_OR_GREATER || NETSTANDARD2_0 Assert.Equal(new Uri(OtlpExporterOptions.DefaultHttpEndpoint), options.Endpoint); @@ -125,6 +127,7 @@ public void OtlpExporterOptions_InvalidEnvironmentVariableOverride() Assert.Equal(10000, options.TimeoutMilliseconds); Assert.Equal(OtlpExporterOptions.DefaultOtlpExportProtocol, options.Protocol); Assert.Null(options.Headers); + Assert.Equal(OtlpExportCompression.None, options.Compression); } [Fact] @@ -136,6 +139,7 @@ public void OtlpExporterOptions_SetterOverridesEnvironmentVariable() ["Timeout"] = "2000", ["Protocol"] = "grpc", ["Headers"] = "A=2,B=3", + ["Compression"] = "none", }; var configuration = new ConfigurationBuilder() @@ -150,18 +154,21 @@ public void OtlpExporterOptions_SetterOverridesEnvironmentVariable() appendSignalPathToEndpoint: true, "Protocol", "Headers", - "Timeout"); + "Timeout", + "Compression"); options.Endpoint = new Uri("http://localhost:200"); options.Headers = "C=3"; options.TimeoutMilliseconds = 40000; options.Protocol = OtlpExportProtocol.HttpProtobuf; + options.Compression = OtlpExportCompression.Gzip; Assert.Equal(new Uri("http://localhost:200"), options.Endpoint); Assert.Equal("C=3", options.Headers); Assert.Equal(40000, options.TimeoutMilliseconds); Assert.Equal(OtlpExportProtocol.HttpProtobuf, options.Protocol); Assert.False(options.AppendSignalPathToEndpoint); + Assert.Equal(OtlpExportCompression.Gzip, options.Compression); } [Fact] diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestGrpcMessageHandler.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestGrpcMessageHandler.cs new file mode 100644 index 00000000000..e56a4d8cd57 --- /dev/null +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestGrpcMessageHandler.cs @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#if !NET +using System.Net.Http; +using System.Net.Http.Headers; +#endif + +namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests; + +internal sealed class TestGrpcMessageHandler : HttpMessageHandler +{ + public HttpRequestMessage? HttpRequestMessage { get; private set; } + + public byte[]? HttpRequestContent { get; private set; } + + public HttpResponseMessage InternalSend(HttpRequestMessage request, CancellationToken cancellationToken) + { + this.HttpRequestMessage = request; +#if NET + this.HttpRequestContent = request.Content!.ReadAsByteArrayAsync(cancellationToken).Result; +#else + this.HttpRequestContent = request.Content!.ReadAsByteArrayAsync().Result; +#endif + var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK) + { + RequestMessage = request, + }; + +#if NETSTANDARD2_0 || NET462 + const string ResponseTrailersKey = "__ResponseTrailers"; + + if (!response.RequestMessage.Properties.TryGetValue(ResponseTrailersKey, out var value)) + { + value = new CustomResponseTrailers(); + response.RequestMessage.Properties[ResponseTrailersKey] = value; + } + + var trailers = (HttpHeaders)value; + trailers.Add("grpc-status", "0"); +#else + response.TrailingHeaders.Add("grpc-status", "0"); +#endif + + return response; + } + +#if NET + protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) + { + return this.InternalSend(request, cancellationToken); + } +#endif + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + return Task.FromResult(this.InternalSend(request, cancellationToken)); + } + +#if NETSTANDARD2_0 || NET462 + private sealed class CustomResponseTrailers : HttpHeaders; +#endif +}