Skip to content

Commit 99ae240

Browse files
committed
Allow parent/child linking in opentelemetry spans
1 parent 75822ae commit 99ae240

File tree

6 files changed

+131
-40
lines changed

6 files changed

+131
-40
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
~static OpenTelemetry.Trace.OpenTelemetryExtensions.AddRabbitMQInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action<RabbitMQ.Client.RabbitMQOpenTelemetryOptions> configure) -> OpenTelemetry.Trace.TracerProviderBuilder

projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,27 @@
88

99
namespace OpenTelemetry.Trace
1010
{
11+
1112
public static class OpenTelemetryExtensions
1213
{
13-
public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder)
14+
public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder, Action<RabbitMQOpenTelemetryOptions> configure)
1415
{
16+
var options = new RabbitMQOpenTelemetryOptions();
17+
configure?.Invoke(options);
18+
RabbitMQActivitySource.LinkType = options.LinkType;
19+
RabbitMQActivitySource.UseRoutingKeyAsOperationName = options.UseRoutingKeyAsOperationName;
20+
1521
RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
1622
RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
1723
builder.AddSource("RabbitMQ.Client.*");
1824
return builder;
1925
}
2026

27+
public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder)
28+
{
29+
return AddRabbitMQInstrumentation(builder, null);
30+
}
31+
2132
private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
2233
{
2334
// Extract the PropagationContext of the upstream parent from the message headers.

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static class RabbitMQActivitySource
5252
DefaultContextExtractor;
5353

5454
public static bool UseRoutingKeyAsOperationName { get; set; } = true;
55+
public static OpenTelemetryLinkType LinkType { get; set; } = OpenTelemetryLinkType.AlwaysLink;
5556
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
5657

5758
internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
@@ -115,9 +116,16 @@ public static class RabbitMQActivitySource
115116
}
116117

117118
// Extract the PropagationContext of the upstream parent from the message headers.
119+
ActivityContext linkedContext = LinkType == OpenTelemetryLinkType.AlwaysLink
120+
? ContextExtractor(readOnlyBasicProperties) : default;
121+
ActivityContext parentContext = LinkType == OpenTelemetryLinkType.AlwaysParentChild
122+
? ContextExtractor(readOnlyBasicProperties) : default;
123+
118124
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
119125
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
120-
ContextExtractor(readOnlyBasicProperties));
126+
linkedContext, parentContext);
127+
128+
121129
if (activity != null && activity.IsAllDataRequested)
122130
{
123131
PopulateMessagingTags(MessagingOperationTypeReceive, MessagingOperationNameBasicGet, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
@@ -128,21 +136,26 @@ public static class RabbitMQActivitySource
128136
}
129137

130138
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
131-
IReadOnlyBasicProperties basicProperties, int bodySize)
139+
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
132140
{
133141
if (!s_subscriberSource.HasListeners())
134142
{
135143
return null;
136144
}
137145

138146
// Extract the PropagationContext of the upstream parent from the message headers.
147+
ActivityContext linkedContext = LinkType == OpenTelemetryLinkType.AlwaysLink
148+
? ContextExtractor(readOnlyBasicProperties) : default;
149+
ActivityContext parentContext = LinkType == OpenTelemetryLinkType.AlwaysParentChild
150+
? ContextExtractor(readOnlyBasicProperties) : default;
151+
139152
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
140153
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
141-
ActivityKind.Consumer, ContextExtractor(basicProperties));
154+
ActivityKind.Consumer, linkedContext, parentContext);
142155
if (activity != null && activity.IsAllDataRequested)
143156
{
144157
PopulateMessagingTags(MessagingOperationTypeProcess, MessagingOperationNameBasicDeliver, routingKey, exchange,
145-
deliveryTag, basicProperties, bodySize, activity);
158+
deliveryTag, readOnlyBasicProperties, bodySize, activity);
146159
}
147160

