Skip to content

Commit c1c54f1

Browse files
YayBurritoslukebakken
authored andcommitted
Pass basic props to RabbitMQActivitySource.BasicPublish to allow messageid tag to be included in trace span
updated tests for messageid tag corrected broken tests after bad conflict resolution from merging main into feature branch
1 parent 44d8be3 commit c1c54f1

File tree

5 files changed

+48
-197
lines changed

5 files changed

+48
-197
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

6363
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
64+
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties)
6565
: default;
6666

6767
ulong publishSequenceNumber = 0;
@@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
116116
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117117

118118
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
119+
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties)
120120
: default;
121121

122122
ulong publishSequenceNumber = 0;

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static bool UseRoutingKeyAsOperationName
6666
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6767
};
6868

69-
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
69+
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties,
7070
ActivityContext linkedContext = default)
7171
{
7272
if (!s_publisherSource.HasListeners())
@@ -83,7 +83,7 @@ public static bool UseRoutingKeyAsOperationName
8383
ActivityKind.Producer, linkedContext);
8484
if (activity != null && activity.IsAllDataRequested)
8585
{
86-
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
86+
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity);
8787
}
8888

8989
return activity;

projects/Test/SequentialIntegration/TestActivitySource.cs

Lines changed: 22 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -75,120 +75,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name)
7575
Assert.True(activity.GetTagItem(name) is int result && result > 0);
7676
}
7777

78-
[Theory]
79-
[InlineData(true, true)]
80-
[InlineData(true, false)]
81-
[InlineData(false, true)]
82-
[InlineData(false, false)]
83-
public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
84-
{
85-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
86-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
87-
var _activities = new List<Activity>();
88-
using ActivityListener activityListener = StartActivityListener(_activities);
89-
await Task.Delay(500);
90-
string queueName = $"{Guid.NewGuid()}";
91-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
92-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
93-
byte[] consumeBody = null;
94-
var consumer = new AsyncEventingBasicConsumer(_channel);
95-
var consumerReceivedTcs =
96-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
97-
consumer.ReceivedAsync += (o, a) =>
98-
{
99-
consumeBody = a.Body.ToArray();
100-
consumerReceivedTcs.SetResult(true);
101-
return Task.CompletedTask;
102-
};
103-
104-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
105-
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
106-
107-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
108-
Assert.True(await consumerReceivedTcs.Task);
109-
110-
await _channel.BasicCancelAsync(consumerTag);
111-
await Task.Delay(500);
112-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
113-
}
114-
115-
[Theory]
116-
[InlineData(true, true)]
117-
[InlineData(true, false)]
118-
[InlineData(false, true)]
119-
[InlineData(false, false)]
120-
public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
121-
{
122-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
123-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
124-
var _activities = new List<Activity>();
125-
using ActivityListener activityListener = StartActivityListener(_activities);
126-
await Task.Delay(500);
127-
string queueName = $"{Guid.NewGuid()}";
128-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
129-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
130-
byte[] consumeBody = null;
131-
var consumer = new AsyncEventingBasicConsumer(_channel);
132-
var consumerReceivedTcs =
133-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
134-
consumer.ReceivedAsync += (o, a) =>
135-
{
136-
consumeBody = a.Body.ToArray();
137-
consumerReceivedTcs.SetResult(true);
138-
return Task.CompletedTask;
139-
};
140-
141-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
142-
CachedString exchange = new CachedString("");
143-
CachedString routingKey = new CachedString(q.QueueName);
144-
await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody);
145-
146-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
147-
Assert.True(await consumerReceivedTcs.Task);
148-
149-
await _channel.BasicCancelAsync(consumerTag);
150-
await Task.Delay(500);
151-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
152-
}
153-
154-
[Theory]
155-
[InlineData(true, true)]
156-
[InlineData(true, false)]
157-
[InlineData(false, true)]
158-
[InlineData(false, false)]
159-
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
160-
{
161-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
162-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
163-
var _activities = new List<Activity>();
164-
using ActivityListener activityListener = StartActivityListener(_activities);
165-
await Task.Delay(500);
166-
string queueName = $"{Guid.NewGuid()}";
167-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
168-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
169-
byte[] consumeBody = null;
170-
var consumer = new AsyncEventingBasicConsumer(_channel);
171-
var consumerReceivedTcs =
172-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
173-
consumer.ReceivedAsync += (o, a) =>
174-
{
175-
consumeBody = a.Body.ToArray();
176-
consumerReceivedTcs.SetResult(true);
177-
return Task.CompletedTask;
178-
};
179-
180-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
181-
PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
182-
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
183-
184-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
185-
Assert.True(await consumerReceivedTcs.Task);
186-
187-
await _channel.BasicCancelAsync(consumerTag);
188-
await Task.Delay(500);
189-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
190-
}
191-
19278
[Theory]
19379
[InlineData(true, true)]
19480
[InlineData(true, false)]
@@ -307,11 +193,15 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
307193
}
308194

