Skip to content

Commit c865b1b

Browse files
authored
Add helper methods to IApiWebRequest for serializing JSON directly to a Stream (#8019)
## Summary of changes Adds `PostAsJson<T>` method to `IApiWebRequest` ## Reason for change Currently, if we want to send JSON, we serialize it to a string locally, convert the string to utf-8 bytes, potentially compress those bytes, and then copy that to the stream. Doing that as efficiently as we can is somewhat tricky, and we haven't always got it right. By creating a central method, and writing directly to the underlying stream where we can, we can potentially see efficiency gains, and can also potentially make it easier to move to a more modern serializer later. ## Implementation details - Added `IApiRequest.PostAsJsonAsync<T>(T payload, MultipartCompression compression)` (and an overload that accepts json settings) - Implemented the method as a "push stream" approach in each of the three implementations we currently have (`ApiRequest`, `HttpClient`, `HttpStream`) - Benchmarked the implementation to confirm no regressions (see below) ## Test coverage Added unit tests by specifically serializing telemetry data, and confirming we get the correct results when we deserialize the other end (telemetry is one of the candidates for using this approach). When running benchmarks, it became apparent that we had a serious regression in our allocations when we added GZIP-ing of our telemetry 😅 I didn't investigate the root cause, because switching to the new approach (in #8017) will resolve the issue anyway. Overall conclusion: - In general, the new approach allocates slightly less than before - We have a big allocation and speed regression in GZip (specifically for telemetry), which the new approach will resolve completely - In general, the new approach allocates the same whether you use gzip or not - Throughput is roughly the same both before and after **`ApiWebRequest` (.NET FX)** | Method | Mean | Allocated | Alloc Ratio | | ------------------------- | -------: | --------: | ----------: | | ApiWebRequest_Before_Gzip | 6.460 ms | 572.44 KB | 1.00 | | ApiWebRequest_After_Gzip | 2.037 ms | 20.75 KB | 0.04 | | | | | | | ApiWebRequest_Before | 1.949 ms | 22.34 KB | 1.00 | | ApiWebRequest_After | 1.908 ms | 20.75 KB | 0.93 | **`HttpClientRequest` (.NET Core 3.1, .NET 6)** - had to re-enable keep-alive to avoid connection exhaustion! | Method | Runtime | Mean | Allocated | Alloc Ratio | | ---------------------- | ------------- | ---------: | --------: | ----------: | | HttpClient_Before_Gzip | .NET 6.0 | 4,980.1 us | 406.27 KB | 0.98 | | HttpClient_After_Gzip | .NET 6.0 | 161.5 us | 12.04 KB | 0.03 | | HttpClient_Before_Gzip | .NET Core 3.1 | 4,847.4 us | 414.43 KB | 1.00 | | HttpClient_After_Gzip | .NET Core 3.1 | 166.3 us | 12.97 KB | 0.03 | | | | | | | | HttpClient_Before | .NET 6.0 | 129.2 us | 13.03 KB | 0.91 | | HttpClient_After | .NET 6.0 | 154.9 us | 12.05 KB | 0.84 | | HttpClient_Before | .NET Core 3.1 | 162.2 us | 14.27 KB | 1.00 | | HttpClient_After | .NET Core 3.1 | 189.6 us | 13.2 KB | 0.92 | **`HttpStreamRequest` over UDS (.NET Core 3.1, .NET 6)** | Method | Runtime | Mean | Allocated | Alloc Ratio | | ---------------------- | ------------- | ---------: | --------: | ----------: | | HttpStream_Before_Gzip | .NET 6.0 | 5,362.8 us | 440.87 KB | 0.99 | | HttpStream_After_Gzip | .NET 6.0 | 444.8 us | 42.63 KB | 0.10 | | HttpStream_Before_Gzip | .NET Core 3.1 | 5,421.3 us | 446.78 KB | 1.00 | | HttpStream_After_Gzip | .NET Core 3.1 | 462.6 us | 42.77 KB | 0.10 | | | | | | | | HttpStream_Before | .NET 6.0 | 428.7 us | 43.73 KB | 1.01 | | HttpStream_After | .NET 6.0 | 433.3 us | 42.3 KB | 0.97 | | HttpStream_Before | .NET Core 3.1 | 445.3 us | 43.5 KB | 1.00 | | HttpStream_After | .NET Core 3.1 | 448.5 us | 42.62 KB | 0.98 | **`SocketHandlerRequest` over UDS (..NET 6)** | Method | Mean | Allocated | Alloc Ratio | | ------------------------- | ----------: | --------: | ----------: | | SocketHandler_Before_Gzip | 5,070.65 us | 406.26 KB | 1.00 | | SocketHandler_After_Gzip | 97.66 us | 12.28 KB | 0.03 | | | | | | | SocketHandler_Before | 53.95 us | 13.01 KB | 1.00 | | SocketHandler_After | 87.64 us | 12.28 KB | 0.94 | <details><summary>Benchmark used (approx)</summary> ```csharp using System; using System.IO; using System.IO.Compression; using System.Net; using System.Text; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Configs; using Datadog.Trace; using Datadog.Trace.Agent; using Datadog.Trace.Agent.StreamFactories; using Datadog.Trace.Agent.Transports; using Datadog.Trace.Configuration; using Datadog.Trace.DogStatsd; using Datadog.Trace.HttpOverStreams; using Datadog.Trace.Tagging; using Datadog.Trace.Telemetry; using Datadog.Trace.Telemetry.Transports; using Datadog.Trace.Util; using Datadog.Trace.Vendors.Newtonsoft.Json; using Datadog.Trace.Vendors.Newtonsoft.Json.Serialization; namespace Benchmarks.Trace; [MemoryDiagnoser] [GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)] [CategoriesColumn] public class TelemetryHttpClientBenchmark { private const string BaseUrl = "http://localhost:5035"; private const string Socket = @"C:\repos\temp\temp74\bin\Release\net10.0\test.socket"; private TelemetryData _telemetryData; private ApiWebRequestFactory _apiWebRequestFactory; private Uri _apiEndpointUri; #if NETCOREAPP3_1_OR_GREATER private HttpClientRequestFactory _httpClientRequestFactory; private Uri _httpClientEndpointUri; private HttpStreamRequestFactory _httpStreamRequestFactory; private Uri _httpStreamEndpointUri; #endif #if NET5_0_OR_GREATER private SocketHandlerRequestFactory _socketHandlerRequestFactory; private Uri _socketHandlerEndpointUri; #endif [GlobalSetup] public void GlobalSetup() { _telemetryData = GetData(); var config = TracerHelper.DefaultConfig; config.Add(ConfigurationKeys.TraceEnabled, false); var settings = TracerSettings.Create(config); _apiWebRequestFactory = new ApiWebRequestFactory(new Uri(BaseUrl), AgentHttpHeaderNames.MinimalHeaders); _apiEndpointUri = _apiWebRequestFactory.GetEndpoint("/"); #if NETCOREAPP3_1_OR_GREATER _httpClientRequestFactory = new HttpClientRequestFactory(new Uri(BaseUrl), AgentHttpHeaderNames.MinimalHeaders); _httpClientEndpointUri = _httpClientRequestFactory.GetEndpoint("/"); _httpStreamRequestFactory = new HttpStreamRequestFactory( new UnixDomainSocketStreamFactory(Socket), new DatadogHttpClient(new MinimalAgentHeaderHelper()), new Uri(BaseUrl)); _httpStreamEndpointUri = _httpStreamRequestFactory.GetEndpoint("/"); #endif #if NET5_0_OR_GREATER _socketHandlerRequestFactory = new SocketHandlerRequestFactory( new UnixDomainSocketStreamFactory(Socket), AgentHttpHeaderNames.MinimalHeaders, new Uri(BaseUrl)); _socketHandlerEndpointUri = _socketHandlerRequestFactory.GetEndpoint("/"); #endif } [GlobalCleanup] public void GlobalCleanup() { } [BenchmarkCategory("ApiWebRequest", "Uncompressed"), Benchmark(Baseline = true)] public async Task<int> ApiWebRequest_Before() { var request = _apiWebRequestFactory.Create(_apiEndpointUri); var data = SerializeTelemetry(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(data)), "application/json", contentEncoding: null).ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("ApiWebRequest", "Gzip"), Benchmark(Baseline = true)] public async Task<int> ApiWebRequest_Before_Gzip() { var request = _apiWebRequestFactory.Create(_apiEndpointUri); var data = SerializeTelemetryWithGzip(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(data), "application/json", contentEncoding: "gzip").ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("ApiWebRequest", "Uncompressed"), Benchmark] public async Task<int> ApiWebRequest_After() { var request = _apiWebRequestFactory.Create(_apiEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } [BenchmarkCategory("ApiWebRequest", "Gzip"), Benchmark] public async Task<int> ApiWebRequest_After_Gzip() { var request = _apiWebRequestFactory.Create(_apiEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } #if NETCOREAPP3_1_OR_GREATER [BenchmarkCategory("HttpClient", "Uncompressed"), Benchmark(Baseline = true)] public async Task<int> HttpClient_Before() { var request = _httpClientRequestFactory.Create(_httpClientEndpointUri); var data = SerializeTelemetry(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(data)), "application/json", contentEncoding: null).ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("HttpClient", "Gzip"), Benchmark(Baseline = true)] public async Task<int> HttpClient_Before_Gzip() { var request = _httpClientRequestFactory.Create(_httpClientEndpointUri); var data = SerializeTelemetryWithGzip(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(data), "application/json", contentEncoding: "gzip").ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("HttpClient", "Uncompressed"), Benchmark] public async Task<int> HttpClient_After() { var request = _httpClientRequestFactory.Create(_httpClientEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } [BenchmarkCategory("HttpClient", "Gzip"), Benchmark] public async Task<int> HttpClient_After_Gzip() { var request = _httpClientRequestFactory.Create(_httpClientEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } #endif #if NETCOREAPP3_1_OR_GREATER [BenchmarkCategory("HttpStream", "Uncompressed"), Benchmark(Baseline = true)] public async Task<int> HttpStream_Before() { var request = _httpStreamRequestFactory.Create(_httpStreamEndpointUri); var data = SerializeTelemetry(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(data)), "application/json", contentEncoding: null).ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("HttpStream", "Gzip"), Benchmark(Baseline = true)] public async Task<int> HttpStream_Before_Gzip() { var request = _httpStreamRequestFactory.Create(_httpStreamEndpointUri); var data = SerializeTelemetryWithGzip(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(data), "application/json", contentEncoding: "gzip").ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("HttpStream", "Uncompressed"), Benchmark] public async Task<int> HttpStream_After() { var request = _httpStreamRequestFactory.Create(_httpStreamEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } [BenchmarkCategory("HttpStream", "Gzip"), Benchmark] public async Task<int> HttpStream_After_Gzip() { var request = _httpStreamRequestFactory.Create(_httpStreamEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } #endif #if NET5_0_OR_GREATER [BenchmarkCategory("SocketHandler", "Uncompressed"), Benchmark(Baseline = true)] public async Task<int> SocketHandler_Before() { var request = _socketHandlerRequestFactory.Create(_socketHandlerEndpointUri); var data = SerializeTelemetry(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(data)), "application/json", contentEncoding: null).ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("SocketHandler", "Gzip"), Benchmark(Baseline = true)] public async Task<int> SocketHandler_Before_Gzip() { var request = _socketHandlerRequestFactory.Create(_socketHandlerEndpointUri); var data = SerializeTelemetryWithGzip(_telemetryData); using var response = await request.PostAsync(new ArraySegment<byte>(data), "application/json", contentEncoding: "gzip").ConfigureAwait(false); return response.StatusCode; } [BenchmarkCategory("SocketHandler", "Uncompressed"), Benchmark] public async Task<int> SocketHandler_After() { var request = _socketHandlerRequestFactory.Create(_socketHandlerEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } [BenchmarkCategory("SocketHandler", "Gzip"), Benchmark] public async Task<int> SocketHandler_After_Gzip() { var request = _socketHandlerRequestFactory.Create(_socketHandlerEndpointUri); using var response = await request.PostAsJsonAsync(request, compression: MultipartCompression.None); return response.StatusCode; } #endif internal static string SerializeTelemetry<T>(T data) => JsonConvert.SerializeObject(data, Formatting.None, JsonTelemetryTransport.SerializerSettings); internal static byte[] SerializeTelemetryWithGzip<T>(T data) { using var memStream = new MemoryStream(); using (var zipStream = new GZipStream(memStream, CompressionMode.Compress, true)) { using var streamWriter = new StreamWriter(zipStream); using var jsonWriter = new JsonTextWriter(streamWriter); var serializer = new JsonSerializer { NullValueHandling = NullValueHandling.Ignore, ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy(), }, Formatting = Formatting.None }; serializer.Serialize(jsonWriter, data); } return memStream.ToArray(); } private TelemetryData GetData() => new TelemetryData( requestType: TelemetryRequestTypes.GenerateMetrics, runtimeId: "20338dfd-f700-4e5c-b3f6-0d470f054ae8", seqId: 5672, tracerTime: 1628099086, application: new ApplicationTelemetryData( serviceName: "myapp", env: "prod", serviceVersion: "1.2.3", tracerVersion: "0.33.1", languageName: "node.js", languageVersion: "14.16.1", runtimeName: "dotnet", runtimeVersion: "7.0.3", commitSha: "testCommitSha", repositoryUrl: "testRepositoryUrl", processTags: "entrypoint.basedir:Users,entrypoint.workdir:Downloads"), host: new HostTelemetryData( hostname: "i-09ecf74c319c49be8", os: "GNU/Linux", architecture: "x86_64") { OsVersion = "ubuntu 18.04.5 LTS (Bionic Beaver)", KernelName = "Linux", KernelRelease = "5.4.0-1037-gcp", KernelVersion = "#40~18.04.1-Ubuntu SMP Fri Feb 5 15:41:35 UTC 2021" }, payload: new GenerateMetricsPayload( new MetricData[] { new( "tracer_init_time", new MetricSeries() { new(1575317847, 2241), new(1575317947, 2352), }, common: true, type: MetricTypeConstants.Count) { Tags = new[] { "org_id: 2", "environment:test" } }, new( "app_sec_initialization_time", new MetricSeries() { new(1575317447, 254), new(1575317547, 643), }, common: false, type: MetricTypeConstants.Gauge) { Namespace = MetricNamespaceConstants.ASM, Interval = 60, }, })); } ``` </details> ## Other details Part of a small stack - #8019 👈 - #8017
1 parent 72be34c commit c865b1b

File tree

8 files changed

+372
-0
lines changed

8 files changed

+372
-0
lines changed

tracer/src/Datadog.Trace/Agent/IApiRequest.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.IO;
88
using System.Threading.Tasks;
99
using Datadog.Trace.Agent.Transports;
10+
using Datadog.Trace.Vendors.Newtonsoft.Json;
1011

1112
namespace Datadog.Trace.Agent
1213
{
@@ -20,6 +21,10 @@ internal interface IApiRequest
2021

2122
Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType, string contentEncoding);
2223

24+
Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression);
25+
26+
Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings);
27+
2328
Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary);
2429

