Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;

namespace KafkaFlow.OpenTelemetry;

internal static class ActivitySourceAccessor
{
internal const string ActivityString = "otel_activity";
internal const string ActivityContextItemKey = "otel_activity";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";

internal static readonly ActivitySource s_activitySource = new(KafkaFlowInstrumentation.ActivitySourceName, KafkaFlowInstrumentation.Version);

internal static void SetGenericTags(Activity activity, IEnumerable<string> bootstrapServers)
internal static void SetGenericTags(Activity activity)
{
// https://opentelemetry.io/docs/languages/net/libraries/#note-on-versioning
// https://github.com/open-telemetry/opentelemetry-dotnet/blob/core-1.9.0/src/Shared/SemanticConventions.cs
activity?.SetTag("messaging.system", MessagingSystemId);
activity?.SetTag("peer.service", string.Join(",", bootstrapServers ?? Enumerable.Empty<string>()));
}

internal static string FormatMessageKey(object key) => key switch
{
null => null,
byte[] bytes => Encoding.UTF8.GetString(bytes),
_ => key.ToString(),
};
}
16 changes: 16 additions & 0 deletions src/KafkaFlow.OpenTelemetry/AttributeKeys.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace KafkaFlow.OpenTelemetry;

internal static class AttributeKeys
{
public const string ClientId = "messaging.client.id";
public const string ConsumerGroupName = "messaging.consumer.group.name";
public const string DestinationName = "messaging.destination.name";
public const string DestinationPartitionId = "messaging.destination.partition.id";
public const string ErrorType = "error.type";
public const string KafkaMessageKey = "messaging.kafka.message.key";
public const string KafkaMessageTombstone = "messaging.kafka.message.tombstone";
public const string KafkaOffset = "messaging.kafka.offset";
public const string MessageBodySize = "messaging.message.body.size";
public const string OperationName = "messaging.operation.name";
public const string OperationType = "messaging.operation.type";
}
60 changes: 37 additions & 23 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
Expand All @@ -11,10 +11,8 @@ namespace KafkaFlow.OpenTelemetry;

internal static class OpenTelemetryConsumerEventsHandler
{
private const string ReceiveString = "receive";
private const string ProcessString = "process";
private const string AttributeMessagingSourceName = "messaging.source.name";
private const string AttributeMessagingKafkaConsumerGroup = "messaging.kafka.consumer.group";
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
Expand All @@ -37,11 +35,11 @@ public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrument
activity?.AddBaggage(item.Key, item.Value);
}

context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);
context?.Items.Add(ActivitySourceAccessor.ActivityContextItemKey, activity);

ActivitySourceAccessor.SetGenericTags(activity, context?.Brokers);
ActivitySourceAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
if (activity is { IsAllDataRequested: true })
{
SetConsumerTags(context, activity);
}
Expand All @@ -58,22 +56,22 @@ public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrument

public static Task OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity)
{
activity?.Dispose();
activity.Dispose();
}

return Task.CompletedTask;
}

public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);

activity?.Dispose();
activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.SetTag(AttributeKeys.ErrorType, ex.GetType().FullName);
activity.AddException(ex);
activity.Dispose();
}

return Task.CompletedTask;
Expand All @@ -86,13 +84,29 @@ private static IEnumerable<string> ExtractTraceContextIntoBasicProperties(IMessa

private static void SetConsumerTags(IMessageContext context, Activity activity)
{
string messageKey = context.Message.Key != null ? Encoding.UTF8.GetString(context.Message.Key as byte[]) : string.Empty;

activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition);
activity.SetTag(AttributeKeys.OperationType, ReceiveString);
activity.SetTag(AttributeKeys.OperationName, ProcessString);
activity.SetTag(AttributeKeys.DestinationName, context.ConsumerContext.Topic);
activity.SetTag(AttributeKeys.DestinationPartitionId, context.ConsumerContext.Partition.ToString());
activity.SetTag(AttributeKeys.ConsumerGroupName, context.ConsumerContext.GroupId);
activity.SetTag(AttributeKeys.ClientId, context.ConsumerContext.ConsumerName);
activity.SetTag(AttributeKeys.KafkaOffset, context.ConsumerContext.Offset);

var messageKey = ActivitySourceAccessor.FormatMessageKey(context.Message.Key);

if (messageKey != null)
{
activity.SetTag(AttributeKeys.KafkaMessageKey, messageKey);
}

if (context.Message.Value == null)
{
activity.SetTag(AttributeKeys.KafkaMessageTombstone, true);
}

if (context.Message.Value is byte[] body)
{
activity.SetTag(AttributeKeys.MessageBodySize, body.Length);
}
}
}
}
59 changes: 39 additions & 20 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ namespace KafkaFlow.OpenTelemetry;

internal static class OpenTelemetryProducerEventsHandler
{
private const string SendString = "send";
private const string PublishString = "publish";
private const string AttributeMessagingDestinationName = "messaging.destination.name";
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString;
var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString;

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
Expand All @@ -35,7 +34,7 @@ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumen

if (activity != null)
{
context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);
context?.Items.Add(ActivitySourceAccessor.ActivityContextItemKey, activity);

contextToInject = activity.Context;

Expand All @@ -53,9 +52,9 @@ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumen
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
s_propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);

ActivitySourceAccessor.SetGenericTags(activity, context?.Brokers);
ActivitySourceAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
if (activity is { IsAllDataRequested: true })
{
SetProducerTags(context, activity);
}
Expand All @@ -72,41 +71,61 @@ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumen

public static Task OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity)
{
activity?.Dispose();
activity.Dispose();
}

return Task.CompletedTask;
}

public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);

activity?.Dispose();
activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.SetTag(AttributeKeys.ErrorType, ex.GetType().FullName);
activity.AddException(ex);
activity.Dispose();
}

return Task.CompletedTask;
}

private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value)
{
if (!context.Headers.Any(x => x.Key == key))
if (context.Headers.All(x => x.Key != key))
{
context.Headers.SetString(key, value, Encoding.ASCII);
}
}

private static void SetProducerTags(IMessageContext context, Activity activity)
{
activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString);
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
activity.SetTag(AttributeKeys.OperationType, SendString);
activity.SetTag(AttributeKeys.OperationName, PublishString);
activity.SetTag(AttributeKeys.DestinationName, context?.ProducerContext.Topic);

if (context?.ProducerContext.Partition.HasValue == true)
{
activity.SetTag(AttributeKeys.DestinationPartitionId, context.ProducerContext.Partition.Value.ToString());
}

if (context?.ProducerContext.Offset.HasValue == true)
{
activity.SetTag(AttributeKeys.KafkaOffset, context.ProducerContext.Offset);
}

var messageKey = ActivitySourceAccessor.FormatMessageKey(context?.Message.Key);

if (messageKey != null)
{
activity.SetTag(AttributeKeys.KafkaMessageKey, messageKey);
}

if (context?.Message.Value == null)
{
activity.SetTag(AttributeKeys.KafkaMessageTombstone, true);
}
}
}
}