309195
[Theory]
310-
[InlineData(true, true)]
311-
[InlineData(true, false)]
312-
[InlineData(false, true)]
313-
[InlineData(false, false)]
314-
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
196+
[InlineData(true, true, true)]
197+
[InlineData(true, true, false)]
198+
[InlineData(true, false, true)]
199+
[InlineData(true, false, false)]
200+
[InlineData(false, true, true)]
201+
[InlineData(false, true, false)]
202+
[InlineData(false, false, true)]
203+
[InlineData(false, false, false)]
204+
public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId)
315205
{
316206
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
317207
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -321,18 +211,20 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
321211
string queue = $"queue-{Guid.NewGuid()}";
322212
const string msg = "for basic.get";
323213

214+
var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties();
215+
324216
try
325217
{
326218
await _channel.QueueDeclareAsync(queue, false, false, false, null);
327-
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
219+
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
328220
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
329221
Assert.Equal(1u, ok.MessageCount);
330222
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
331223
Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
332224
ok = await _channel.QueueDeclarePassiveAsync(queue);
333225
Assert.Equal(0u, ok.MessageCount);
334226
await Task.Delay(500);
335-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false);
227+
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false, basicProps.MessageId);
336228
}
337229
finally
338230
{
@@ -345,7 +237,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
345237
[InlineData(true, false)]
346238
[InlineData(false, true)]
347239
[InlineData(false, false)]
348-
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
240+
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
349241
{
350242
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
351243
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -381,7 +273,7 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use
381273
[InlineData(true, false)]
382274
[InlineData(false, true)]
383275
[InlineData(false, false)]
384-
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
276+
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
385277
{
386278
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
387279
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -427,7 +319,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
427319
}
428320

429321
private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName,
430-
List<Activity> activityList, bool isDeliver = false)
322+
List<Activity> activityList, bool isDeliver = false, string messageId = null)
431323
{
432324
string childName = isDeliver ? "deliver" : "fetch";
433325
Activity[] activities = activityList.ToArray();
@@ -480,6 +372,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli
480372
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
481373
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
482374
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
375+
376+
if (messageId is not null)
377+
{
378+
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId);
379+
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId);
380+
}
483381
}
484382
}
485383
}

projects/Test/SequentialIntegration/TestHeartbeats.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public override Task InitializeAsync()
5959

6060
[SkippableFact(Timeout = 35000)]
6161
[Trait("Category", "LongRunning")]
62-
public async Task TestThatHeartbeatWriterUsesConfigurableInterval()
62+
public async Task TestThatHeartbeatWriterUsesConfigurableIntervalAsync()
6363
{
6464
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
6565

@@ -72,7 +72,7 @@ public async Task TestThatHeartbeatWriterUsesConfigurableInterval()
7272

7373
[SkippableFact]
7474
[Trait("Category", "LongRunning")]
75-
public async Task TestThatHeartbeatWriterWithTLSEnabled()
75+
public async Task TestThatHeartbeatWriterWithTLSEnabledAsync()
7676
{
7777
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
7878

@@ -94,7 +94,7 @@ public async Task TestThatHeartbeatWriterWithTLSEnabled()
9494

9595
[SkippableFact(Timeout = 90000)]
9696
[Trait("Category", "LongRunning")]
97-
public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval()
97+
public async Task TestHundredsOfConnectionsWithRandomHeartbeatIntervalAsync()
9898
{
9999
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
100100

0 commit comments

Comments
 (0)