diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs index 8acaf9b56..4fa58c266 100644 --- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs +++ b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs @@ -1,24 +1,26 @@ -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; +using System.Text; namespace KafkaFlow.OpenTelemetry; internal static class ActivitySourceAccessor { - internal const string ActivityString = "otel_activity"; + internal const string ActivityContextItemKey = "otel_activity"; internal const string MessagingSystemId = "kafka"; - internal const string AttributeMessagingOperation = "messaging.operation"; - internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; - internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; internal static readonly ActivitySource s_activitySource = new(KafkaFlowInstrumentation.ActivitySourceName, KafkaFlowInstrumentation.Version); - internal static void SetGenericTags(Activity activity, IEnumerable bootstrapServers) + internal static void SetGenericTags(Activity activity) { // https://opentelemetry.io/docs/languages/net/libraries/#note-on-versioning // https://github.com/open-telemetry/opentelemetry-dotnet/blob/core-1.9.0/src/Shared/SemanticConventions.cs activity?.SetTag("messaging.system", MessagingSystemId); - activity?.SetTag("peer.service", string.Join(",", bootstrapServers ?? Enumerable.Empty())); } + + internal static string FormatMessageKey(object key) => key switch + { + null => null, + byte[] bytes => Encoding.UTF8.GetString(bytes), + _ => key.ToString(), + }; } diff --git a/src/KafkaFlow.OpenTelemetry/AttributeKeys.cs b/src/KafkaFlow.OpenTelemetry/AttributeKeys.cs new file mode 100644 index 000000000..d18cc6139 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/AttributeKeys.cs @@ -0,0 +1,16 @@ +namespace KafkaFlow.OpenTelemetry; + +internal static class AttributeKeys +{ + public const string ClientId = "messaging.client.id"; + public const string ConsumerGroupName = "messaging.consumer.group.name"; + public const string DestinationName = "messaging.destination.name"; + public const string DestinationPartitionId = "messaging.destination.partition.id"; + public const string ErrorType = "error.type"; + public const string KafkaMessageKey = "messaging.kafka.message.key"; + public const string KafkaMessageTombstone = "messaging.kafka.message.tombstone"; + public const string KafkaOffset = "messaging.kafka.offset"; + public const string MessageBodySize = "messaging.message.body.size"; + public const string OperationName = "messaging.operation.name"; + public const string OperationType = "messaging.operation.type"; +} diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 8024edfb4..6d4d23b63 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Text; @@ -12,9 +12,6 @@ namespace KafkaFlow.OpenTelemetry; internal static class OpenTelemetryConsumerEventsHandler { private const string ProcessString = "process"; - private const string AttributeMessagingSourceName = "messaging.source.name"; - private const string AttributeMessagingKafkaConsumerGroup = "messaging.kafka.consumer.group"; - private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition"; private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator; public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrumentationOptions options) @@ -37,11 +34,11 @@ public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrument activity?.AddBaggage(item.Key, item.Value); } - context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); + context?.Items.Add(ActivitySourceAccessor.ActivityContextItemKey, activity); - ActivitySourceAccessor.SetGenericTags(activity, context?.Brokers); + ActivitySourceAccessor.SetGenericTags(activity); - if (activity != null && activity.IsAllDataRequested) + if (activity is { IsAllDataRequested: true }) { SetConsumerTags(context, activity); } @@ -58,9 +55,9 @@ public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrument public static Task OnConsumeCompleted(IMessageContext context) { - if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity) { - activity?.Dispose(); + activity.Dispose(); } return Task.CompletedTask; @@ -68,12 +65,12 @@ public static Task OnConsumeCompleted(IMessageContext context) public static Task OnConsumeError(IMessageContext context, Exception ex) { - if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity) { - activity?.SetStatus(ActivityStatusCode.Error, ex.Message); - activity?.RecordException(ex); - - activity?.Dispose(); + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.SetTag(AttributeKeys.ErrorType, ex.GetType().FullName); + activity.AddException(ex); + activity.Dispose(); } return Task.CompletedTask; @@ -86,13 +83,29 @@ private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessa private static void SetConsumerTags(IMessageContext context, Activity activity) { - string messageKey = context.Message.Key != null ? Encoding.UTF8.GetString(context.Message.Key as byte[]) : string.Empty; - - activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString); - activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic); - activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset); - activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition); + activity.SetTag(AttributeKeys.OperationType, ProcessString); + activity.SetTag(AttributeKeys.OperationName, ProcessString); + activity.SetTag(AttributeKeys.DestinationName, context.ConsumerContext.Topic); + activity.SetTag(AttributeKeys.DestinationPartitionId, context.ConsumerContext.Partition.ToString()); + activity.SetTag(AttributeKeys.ConsumerGroupName, context.ConsumerContext.GroupId); + activity.SetTag(AttributeKeys.ClientId, context.ConsumerContext.ConsumerName); + activity.SetTag(AttributeKeys.KafkaOffset, context.ConsumerContext.Offset); + + var messageKey = ActivitySourceAccessor.FormatMessageKey(context.Message.Key); + + if (messageKey != null) + { + activity.SetTag(AttributeKeys.KafkaMessageKey, messageKey); + } + + if (context.Message.Value == null) + { + activity.SetTag(AttributeKeys.KafkaMessageTombstone, true); + } + + if (context.Message.Value is byte[] body) + { + activity.SetTag(AttributeKeys.MessageBodySize, body.Length); + } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 5be369695..7867e3a6a 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -11,16 +11,15 @@ namespace KafkaFlow.OpenTelemetry; internal static class OpenTelemetryProducerEventsHandler { + private const string SendString = "send"; private const string PublishString = "publish"; - private const string AttributeMessagingDestinationName = "messaging.destination.name"; - private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator; public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumentationOptions options) { try { - var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString; + var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. @@ -35,7 +34,7 @@ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumen if (activity != null) { - context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); + context?.Items.Add(ActivitySourceAccessor.ActivityContextItemKey, activity); contextToInject = activity.Context; @@ -53,9 +52,9 @@ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumen // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. s_propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties); - ActivitySourceAccessor.SetGenericTags(activity, context?.Brokers); + ActivitySourceAccessor.SetGenericTags(activity); - if (activity != null && activity.IsAllDataRequested) + if (activity is { IsAllDataRequested: true }) { SetProducerTags(context, activity); } @@ -72,9 +71,9 @@ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumen public static Task OnProducerCompleted(IMessageContext context) { - if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity) { - activity?.Dispose(); + activity.Dispose(); } return Task.CompletedTask; @@ -82,12 +81,12 @@ public static Task OnProducerCompleted(IMessageContext context) public static Task OnProducerError(IMessageContext context, Exception ex) { - if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityContextItemKey, out var value) && value is Activity activity) { - activity?.SetStatus(ActivityStatusCode.Error, ex.Message); - activity?.RecordException(ex); - - activity?.Dispose(); + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.SetTag(AttributeKeys.ErrorType, ex.GetType().FullName); + activity.AddException(ex); + activity.Dispose(); } return Task.CompletedTask; @@ -95,7 +94,7 @@ public static Task OnProducerError(IMessageContext context, Exception ex) private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) { - if (!context.Headers.Any(x => x.Key == key)) + if (context.Headers.All(x => x.Key != key)) { context.Headers.SetString(key, value, Encoding.ASCII); } @@ -103,10 +102,30 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex private static void SetProducerTags(IMessageContext context, Activity activity) { - activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString); - activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic); - activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset); + activity.SetTag(AttributeKeys.OperationType, SendString); + activity.SetTag(AttributeKeys.OperationName, PublishString); + activity.SetTag(AttributeKeys.DestinationName, context?.ProducerContext.Topic); + + if (context?.ProducerContext.Partition.HasValue == true) + { + activity.SetTag(AttributeKeys.DestinationPartitionId, context.ProducerContext.Partition.Value.ToString()); + } + + if (context?.ProducerContext.Offset.HasValue == true) + { + activity.SetTag(AttributeKeys.KafkaOffset, context.ProducerContext.Offset); + } + + var messageKey = ActivitySourceAccessor.FormatMessageKey(context?.Message.Key); + + if (messageKey != null) + { + activity.SetTag(AttributeKeys.KafkaMessageKey, messageKey); + } + + if (context?.Message.Value == null) + { + activity.SetTag(AttributeKeys.KafkaMessageTombstone, true); + } } -} \ No newline at end of file +}