Skip to content

Commit 18c5a26

Browse files
authored
[Instrumentation.ConfluentKafka] Add named instrumentation support (open-telemetry#2074)
1 parent 95b0372 commit 18c5a26

File tree

10 files changed

+287
-116
lines changed

10 files changed

+287
-116
lines changed

src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@ static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.TryExtractPropagatio
1717
static Confluent.Kafka.OpenTelemetryProducerBuilderExtensions.AsInstrumentedProducerBuilder<TKey, TValue>(this Confluent.Kafka.ProducerBuilder<TKey, TValue>! producerBuilder) -> Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>!
1818
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
1919
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
20+
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name) -> OpenTelemetry.Metrics.MeterProviderBuilder!
2021
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>? consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
2122
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
2223
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>! producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
24+
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name) -> OpenTelemetry.Metrics.MeterProviderBuilder!
2325
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>? producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
2426
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder!
2527
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>! consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
28+
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name) -> OpenTelemetry.Trace.TracerProviderBuilder!
2629
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedConsumerBuilder<TKey, TValue>? consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
2730
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder!
2831
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
32+
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name) -> OpenTelemetry.Trace.TracerProviderBuilder!
2933
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, Confluent.Kafka.InstrumentedProducerBuilder<TKey, TValue>? producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
3034
virtual Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>.Invoke(Confluent.Kafka.ConsumeResult<TKey, TValue>! consumeResult, System.Diagnostics.Activity? activity, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask

src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
## Unreleased
44

5+
- Add named instrumentation support
6+
([#2074](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2074))
7+
58
## 0.1.0-alpha.1
69

710
Released 2024-Sep-16
811

9-
* Initial release
12+
- Initial release

src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
using System.Diagnostics;
54
using OpenTelemetry.Instrumentation.ConfluentKafka;
65

76
namespace Confluent.Kafka;
@@ -13,6 +12,8 @@ namespace Confluent.Kafka;
1312
/// <typeparam name="TValue">Type of value.</typeparam>
1413
public sealed class InstrumentedConsumerBuilder<TKey, TValue> : ConsumerBuilder<TKey, TValue>
1514
{
15+
private readonly ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options = new();
16+
1617
/// <summary>
1718
/// Initializes a new instance of the <see cref="InstrumentedConsumerBuilder{TKey, TValue}"/> class.
1819
/// </summary>
@@ -22,19 +23,27 @@ public InstrumentedConsumerBuilder(IEnumerable<KeyValuePair<string, string>> con
2223
{
2324
}
2425

25-
internal ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>? Options { get; set; }
26+
internal bool EnableMetrics
27+
{
28+
get => this.options.Metrics;
29+
set => this.options.Metrics = value;
30+
}
31+
32+
internal bool EnableTraces
33+
{
34+
get => this.options.Traces;
35+
set => this.options.Traces = value;
36+
}
2637

2738
/// <summary>
2839
/// Build a new IConsumer instance.
2940
/// </summary>
3041
/// <returns>an <see cref="IProducer{TKey,TValue}"/>.</returns>
3142
public override IConsumer<TKey, TValue> Build()
3243
{
33-
Debug.Assert(this.Options != null, "Options should not be null.");
34-
3544
ConsumerConfig config = (ConsumerConfig)this.Config;
3645

37-
var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.Options!);
46+
var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.options);
3847
consumer.GroupId = config.GroupId;
3948

4049
return consumer;

src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ public InstrumentedProducer(
2727

2828
public string Name => this.producer.Name;
2929

30-
internal ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> Options => this.options;
31-
3230
public int AddBrokers(string brokers)
3331
{
3432
return this.producer.AddBrokers(brokers);
@@ -326,6 +324,11 @@ private static void RecordPublish(TopicPartition topicPartition, TimeSpan durati
326324

327325
private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message<TKey, TValue> message, int? partition = null)
328326
{
327+
if (!this.options.Traces)
328+
{
329+
return null;
330+
}
331+
329332
var spanName = string.Concat(topic, " ", ConfluentKafkaCommon.PublishOperationName);
330333
var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(name: spanName, kind: ActivityKind.Producer, startTime: start);
331334
if (activity == null)

src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
using System.Diagnostics;
54
using OpenTelemetry.Instrumentation.ConfluentKafka;
65

76
namespace Confluent.Kafka;
@@ -13,6 +12,8 @@ namespace Confluent.Kafka;
1312
/// <typeparam name="TValue">Type of value.</typeparam>
1413
public sealed class InstrumentedProducerBuilder<TKey, TValue> : ProducerBuilder<TKey, TValue>
1514
{
15+
private readonly ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options = new();
16+
1617
/// <summary>
1718
/// Initializes a new instance of the <see cref="InstrumentedProducerBuilder{TKey, TValue}"/> class.
1819
/// </summary>
@@ -22,16 +23,24 @@ public InstrumentedProducerBuilder(IEnumerable<KeyValuePair<string, string>> con
2223
{
2324
}
2425

25-
internal ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>? Options { get; set; }
26+
internal bool EnableMetrics
27+
{
28+
get => this.options.Metrics;
29+
set => this.options.Metrics = value;
30+
}
31+
32+
internal bool EnableTraces
33+
{
34+
get => this.options.Traces;
35+
set => this.options.Traces = value;
36+
}
2637

2738
/// <summary>
2839
/// Build a new IProducer instance.
2940
/// </summary>
3041
/// <returns>an <see cref="IProducer{TKey,TValue}"/>.</returns>
3142
public override IProducer<TKey, TValue> Build()
3243
{
33-
Debug.Assert(this.Options != null, "Options should not be null.");
34-
35-
return new InstrumentedProducer<TKey, TValue>(base.Build(), this.Options!);
44+
return new InstrumentedProducer<TKey, TValue>(base.Build(), this.options);
3645
}
3746
}

src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using Confluent.Kafka;
55
using Microsoft.Extensions.DependencyInjection;
6-
using Microsoft.Extensions.Options;
76
using OpenTelemetry.Instrumentation.ConfluentKafka;
87
using OpenTelemetry.Internal;
98

@@ -25,6 +24,18 @@ public static MeterProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>
2524
this MeterProviderBuilder builder)
2625
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: null, consumerBuilder: null);
2726

27+
/// <summary>
28+
/// Enables automatic data collection of outgoing requests to Kafka.
29+
/// </summary>
30+
/// <typeparam name="TKey">The type of the key.</typeparam>
31+
/// <typeparam name="TValue">The type of the value.</typeparam>
32+
/// <param name="builder"><see cref="MeterProviderBuilder"/> being configured.</param>
33+
/// <param name="name">The name of the instrumentation.</param>
34+
/// <returns>The instance of <see cref="MeterProviderBuilder"/> to chain the calls.</returns>
35+
public static MeterProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>(
36+
this MeterProviderBuilder builder, string? name)
37+
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: name, consumerBuilder: null);
38+
2839
/// <summary>
2940
/// Enables automatic data collection of outgoing requests to Kafka.
3041
/// </summary>
@@ -58,34 +69,21 @@ public static MeterProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>
5869
{
5970
Guard.ThrowIfNull(builder);
6071

61-
name ??= Options.DefaultName;
62-
63-
builder.ConfigureServices(services =>
64-
{
65-
services.Configure<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>(name, EnableMetrics);
66-
});
67-
6872
return builder
6973
.AddMeter(ConfluentKafkaCommon.InstrumentationName)
7074
.AddInstrumentation(sp =>
7175
{
72-
if (consumerBuilder == null)
76+
if (name == null)
7377
{
74-
consumerBuilder = sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
75-
var options = sp.GetRequiredService<IOptionsMonitor<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>>();
76-
consumerBuilder.Options = options.Get(name);
78+
consumerBuilder ??= sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
7779
}
78-
79-
if (consumerBuilder.Options == null)
80+
else
8081
{
81-
consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>();
82-
EnableMetrics(consumerBuilder.Options);
82+
consumerBuilder ??= sp.GetRequiredKeyedService<InstrumentedConsumerBuilder<TKey, TValue>>(name);
8383
}
8484

85+
consumerBuilder.EnableMetrics = true;
8586
return new ConfluentKafkaConsumerInstrumentation<TKey, TValue>(consumerBuilder);
8687
});
8788
}
88-
89-
private static void EnableMetrics<TKey, TValue>(ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options) =>
90-
options.Metrics = true;
9189
}

src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using Confluent.Kafka;
55
using Microsoft.Extensions.DependencyInjection;
6-
using Microsoft.Extensions.Options;
76
using OpenTelemetry.Instrumentation.ConfluentKafka;
87
using OpenTelemetry.Internal;
98

@@ -25,6 +24,18 @@ public static MeterProviderBuilder AddKafkaProducerInstrumentation<TKey, TValue>
2524
this MeterProviderBuilder builder)
2625
=> AddKafkaProducerInstrumentation<TKey, TValue>(builder, name: null, producerBuilder: null);
2726

27+
/// <summary>
28+
/// Enables automatic data collection of outgoing requests to Kafka.
29+
/// </summary>
30+
/// <typeparam name="TKey">The type of the key.</typeparam>
31+
/// <typeparam name="TValue">The type of the value.</typeparam>
32+
/// <param name="builder"><see cref="MeterProviderBuilder"/> being configured.</param>
33+
/// <param name="name">The name of the instrumentation.</param>
34+
/// <returns>The instance of <see cref="MeterProviderBuilder"/> to chain the calls.</returns>
35+
public static MeterProviderBuilder AddKafkaProducerInstrumentation<TKey, TValue>(
36+
this MeterProviderBuilder builder, string? name)
37+
=> AddKafkaProducerInstrumentation<TKey, TValue>(builder, name: name, producerBuilder: null);
38+
2839
/// <summary>
2940
/// Enables automatic data collection of outgoing requests to Kafka.
3041
/// </summary>
@@ -58,34 +69,21 @@ public static MeterProviderBuilder AddKafkaProducerInstrumentation<TKey, TValue>
5869
{
5970
Guard.ThrowIfNull(builder);
6071

61-
name ??= Options.DefaultName;
62-
63-
builder.ConfigureServices(services =>
64-
{
65-
services.Configure<ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>>(name, EnableMetrics);
66-
});
67-
6872
return builder
6973
.AddMeter(ConfluentKafkaCommon.InstrumentationName)
7074
.AddInstrumentation(sp =>
7175
{
72-
if (producerBuilder == null)
76+
if (name == null)
7377
{
74-
producerBuilder = sp.GetRequiredService<InstrumentedProducerBuilder<TKey, TValue>>();
75-
var options = sp.GetRequiredService<IOptionsMonitor<ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>>>();
76-
producerBuilder.Options = options.Get(name);
78+
producerBuilder ??= sp.GetRequiredService<InstrumentedProducerBuilder<TKey, TValue>>();
7779
}
78-
79-
if (producerBuilder.Options == null)
80+
else
8081
{
81-
producerBuilder.Options = new ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>();
82-
EnableMetrics(producerBuilder.Options);
82+
producerBuilder ??= sp.GetRequiredKeyedService<InstrumentedProducerBuilder<TKey, TValue>>(name);
8383
}
8484

85+
producerBuilder.EnableMetrics = true;
8586
return new ConfluentKafkaProducerInstrumentation<TKey, TValue>(producerBuilder);
8687
});
8788
}
88-
89-
private static void EnableMetrics<TKey, TValue>(ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options) =>
90-
options.Metrics = true;
9189
}

src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using Confluent.Kafka;
55
using Microsoft.Extensions.DependencyInjection;
6-
using Microsoft.Extensions.Options;
76
using OpenTelemetry.Instrumentation.ConfluentKafka;
87
using OpenTelemetry.Internal;
98

@@ -25,6 +24,18 @@ public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue
2524
this TracerProviderBuilder builder)
2625
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: null, consumerBuilder: null);
2726