148161
return activity;
@@ -157,8 +170,13 @@ public static class RabbitMQActivitySource
157170
private static Activity? StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
158171
ActivityContext linkedContext = default, ActivityContext parentContext = default)
159172
{
173+
var links = new List<ActivityLink>();
174+
if (linkedContext != default)
175+
{
176+
links.Add(new ActivityLink(linkedContext));
177+
}
160178
return source.CreateActivity(name, kind, parentContext: parentContext,
161-
links: new[] { new ActivityLink(linkedContext) }, idFormat: ActivityIdFormat.W3C,
179+
links: links, idFormat: ActivityIdFormat.W3C,
162180
tags: CreationTags)
163181
?.Start();
164182
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
namespace RabbitMQ.Client
2+
{
3+
public enum OpenTelemetryLinkType
4+
{
5+
AlwaysLink,
6+
AlwaysParentChild
7+
}
8+
9+
public class RabbitMQOpenTelemetryOptions
10+
{
11+
public bool UseRoutingKeyAsOperationName { get; set; }
12+
public OpenTelemetryLinkType LinkType { get; set; } = OpenTelemetryLinkType.AlwaysLink;
13+
}
14+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
RabbitMQ.Client.OpenTelemetryLinkType
2+
RabbitMQ.Client.OpenTelemetryLinkType.AlwaysLink = 0 -> RabbitMQ.Client.OpenTelemetryLinkType
3+
RabbitMQ.Client.OpenTelemetryLinkType.AlwaysParentChild = 1 -> RabbitMQ.Client.OpenTelemetryLinkType
4+
RabbitMQ.Client.RabbitMQOpenTelemetryOptions
5+
RabbitMQ.Client.RabbitMQOpenTelemetryOptions.LinkType.get -> RabbitMQ.Client.OpenTelemetryLinkType
6+
RabbitMQ.Client.RabbitMQOpenTelemetryOptions.LinkType.set -> void
7+
RabbitMQ.Client.RabbitMQOpenTelemetryOptions.RabbitMQOpenTelemetryOptions() -> void
8+
RabbitMQ.Client.RabbitMQOpenTelemetryOptions.UseRoutingKeyAsOperationName.get -> bool
9+
RabbitMQ.Client.RabbitMQOpenTelemetryOptions.UseRoutingKeyAsOperationName.set -> void
10+
static RabbitMQ.Client.RabbitMQActivitySource.LinkType.get -> RabbitMQ.Client.OpenTelemetryLinkType
11+
static RabbitMQ.Client.RabbitMQActivitySource.LinkType.set -> void

projects/Test/SequentialIntegration/TestOpenTelemetry.cs

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,25 @@ void AssertIntTagGreaterThanZero(Activity activity, string name)
8383
}
8484

8585
[Theory]
86-
[InlineData(true)]
87-
[InlineData(false)]
88-
public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
86+
[InlineData(true, true)]
87+
[InlineData(true, false)]
88+
[InlineData(false, true)]
89+
[InlineData(false, false)]
90+
public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool useParentChildLinking)
8991
{
9092
var exportedItems = new List<Activity>();
9193
using var tracer = Sdk.CreateTracerProviderBuilder()
92-
.AddRabbitMQInstrumentation()
94+
.AddRabbitMQInstrumentation(options =>
95+
{
96+
options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
97+
options.LinkType = useParentChildLinking ? OpenTelemetryLinkType.AlwaysParentChild : OpenTelemetryLinkType.AlwaysLink;
98+
})
9399
.AddInMemoryExporter(exportedItems)
94100
.Build();
95101
string baggageGuid = Guid.NewGuid().ToString();
96102
Baggage.SetBaggage("TestItem", baggageGuid);
97103
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
98104

99-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
100105
await Task.Delay(500);
101106
string queueName = $"{Guid.NewGuid()}";
102107
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
@@ -132,24 +137,29 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
132137

133138
await _channel.BasicCancelAsync(consumerTag);
134139
await Task.Delay(500);
135-
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
140+
AssertActivityData(useRoutingKeyAsOperationName, useParentChildLinking, queueName, exportedItems, true);
136141
}
137142

138143
[Theory]
139-
[InlineData(true)]
140-
[InlineData(false)]
141-
public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
144+
[InlineData(true, true)]
145+
[InlineData(true, false)]
146+
[InlineData(false, true)]
147+
[InlineData(false, false)]
148+
public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName, bool useParentChildLinking)
142149
{
143150
var exportedItems = new List<Activity>();
144151
using var tracer = Sdk.CreateTracerProviderBuilder()
145-
.AddRabbitMQInstrumentation()
152+
.AddRabbitMQInstrumentation(options =>
153+
{
154+
options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
155+
options.LinkType = useParentChildLinking ? OpenTelemetryLinkType.AlwaysParentChild : OpenTelemetryLinkType.AlwaysLink;
156+
})
146157
.AddInMemoryExporter(exportedItems)
147158
.Build();
148159
string baggageGuid = Guid.NewGuid().ToString();
149160
Baggage.SetBaggage("TestItem", baggageGuid);
150161
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
151162

152-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
153163
await Task.Delay(500);
154164

155165
string queueName = $"{Guid.NewGuid()}";
@@ -186,24 +196,29 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
186196

187197
await _channel.BasicCancelAsync(consumerTag);
188198
await Task.Delay(500);
189-
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
199+
AssertActivityData(useRoutingKeyAsOperationName, useParentChildLinking, queueName, exportedItems, true);
190200
}
191201

192202
[Theory]
193-
[InlineData(true)]
194-
[InlineData(false)]
195-
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
203+
[InlineData(true, true)]
204+
[InlineData(true, false)]
205+
[InlineData(false, true)]
206+
[InlineData(false, false)]
207+
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName, bool useParentChildLinking)
196208
{
197209
var exportedItems = new List<Activity>();
198210
using var tracer = Sdk.CreateTracerProviderBuilder()
199-
.AddRabbitMQInstrumentation()
211+
.AddRabbitMQInstrumentation(options =>
212+
{
213+
options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
214+
options.LinkType = useParentChildLinking ? OpenTelemetryLinkType.AlwaysParentChild : OpenTelemetryLinkType.AlwaysLink;
215+
})
200216
.AddInMemoryExporter(exportedItems)
201217
.Build();
202218
string baggageGuid = Guid.NewGuid().ToString();
203219
Baggage.SetBaggage("TestItem", baggageGuid);
204220
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
205221

206-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
207222
await Task.Delay(500);
208223

209224
string queueName = $"{Guid.NewGuid()}";
@@ -241,24 +256,29 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
241256

242257
await _channel.BasicCancelAsync(consumerTag);
243258
await Task.Delay(500);
244-
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
259+
AssertActivityData(useRoutingKeyAsOperationName, useParentChildLinking, queueName, exportedItems, true);
245260
}
246261