2530
Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None);

tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
using System.Threading.Tasks;
1212
using Datadog.Trace.Logging;
1313
using Datadog.Trace.Util;
14+
using Datadog.Trace.Vendors.Newtonsoft.Json;
15+
using Datadog.Trace.Vendors.Serilog.Events;
1416
using static Datadog.Trace.HttpOverStreams.DatadogHttpValues;
1517

1618
namespace Datadog.Trace.Agent.Transports
@@ -58,6 +60,42 @@ public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string conte
5860
return await FinishAndGetResponse().ConfigureAwait(false);
5961
}
6062

63+
public Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
64+
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);
65+
66+
public async Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
67+
{
68+
var contentEncoding = compression == MultipartCompression.GZip ? "gzip" : null;
69+
if (Log.IsEnabled(LogEventLevel.Debug))
70+
{
71+
Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, contentEncoding ?? "none");
72+
}
73+
74+
ResetRequest(method: "POST", contentType: MimeTypes.Json, contentEncoding: contentEncoding);
75+
76+
using (var reqStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
77+
{
78+
// wrap in gzip if requested
79+
using Stream gzip = (compression == MultipartCompression.GZip
80+
? new GZipStream(reqStream, CompressionMode.Compress, leaveOpen: true)
81+
: null);
82+
var streamToWriteTo = gzip ?? reqStream;
83+
84+
using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true);
85+
using var jsonWriter = new JsonTextWriter(streamWriter)
86+
{
87+
CloseOutput = false
88+
};
89+
var serializer = JsonSerializer.Create(settings);
90+
serializer.Serialize(jsonWriter, payload);
91+
await streamWriter.FlushAsync().ConfigureAwait(false);
92+
await streamToWriteTo.FlushAsync().ConfigureAwait(false);
93+
await reqStream.FlushAsync().ConfigureAwait(false);
94+
}
95+
96+
return await FinishAndGetResponse().ConfigureAwait(false);
97+
}
98+
6199
public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
62100
{
63101
ResetRequest(method: "POST", ContentTypeHelper.GetContentType(contentType, multipartBoundary), contentEncoding);

tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@
66
#if NETCOREAPP
77
using System;
88
using System.IO;
9+
using System.IO.Compression;
910
using System.Net.Http;
1011
using System.Net.Http.Headers;
1112
using System.Threading.Tasks;
1213
using Datadog.Trace.AppSec;
1314
using Datadog.Trace.HttpOverStreams;
1415
using Datadog.Trace.Logging;
16+
using Datadog.Trace.Util;
17+
using Datadog.Trace.Vendors.Newtonsoft.Json;
18+
using Datadog.Trace.Vendors.Serilog.Events;
1519

1620
namespace Datadog.Trace.Agent.Transports
1721
{
@@ -67,6 +71,50 @@ public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string conte
6771
}
6872
}
6973