27+
/// <summary>
28+
/// Enables automatic data collection of outgoing requests to Kafka.
29+
/// </summary>
30+
/// <typeparam name="TKey">The type of the key.</typeparam>
31+
/// <typeparam name="TValue">The type of the value.</typeparam>
32+
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param>
33+
/// <param name="name">The name of the instrumentation.</param>
34+
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns>
35+
public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>(
36+
this TracerProviderBuilder builder, string? name)
37+
=> AddKafkaConsumerInstrumentation<TKey, TValue>(builder, name: name, consumerBuilder: null);
38+
2839
/// <summary>
2940
/// Enables automatic data collection of outgoing requests to Kafka.
3041
/// </summary>
@@ -48,7 +59,7 @@ public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue
4859
/// <typeparam name="TKey">The type of the key.</typeparam>
4960
/// <typeparam name="TValue">The type of the value.</typeparam>
5061
/// <param name="builder"><see cref="TracerProviderBuilder"/> being configured.</param>
51-
/// <param name="name">Optional name which is used when retrieving options.</param>
62+
/// <param name="name">The name of the instrumentation.</param>
5263
/// <param name="consumerBuilder">Optional <see cref="InstrumentedConsumerBuilder{TKey, TValue}"/> to instrument.</param>
5364
/// <returns>The instance of <see cref="TracerProviderBuilder"/> to chain the calls.</returns>
5465
public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue>(
@@ -58,34 +69,21 @@ public static TracerProviderBuilder AddKafkaConsumerInstrumentation<TKey, TValue
5869
{
5970
Guard.ThrowIfNull(builder);
6071

61-
name ??= Options.DefaultName;
62-
63-
builder.ConfigureServices(services =>
64-
{
65-
services.Configure<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>(name, EnableTracing);
66-
});
67-
6872
return builder
6973
.AddSource(ConfluentKafkaCommon.InstrumentationName)
7074
.AddInstrumentation(sp =>
7175
{
72-
if (consumerBuilder == null)
76+
if (name == null)
7377
{
74-
consumerBuilder = sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
75-
var options = sp.GetRequiredService<IOptionsMonitor<ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>>>();
76-
consumerBuilder.Options = options.Get(name);
78+
consumerBuilder ??= sp.GetRequiredService<InstrumentedConsumerBuilder<TKey, TValue>>();
7779
}
78-
79-
if (consumerBuilder.Options == null)
80+
else
8081
{
81-
consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>();
82-
EnableTracing(consumerBuilder.Options);
82+
consumerBuilder ??= sp.GetRequiredKeyedService<InstrumentedConsumerBuilder<TKey, TValue>>(name);
8383
}
8484

85+
consumerBuilder.EnableTraces = true;
8586
return new ConfluentKafkaConsumerInstrumentation<TKey, TValue>(consumerBuilder);
8687
});
8788
}
88-
89-
private static void EnableTracing<TKey, TValue>(ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options) =>
90-
options.Traces = true;
9189
}

0 commit comments

Comments
 (0)