247262
[Theory]
248-
[InlineData(true)]
249-
[InlineData(false)]
250-
public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
263+
[InlineData(true, true)]
264+
[InlineData(true, false)]
265+
[InlineData(false, true)]
266+
[InlineData(false, false)]
267+
public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName, bool useParentChildLinking)
251268
{
252269
var exportedItems = new List<Activity>();
253270
using var tracer = Sdk.CreateTracerProviderBuilder()
254-
.AddRabbitMQInstrumentation()
271+
.AddRabbitMQInstrumentation(options =>
272+
{
273+
options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
274+
options.LinkType = useParentChildLinking ? OpenTelemetryLinkType.AlwaysParentChild : OpenTelemetryLinkType.AlwaysLink;
275+
})
255276
.AddInMemoryExporter(exportedItems)
256277
.Build();
257278
string baggageGuid = Guid.NewGuid().ToString();
258279
Baggage.SetBaggage("TestItem", baggageGuid);
259280
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
260281

261-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
262282
await Task.Delay(500);
263283

264284
string queueName = $"{Guid.NewGuid()}";
@@ -297,23 +317,28 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
297317

298318
await _channel.BasicCancelAsync(consumerTag);
299319
await Task.Delay(500);
300-
AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
320+
AssertActivityData(useRoutingKeyAsOperationName, useParentChildLinking, queueName, exportedItems, true);
301321
}
302322

303323
[Theory]
304-
[InlineData(true)]
305-
[InlineData(false)]
306-
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
324+
[InlineData(true, true)]
325+
[InlineData(true, false)]
326+
[InlineData(false, true)]
327+
[InlineData(false, false)]
328+
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool useParentChildLinking)
307329
{
308330
var exportedItems = new List<Activity>();
309331
using var tracer = Sdk.CreateTracerProviderBuilder()
310-
.AddRabbitMQInstrumentation()
332+
.AddRabbitMQInstrumentation(options =>
333+
{
334+
options.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
335+
options.LinkType = useParentChildLinking ? OpenTelemetryLinkType.AlwaysParentChild : OpenTelemetryLinkType.AlwaysLink;
336+
})
311337
.AddInMemoryExporter(exportedItems)
312338
.Build();
313339
string baggageGuid = Guid.NewGuid().ToString();
314340
Baggage.SetBaggage("TestItem", baggageGuid);
315341
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
316-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
317342
await Task.Delay(500);
318343
string queue = $"queue-{Guid.NewGuid()}";
319344
const string msg = "for basic.get";
@@ -331,15 +356,15 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
331356
ok = await _channel.QueueDeclarePassiveAsync(queue);
332357
Assert.Equal(0u, ok.MessageCount);
333358
await Task.Delay(500);
334-
AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false);
359+
AssertActivityData(useRoutingKeyAsOperationName, useParentChildLinking, queue, exportedItems, false);
335360
}
336361
finally
337362
{
338363
await _channel.QueueDeleteAsync(queue);
339364
}
340365
}
341366

342-
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
367+
private void AssertActivityData(bool useRoutingKeyAsOperationName, bool useParentChildLinking, string queueName,
343368
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
344369
{
345370
string childName = isDeliver ? "deliver" : "fetch";
@@ -359,11 +384,21 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
359384
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
360385
routingKeyTag == $"{queueName}");
361386
Activity receiveActivity = activities.Single(x =>
362-
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
363-
x.Links.First().Context.TraceId == sendActivity.TraceId);
387+
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName));
364388
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
365389
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
366-
Assert.Null(receiveActivity.ParentId);
390+
if (useParentChildLinking)
391+
{
392+
Assert.Empty(receiveActivity.Links);
393+
Assert.Equal(sendActivity.Id, receiveActivity.ParentId);
394+
Assert.Equal(sendActivity.TraceId, receiveActivity.TraceId);
395+
}
396+
else
397+
{
398+
Assert.Equal(sendActivity.TraceId, receiveActivity.Links.Single().Context.TraceId);
399+
Assert.Null(receiveActivity.ParentId);
400+
Assert.NotEqual(sendActivity.TraceId, receiveActivity.TraceId);
401+
}
367402
AssertStringTagNotNullOrEmpty(sendActivity, "network.peer.address");
368403
AssertStringTagNotNullOrEmpty(sendActivity, "network.local.address");
369404
AssertStringTagNotNullOrEmpty(sendActivity, "server.address");
@@ -385,6 +420,7 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
385420
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName);
386421
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
387422
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");
423+
388424
}
389425
}
390426
}

0 commit comments

Comments
 (0)