74+
public Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
75+
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);
76+
77+
public async Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
78+
{
79+
if (Log.IsEnabled(LogEventLevel.Debug))
80+
{
81+
Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, compression == MultipartCompression.GZip ? "gzip" : "none");
82+
}
83+
84+
using var content = new PushStreamContent(stream => WriteAsJson(stream, payload, settings, compression));
85+
content.Headers.ContentType = new MediaTypeHeaderValue(MimeTypes.Json);
86+
87+
if (compression == MultipartCompression.GZip)
88+
{
89+
content.Headers.ContentEncoding.Add("gzip");
90+
}
91+
92+
_postRequest.Content = content;
93+
94+
var response = await _client.SendAsync(_postRequest).ConfigureAwait(false);
95+
return new HttpClientResponse(response);
96+
97+
static async Task WriteAsJson(Stream requestStream, T payload, JsonSerializerSettings serializationSettings, MultipartCompression compression)
98+
{
99+
// wrap in gzip if requested
100+
using Stream gzip = compression == MultipartCompression.GZip
101+
? new GZipStream(requestStream, CompressionMode.Compress, leaveOpen: true)
102+
: null;
103+
var streamToWriteTo = gzip ?? requestStream;
104+
105+
using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true);
106+
using var jsonWriter = new JsonTextWriter(streamWriter)
107+
{
108+
CloseOutput = false
109+
};
110+
var serializer = JsonSerializer.Create(serializationSettings);
111+
serializer.Serialize(jsonWriter, payload);
112+
await streamWriter.FlushAsync().ConfigureAwait(false);
113+
await streamToWriteTo.FlushAsync().ConfigureAwait(false);
114+
await requestStream.FlushAsync().ConfigureAwait(false);
115+
}
116+
}
117+
70118
public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
71119
{
72120
// re-create HttpContent on every retry because some versions of HttpClient always dispose of it, so we can't reuse.

tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@
55

66
using System;
77
using System.IO;
8+
using System.IO.Compression;
89
using System.Net;
910
using System.Threading.Tasks;
1011
using Datadog.Trace.HttpOverStreams;
1112
using Datadog.Trace.HttpOverStreams.HttpContent;
13+
using Datadog.Trace.Logging;
1214
using Datadog.Trace.Util;
15+
using Datadog.Trace.Vendors.Newtonsoft.Json;
16+
using Datadog.Trace.Vendors.Serilog.Events;
1317

1418
namespace Datadog.Trace.Agent.Transports
1519
{
1620
internal sealed class HttpStreamRequest : IApiRequest
1721
{
22+
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<HttpStreamRequest>();
23+
1824
private readonly Uri _uri;
1925
private readonly DatadogHttpClient _client;
2026
private readonly IStreamFactory _streamFactory;
@@ -41,6 +47,48 @@ public Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType
4147
public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType, string contentEncoding)
4248
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new BufferContent(bytes), contentEncoding, chunkedEncoding: false).ConfigureAwait(false)).Item1;
4349

