Skip to content

Commit e7dd03d

Browse files
authored
Added Otlp UDP Exporter for Lambda Support (#129)
1 parent ebc7afd commit e7dd03d

File tree

17 files changed

+2425
-38
lines changed

17 files changed

+2425
-38
lines changed

src/AWS.Distro.OpenTelemetry.AutoInstrumentation/AWS.Distro.OpenTelemetry.AutoInstrumentation.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,19 @@
3131
<PackageReference Include="OpenTelemetry.ResourceDetectors.AWS" Version="1.4.0-beta.1" />
3232
<PackageReference Include="OpenTelemetry.Sampler.AWS" Version="0.1.0-alpha.2" />
3333
<PackageReference Include="OpenTelemetry.SemanticConventions" Version="1.0.0-rc9.9" />
34+
<PackageReference Include="Google.Protobuf" Version="3.28.2"/>
35+
<PackageReference Include="Grpc.Tools" Version="2.65.0" PrivateAssets="all" />
3436
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118">
3537
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
3638
<PrivateAssets>all</PrivateAssets>
3739
</PackageReference>
3840
<AdditionalFiles Include="$(MSBuildThisFileDirectory)../../stylecop.json" Link="stylecop.json" />
3941
</ItemGroup>
4042

43+
<ItemGroup>
44+
<Protobuf Include="opentelemetry\**\*.proto" GrpcServices="none" ProtoCompile="true" />
45+
</ItemGroup>
46+
4147
<!-- TODO: Once upstream release is done, move the dependency to use upstream version -->
4248
<ItemGroup>
4349
<ProjectReference Include="../OpenTelemetry.Instrumentation.AWS/OpenTelemetry.Instrumentation.AWS.csproj" PrivateAssets="All"/>

src/AWS.Distro.OpenTelemetry.AutoInstrumentation/GlobalSuppressions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,14 @@
1414
[assembly: SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:Elements should be documented", Justification = "Reviewed", Scope = "member", Target = "~M:AWS.Distro.OpenTelemetry.AutoInstrumentation.AttributePropagatingSpanProcessorBuilder.SetPropagationDataKey(System.String)~AWS.Distro.OpenTelemetry.AutoInstrumentation.AttributePropagatingSpanProcessorBuilder")]
1515
[assembly: SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:Elements should be documented", Justification = "Reviewed", Scope = "member", Target = "~M:AWS.Distro.OpenTelemetry.AutoInstrumentation.AwsMetricAttributesSpanExporterBuilder.Build~AWS.Distro.OpenTelemetry.AutoInstrumentation.AwsMetricAttributesSpanExporter")]
1616
[assembly: SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1600:Elements should be documented", Justification = "Reviewed", Scope = "member", Target = "~M:AWS.Distro.OpenTelemetry.AutoInstrumentation.AwsMetricAttributesSpanExporterBuilder.Create(OpenTelemetry.BaseExporter{System.Diagnostics.Activity},OpenTelemetry.Resources.Resource)~AWS.Distro.OpenTelemetry.AutoInstrumentation.AwsMetricAttributesSpanExporterBuilder")]
17+
[assembly: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:File may only contain a single type", Justification = "Reviewed", Scope = "type", Target = "~T:ByteStringConverter")]
18+
[assembly: SuppressMessage("Design", "CA1050:Declare types in namespaces", Justification = "Reviewed", Scope = "type", Target = "~T:ByteStringConverter")]
19+
[assembly: SuppressMessage("Design", "CA1050:Declare types in namespaces", Justification = "Reviewed", Scope = "type", Target = "~T:OtlpUdpExporter")]
20+
[assembly: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:File may only contain a single type", Justification = "Reviewed", Scope = "type", Target = "~T:SpanKindConverter")]
21+
[assembly: SuppressMessage("Design", "CA1050:Declare types in namespaces", Justification = "Reviewed", Scope = "type", Target = "~T:SpanKindConverter")]
22+
[assembly: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:File may only contain a single type", Justification = "Reviewed", Scope = "type", Target = "~T:UdpExporter")]
23+
[assembly: SuppressMessage("Design", "CA1050:Declare types in namespaces", Justification = "Reviewed", Scope = "type", Target = "~T:UdpExporter")]
24+
[assembly: SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1011:Closing square brackets should be spaced correctly", Justification = "Reviewed", Scope = "member", Target = "~M:OtlpUdpExporter.Export(OpenTelemetry.Batch{System.Diagnostics.Activity}@)~OpenTelemetry.ExportResult")]
25+
[assembly: SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1011:Closing square brackets should be spaced correctly", Justification = "Reviewed", Scope = "member", Target = "~M:OtlpUdpExporter.SerializeSpans(OpenTelemetry.Batch{System.Diagnostics.Activity})~System.Byte[]")]
1726

1827
// TODO, review these suppressions.
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Diagnostics;
5+
using System.Net.Sockets;
6+
using System.Reflection;
7+
using System.Text;
8+
using Google.Protobuf;
9+
using Microsoft.Extensions.Logging;
10+
using Newtonsoft.Json;
11+
using OpenTelemetry;
12+
using OpenTelemetry.Proto.Collector.Trace.V1;
13+
using OpenTelemetry.Proto.Trace.V1;
14+
using OpenTelemetry.Resources;
15+
using OtlpResource = OpenTelemetry.Proto.Resource.V1;
16+
17+
/// <summary>
18+
/// OTLP UDP Exporter class. This class is used to build an OtlpUdpExporter to registered as in exporter
19+
/// during the instrumentation initialization phase
20+
/// </summary>
21+
public class OtlpUdpExporter : BaseExporter<Activity>
22+
{
23+
private static readonly ILoggerFactory Factory = LoggerFactory.Create(builder => builder.AddConsole());
24+
private static readonly ILogger Logger = Factory.CreateLogger<OtlpUdpExporter>();
25+
26+
private UdpExporter udpExporter;
27+
private string signalPrefix;
28+
private Resource processResource;
29+
30+
/// <summary>
31+
/// Initializes a new instance of the <see cref="OtlpUdpExporter"/> class.
32+
/// </summary>
33+
/// <param name="endpoint">Endpoint to export requests to</param>
34+
/// <param name="signalPrefix">Sampled vs UnSampled signal prefix</param>
35+
/// <param name="processResource">Otel Resource object</param>
36+
public OtlpUdpExporter(Resource processResource, string? endpoint = null, string? signalPrefix = null)
37+
{
38+
endpoint = endpoint ?? UdpExporter.DefaultEndpoint;
39+
this.udpExporter = new UdpExporter(endpoint);
40+
this.signalPrefix = signalPrefix ?? UdpExporter.DefaultFormatOtelTracesBinaryPrefix;
41+
this.processResource = processResource;
42+
}
43+
44+
/// <inheritdoc/>
45+
public override ExportResult Export(in Batch<Activity> batch)
46+
{
47+
byte[]? serializedData = this.SerializeSpans(batch);
48+
if (serializedData == null)
49+
{
50+
return ExportResult.Failure;
51+
}
52+
53+
try
54+
{
55+
this.udpExporter.SendData(serializedData, this.signalPrefix);
56+
return ExportResult.Success;
57+
}
58+
catch (Exception ex)
59+
{
60+
Logger.LogError($"Error exporting spans: {ex.Message}");
61+
return ExportResult.Failure;
62+
}
63+
}
64+
65+
/// <inheritdoc/>
66+
protected override bool OnShutdown(int timeoutMilliseconds)
67+
{
68+
try
69+
{
70+
this.udpExporter.Shutdown();
71+
return true;
72+
}
73+
catch (Exception ex)
74+
{
75+
Logger.LogError($"Error shutting down exporter: {ex.Message}");
76+
return false;
77+
}
78+
}
79+
80+
// Function that uses reflection to call ResourceExtensions.ToOtlpResource function.
81+
// This functions converts from an OpenTelemetry.Resources.Resource to
82+
// OpenTelemetry.Proto.Resource.V1.Resource (protobuf resource to be exported)
83+
private OtlpResource.Resource? ToOtlpResource(Resource processResource)
84+
{
85+
Type? resourceExtensionsType = Type.GetType("OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ResourceExtensions, OpenTelemetry.Exporter.OpenTelemetryProtocol");
86+
87+
if (resourceExtensionsType == null)
88+
{
89+
Logger.LogTrace("ResourceExtensions Type was not found");
90+
return null;
91+
}
92+
93+
MethodInfo? toOtlpResourceMethod = resourceExtensionsType.GetMethod(
94+
"ToOtlpResource",
95+
BindingFlags.Static | BindingFlags.Public,
96+
null,
97+
new[] { typeof(Resource) },
98+
null);
99+
100+
if (toOtlpResourceMethod == null)
101+
{
102+
Logger.LogTrace("ResourceExtensions.ToOtlpResource Method was not found");
103+
return null;
104+
}
105+
106+
var otlpResource = toOtlpResourceMethod.Invoke(null, new object[] { processResource });
107+
108+
if (otlpResource == null)
109+
{
110+
Logger.LogTrace("OtlpResource object cannot be converted from OpenTelemetry.Resources");
111+
return null;
112+
}
113+
114+
// Below is a workaround to casting and works by converting an object into JSON then converting the
115+
// JSON string back into the required object type. The reason casting isn't working is because of different
116+
// assemblies being used. To use the protobuf library, we need to have a local copy of the protobuf assembly.
117+
// Since upstream also has their own copy of the protobuf library, casting is not possible since the complier
118+
// is recognizing them as two different types.
119+
try
120+
{
121+
// ToString method from OpenTelemetry.Proto.Resource.V1.Resource already converts the object into
122+
// Json using the proper converters.
123+
string? otlpResourceJson = otlpResource.ToString();
124+
if (otlpResourceJson == null)
125+
{
126+
Logger.LogTrace("OtlpResource object cannot be converted to JSON");
127+
return null;
128+
}
129+
130+
var otlpResourceConverted = JsonConvert.DeserializeObject<OtlpResource.Resource>(otlpResourceJson);
131+
return otlpResourceConverted;
132+
}
133+
catch (Exception ex)
134+
{
135+
Logger.LogError($"Error converting OtlpResource to/from JSON: {ex.Message}");
136+
return null;
137+
}
138+
}
139+
140+
// Uses reflection to the get the SdkLimitOptions required to invoke the ToOtlpSpan function used in the
141+
// SerializeSpans function below. More information about SdkLimitOptions can be found in this link:
142+
// https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/SdkLimitOptions.cs#L24
143+
private object? GetSdkLimitOptions()
144+
{
145+
Type? sdkLimitOptionsType = Type.GetType("OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.SdkLimitOptions, OpenTelemetry.Exporter.OpenTelemetryProtocol");
146+
147+
if (sdkLimitOptionsType == null)
148+
{
149+
Logger.LogTrace("SdkLimitOptions Type was not found");
150+
return null;
151+
}
152+
153+
// Create an instance of SdkLimitOptions using the default parameterless constructor
154+
object? sdkLimitOptionsInstance = Activator.CreateInstance(sdkLimitOptionsType);
155+
return sdkLimitOptionsInstance;
156+
}
157+
158+
// The SerializeSpans function builds a ExportTraceServiceRequest object by calling private "ToOtlpSpan" function
159+
// using reflection. "ToOtlpSpan" converts an Activity object into an OpenTelemetry.Proto.Trace.V1.Span object.
160+
// With the conversion above, the Activity object is converted to an Otel span object to be exported using the
161+
// UDP exporter. The "ToOtlpSpan" function can be found here:
162+
// https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ActivityExtensions.cs#L136
163+
private byte[]? SerializeSpans(Batch<Activity> batch)
164+
{
165+
Type? activityExtensionsType = Type.GetType("OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ActivityExtensions, OpenTelemetry.Exporter.OpenTelemetryProtocol");
166+
167+
Type? sdkLimitOptionsType = Type.GetType("OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.SdkLimitOptions, OpenTelemetry.Exporter.OpenTelemetryProtocol");
168+
169+
if (sdkLimitOptionsType == null)
170+
{
171+
Logger.LogTrace("SdkLimitOptions Type was not found");
172+
return null;
173+
}
174+
175+
MethodInfo? toOtlpSpanMethod = activityExtensionsType?.GetMethod(
176+
"ToOtlpSpan",
177+
BindingFlags.Static | BindingFlags.NonPublic,
178+
null,
179+
new[] { typeof(Activity), sdkLimitOptionsType },
180+
null);
181+
182+
var request = new ExportTraceServiceRequest();
183+
var sdkLimitOptions = this.GetSdkLimitOptions();
184+
185+
if (sdkLimitOptions == null)
186+
{
187+
Logger.LogTrace("SdkLimitOptions Object was not found/created properly using the default parameterless constructor");
188+
return null;
189+
}
190+
191+
OtlpResource.Resource? otlpResource = this.ToOtlpResource(this.processResource);
192+
193+
// Create a ResourceSpans instance to hold the span and the otlpResource
194+
ResourceSpans resourceSpans = new ResourceSpans
195+
{
196+
Resource = otlpResource,
197+
};
198+
var scopeSpans = new ScopeSpans();
199+
200+
if (toOtlpSpanMethod != null)
201+
{
202+
foreach (var activity in batch)
203+
{
204+
var otlpSpan = toOtlpSpanMethod.Invoke(null, new object[] { activity, sdkLimitOptions });
205+
206+
// The converters below are required since the the JsonConvert.DeserializeObject doesn't
207+
// know how to deserialize a BytesString or SpanKinds from otlp proto json object.
208+
var settings = new JsonSerializerSettings();
209+
settings.Converters.Add(new ByteStringConverter());
210+
settings.Converters.Add(new SpanKindConverter());
211+
212+
// Below is a workaround to casting and works by converting an object into JSON then converting the
213+
// JSON string back into the required object type. The reason casting isn't working is because of different
214+
// assemblies being used. To use the protobuf library, we need to have a local copy of the protobuf assembly.
215+
// Since upstream also has their own copy of the protobuf library, casting is not possible since the complier
216+
// is recognizing them as two different types.
217+
try
218+
{
219+
var otlpSpanJson = otlpSpan?.ToString();
220+
if (otlpSpanJson == null)
221+
{
222+
continue;
223+
}
224+
225+
var otlpSpanConverted = JsonConvert.DeserializeObject<Span>(otlpSpanJson, settings);
226+
scopeSpans.Spans.Add(otlpSpanConverted);
227+
}
228+
catch (Exception ex)
229+
{
230+
Logger.LogError($"Error converting OtlpSpan to/from JSON: {ex.Message}");
231+
}
232+
}
233+
234+
resourceSpans.ScopeSpans.Add(scopeSpans);
235+
request.ResourceSpans.Add(resourceSpans);
236+
}
237+
else
238+
{
239+
Logger.LogTrace("ActivityExtensions.ToOtlpSpan method is not found");
240+
}
241+
242+
return request.ToByteArray();
243+
}
244+
}
245+
246+
internal class UdpExporter
247+
{
248+
internal const string DefaultEndpoint = "127.0.0.1:2000";
249+
internal const string ProtocolHeader = "{\"format\":\"json\",\"version\":1}\n";
250+
internal const string DefaultFormatOtelTracesBinaryPrefix = "T1S";
251+
252+
private static readonly ILoggerFactory Factory = LoggerFactory.Create(builder => builder.AddConsole());
253+
private static readonly ILogger Logger = Factory.CreateLogger<UdpExporter>();
254+
255+
private string endpoint;
256+
private string host;
257+
private int port;
258+
private UdpClient udpClient;
259+
260+
/// <summary>
261+
/// Initializes a new instance of the <see cref="UdpExporter"/> class.
262+
/// </summary>
263+
/// <param name="endpoint">Endpoint to send udp request to</param>
264+
internal UdpExporter(string? endpoint = null)
265+
{
266+
this.endpoint = endpoint ?? DefaultEndpoint;
267+
(this.host, this.port) = this.ParseEndpoint(this.endpoint);
268+
this.udpClient = new UdpClient();
269+
this.udpClient.Client.ReceiveTimeout = 1000; // Optional: Set timeout
270+
}
271+
272+
internal void SendData(byte[] data, string signalFormatPrefix)
273+
{
274+
string base64EncodedString = Convert.ToBase64String(data);
275+
string message = $"{ProtocolHeader}{signalFormatPrefix}{base64EncodedString}";
276+
277+
try
278+
{
279+
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
280+
this.udpClient.Send(messageBytes, messageBytes.Length, this.host, this.port);
281+
}
282+
catch (Exception ex)
283+
{
284+
Logger.LogError($"Error sending UDP data: {ex.Message}");
285+
throw;
286+
}
287+
}
288+
289+
internal void Shutdown()
290+
{
291+
this.udpClient.Close();
292+
}
293+
294+
private (string, int) ParseEndpoint(string endpoint)
295+
{
296+
try
297+
{
298+
var parts = endpoint.Split(':');
299+
if (parts.Length != 2 || !int.TryParse(parts[1], out int port))
300+
{
301+
throw new ArgumentException($"Invalid endpoint: {endpoint}");
302+
}
303+
304+
return (parts[0], port);
305+
}
306+
catch (Exception ex)
307+
{
308+
throw new ArgumentException($"Invalid endpoint: {endpoint}", ex);
309+
}
310+
}
311+
}
312+
313+
internal class ByteStringConverter : JsonConverter<ByteString>
314+
{
315+
/// <inheritdoc/>
316+
public override ByteString? ReadJson(JsonReader reader, Type objectType, ByteString? existingValue, bool hasExistingValue, JsonSerializer serializer)
317+
{
318+
var base64String = (string?)reader.Value;
319+
return ByteString.FromBase64(base64String);
320+
}
321+
322+
/// <inheritdoc/>
323+
public override void WriteJson(JsonWriter writer, ByteString? value, JsonSerializer serializer)
324+
{
325+
writer.WriteValue(value?.ToBase64());
326+
}
327+
}
328+
329+
internal class SpanKindConverter : JsonConverter<Span.Types.SpanKind>
330+
{
331+
/// <inheritdoc/>
332+
public override Span.Types.SpanKind ReadJson(JsonReader reader, Type objectType, Span.Types.SpanKind existingValue, bool hasExistingValue, JsonSerializer serializer)
333+
{
334+
// Handle the string to enum conversion
335+
string? enumString = reader.Value?.ToString();
336+
337+
// Convert the string representation to the corresponding enum value
338+
switch (enumString)
339+
{
340+
case "SPAN_KIND_CLIENT":
341+
return Span.Types.SpanKind.Client;
342+
case "SPAN_KIND_SERVER":
343+
return Span.Types.SpanKind.Server;
344+
case "SPAN_KIND_INTERNAL":
345+
return Span.Types.SpanKind.Internal;
346+
case "SPAN_KIND_PRODUCER":
347+
return Span.Types.SpanKind.Producer;
348+
case "SPAN_KIND_CONSUMER":
349+
return Span.Types.SpanKind.Consumer;
350+
default:
351+
throw new JsonSerializationException($"Unknown SpanKind: {enumString}");
352+
}
353+
}
354+
355+
/// <inheritdoc/>
356+
public override void WriteJson(JsonWriter writer, Span.Types.SpanKind value, JsonSerializer serializer)
357+
{
358+
// Write the string representation of the enum
359+
writer.WriteValue(value.ToString());
360+
}
361+
}

0 commit comments

Comments
 (0)