Skip to content

Commit 3987a55

Browse files
committed
Add diagnostics
1 parent 52c1f00 commit 3987a55

File tree

4 files changed

+68
-13
lines changed

4 files changed

+68
-13
lines changed

src/Microsoft.Azure.ServiceBus/Core/Batch.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public Batch(ulong maximumBatchSize, Func<Message, Task<Message>> pluginsCallbac
3838
this.pluginsCallback = pluginsCallback;
3939
this.datas = new List<Data>();
4040
this.result = AmqpMessage.Create(datas);
41+
OriginalMessageList = new List<Message>();
4142
}
4243

4344
/// <summary>
@@ -66,6 +67,7 @@ public async Task<bool> TryAdd(Message message)
6667

6768
if (Size <= maximumBatchSize)
6869
{
70+
OriginalMessageList.Add(message);
6971
return true;
7072
}
7173

@@ -127,8 +129,8 @@ public void Dispose()
127129
firstMessage?.Dispose();
128130
result?.Dispose();
129131

130-
firstMessage = null;
131-
result = null;
132+
datas.Clear();
133+
OriginalMessageList.Clear();
132134
}
133135

134136
private void ThrowIfDisposed()
@@ -139,6 +141,8 @@ private void ThrowIfDisposed()
139141
}
140142
}
141143

144+
internal List<Message> OriginalMessageList { get; }
145+
142146
private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}";
143147
}
144148
}

src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -280,33 +280,31 @@ public async Task SendAsync(Batch batch)
280280

281281
MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length);
282282

283-
// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
284-
// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null;
283+
var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
284+
var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(batch.OriginalMessageList) : null;
285285
Task sendTask = null;
286286

287287
try
288288
{
289-
//var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false);
290-
291289
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout);
292290
await sendTask.ConfigureAwait(false);
293291
}
294292
catch (Exception exception)
295293
{
296-
// if (isDiagnosticSourceEnabled)
297-
// {
298-
// this.diagnosticSource.ReportException(exception);
299-
// }
294+
if (isDiagnosticSourceEnabled)
295+
{
296+
this.diagnosticSource.ReportException(exception);
297+
}
300298

301299
MessagingEventSource.Log.MessageSendException(this.ClientId, exception);
302300
throw;
303301
}
304302
finally
305303
{
306-
// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status);
304+
this.diagnosticSource.SendStop(activity, batch.OriginalMessageList, sendTask?.Status);
307305
}
308306

309-
// MessagingEventSource.Log.MessageSendStop(this.ClientId);
307+
MessagingEventSource.Log.MessageSendStop(this.ClientId);
310308
}
311309

312310
/// <summary>

test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,40 @@ async Task BatchSendReceiveFireEvents()
309309
Assert.True(this.events.IsEmpty);
310310
}
311311

312+
[Fact]
313+
[DisplayTestMethodName]
314+
async Task BatchSendWithBatchReceiveFireEvents()
315+
{
316+
this.queueClient = new QueueClient(TestUtility.NamespaceConnectionString, TestConstants.NonPartitionedQueueName,
317+
ReceiveMode.ReceiveAndDelete);
318+
319+
this.listener.Enable( (name, queuName, arg) => name.Contains("Send") || name.Contains("Receive") );
320+
await TestUtility.SendMessagesUsingBatchAsync(this.queueClient.InnerSender, 5);
321+
var messages = await TestUtility.ReceiveMessagesAsync(this.queueClient.InnerReceiver, 5);
322+
323+
Assert.True(this.events.TryDequeue(out var sendStart1));
324+
AssertSendStart(sendStart1.eventName, sendStart1.payload, sendStart1.activity, null, 5);
325+
326+
Assert.True(this.events.TryDequeue(out var sendStop1));
327+
AssertSendStop(sendStop1.eventName, sendStop1.payload, sendStop1.activity, sendStop1.activity, 5);
328+
329+
int receivedStopCount = 0;
330+
string relatedTo = "";
331+
while (this.events.TryDequeue(out var receiveStart))
332+
{
333+
var startCount = AssertReceiveStart(receiveStart.eventName, receiveStart.payload, receiveStart.activity, -1);
334+
335+
Assert.True(this.events.TryDequeue(out var receiveStop));
336+
receivedStopCount += AssertReceiveStop(receiveStop.eventName, receiveStop.payload, receiveStop.activity, receiveStart.activity, null, startCount, -1);
337+
relatedTo += receiveStop.activity.Tags.Single(t => t.Key == "RelatedTo").Value;
338+
}
339+
340+
Assert.Equal(5, receivedStopCount);
341+
Assert.Contains(sendStart1.activity.Id, relatedTo);
342+
343+
Assert.True(this.events.IsEmpty);
344+
}
345+
312346
[Fact]
313347
[DisplayTestMethodName]
314348
async Task PeekFireEvents()

test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,25 @@ internal static async Task SendMessagesAsync(IMessageSender messageSender, int m
6666
await messageSender.SendAsync(messagesToSend);
6767
Log($"Sent {messageCount} messages");
6868
}
69+
internal static async Task SendMessagesUsingBatchAsync(IMessageSender messageSender, int messageCount)
70+
{
71+
if (messageCount == 0)
72+
{
73+
await Task.FromResult(false);
74+
}
75+
76+
var sender = (MessageSender)messageSender;
77+
var batch = await sender.CreateBatch();
78+
for (var i = 0; i < messageCount; i++)
79+
{
80+
var message = new Message(Encoding.UTF8.GetBytes("test" + i));
81+
message.Label = "test" + i;
82+
await batch.TryAdd(message);
83+
}
84+
85+
await sender.SendAsync(batch);
86+
Log($"Sent {messageCount} messages");
87+
}
6988

7089
internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount)
7190
{
@@ -98,7 +117,7 @@ internal static async Task<IList<Message>> ReceiveDeferredMessagesAsync(IMessage
98117
var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);
99118
if (msg != null)
100119
{
101-
messagesToReturn.Add(msg);
120+
messagesToReturn.Add(msg);
102121
}
103122
}
104123

0 commit comments

Comments
 (0)