50+
public Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
51+
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);
52+
53+
public async Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
54+
{
55+
var contentEncoding = compression == MultipartCompression.GZip ? "gzip" : null;
56+
if (Log.IsEnabled(LogEventLevel.Debug))
57+
{
58+
Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, contentEncoding ?? "none");
59+
}
60+
61+
var result = await SendAsync(
62+
WebRequestMethods.Http.Post,
63+
contentType: MimeTypes.Json,
64+
content: new HttpOverStreams.HttpContent.PushStreamContent(stream => WriteAsJson(stream, payload, settings, compression)),
65+
contentEncoding: contentEncoding,
66+
chunkedEncoding: true) // must use chunked encoding because push-stream content
67+
.ConfigureAwait(false);
68+
69+
return result.Item1;
70+
71+
static async Task WriteAsJson(Stream requestStream, T payload, JsonSerializerSettings serializationSettings, MultipartCompression compression)
72+
{
73+
// wrap in gzip if requested
74+
using Stream gzip = compression == MultipartCompression.GZip
75+
? new GZipStream(requestStream, CompressionMode.Compress, leaveOpen: true)
76+
: null;
77+
var streamToWriteTo = gzip ?? requestStream;
78+
79+
using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true);
80+
using var jsonWriter = new JsonTextWriter(streamWriter)
81+
{
82+
CloseOutput = false
83+
};
84+
var serializer = JsonSerializer.Create(serializationSettings);
85+
serializer.Serialize(jsonWriter, payload);
86+
await streamWriter.FlushAsync().ConfigureAwait(false);
87+
await streamToWriteTo.FlushAsync().ConfigureAwait(false);
88+
await requestStream.FlushAsync().ConfigureAwait(false);
89+
}
90+
}
91+
4492
public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
4593
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new HttpOverStreams.HttpContent.PushStreamContent(writeToRequestStream), contentEncoding, chunkedEncoding: true, multipartBoundary).ConfigureAwait(false)).Item1;
4694

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// <copyright file="SerializationHelpers.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using Datadog.Trace.Vendors.Newtonsoft.Json;
9+
using Datadog.Trace.Vendors.Newtonsoft.Json.Serialization;
10+
11+
namespace Datadog.Trace.Agent.Transports;
12+
13+
internal static class SerializationHelpers
14+
{
15+
public static readonly JsonSerializerSettings DefaultJsonSettings = new()
16+
{
17+
NullValueHandling = NullValueHandling.Ignore,
18+
ContractResolver = new DefaultContractResolver
19+
{
20+
NamingStrategy = new SnakeCaseNamingStrategy(),
21+
}
22+
};
23+
}

tracer/test/Datadog.Trace.TestHelpers/TransportHelpers/TestApiRequest.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Threading.Tasks;
1111
using Datadog.Trace.Agent;
1212
using Datadog.Trace.Agent.Transports;
13+
using Datadog.Trace.Vendors.Newtonsoft.Json;
1314

1415
namespace Datadog.Trace.TestHelpers.TransportHelpers;
1516

@@ -64,6 +65,18 @@ public virtual Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string con
6465
return Task.FromResult((IApiResponse)response);
6566
}
6667

68+
public virtual Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
69+
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);
70+
71+
public virtual Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
72+
{
73+
var response = new TestApiResponse(_statusCode, _responseContent, _responseContentType);
74+
Responses.Add(response);
75+
ContentType = MimeTypes.Json;
76+
77+
return Task.FromResult((IApiResponse)response);
78+
}
79+
6780
public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
6881
{
6982
using (var ms = new MemoryStream())

0 commit comments

Comments
 (0)