Skip to content

Commit f9ae761

Browse files
Instrument events when they are enqueued in the buffered producer (Azure#38273)
* Instrument events when they are enqueued in the buffered producer * Fix tests * Fix tests * Set W3C format on net462 * updated wrong test * PR fb * Fix spacing
1 parent 890897e commit f9ae761

File tree

4 files changed

+234
-34
lines changed

4 files changed

+234
-34
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- When using the `EventHubBufferedProducerClient`, events are now instrumented when `EnqueueEventAsync` or `EnqueueEventsAsync` is called, rather than when the event is published. This ensures that the instrumentation is accurate when the event is published, regardless of whether the event is published immediately or buffered for a period of time.
12+
1113
### Other Changes
1214

1315
## 5.9.2 (2023-06-06)

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Azure.Core;
1515
using Azure.Core.Diagnostics;
1616
using Azure.Core.Pipeline;
17+
using Azure.Core.Shared;
1718
using Azure.Messaging.EventHubs.Amqp;
1819
using Azure.Messaging.EventHubs.Core;
1920
using Azure.Messaging.EventHubs.Diagnostics;
@@ -128,6 +129,9 @@ public class EventHubBufferedProducerClient : IAsyncDisposable
128129
/// <summary>Indicates whether or not this instance has been closed.</summary>
129130
private volatile bool _isClosed;
130131

132+
/// <summary>The client diagnostics instance used to instrument events when enqueueing.</summary>
133+
private readonly MessagingClientDiagnostics _clientDiagnostics;
134+
131135
/// <summary>
132136
/// The fully qualified Event Hubs namespace that this producer is currently associated with, which will likely be similar
133137
/// to <c>{yournamespace}.servicebus.windows.net</c>.
@@ -459,6 +463,13 @@ public EventHubBufferedProducerClient(string connectionString,
459463
{
460464
Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
461465
_producer = new EventHubProducerClient(connectionString, eventHubName, (clientOptions ?? DefaultOptions).ToEventHubProducerClientOptions());
466+
467+
_clientDiagnostics = new MessagingClientDiagnostics(
468+
DiagnosticProperty.DiagnosticNamespace,
469+
DiagnosticProperty.ResourceProviderNamespace,
470+
DiagnosticProperty.EventHubsServiceContext,
471+
_producer.FullyQualifiedNamespace,
472+
_producer.EventHubName);
462473
}
463474

464475
/// <summary>
@@ -520,6 +531,13 @@ public EventHubBufferedProducerClient(EventHubConnection connection,
520531
EventHubBufferedProducerClientOptions clientOptions = default) : this(clientOptions)
521532
{
522533
_producer = new EventHubProducerClient(connection, (clientOptions ?? DefaultOptions).ToEventHubProducerClientOptions());
534+
535+
_clientDiagnostics = new MessagingClientDiagnostics(
536+
DiagnosticProperty.DiagnosticNamespace,
537+
DiagnosticProperty.ResourceProviderNamespace,
538+
DiagnosticProperty.EventHubsServiceContext,
539+
_producer.FullyQualifiedNamespace,
540+
_producer.EventHubName);
523541
}
524542

525543
/// <summary>
@@ -538,6 +556,12 @@ internal EventHubBufferedProducerClient(EventHubProducerClient producer,
538556
EventHubBufferedProducerClientOptions clientOptions = default) : this(clientOptions)
539557
{
540558
_producer = producer;
559+
_clientDiagnostics = new MessagingClientDiagnostics(
560+
DiagnosticProperty.DiagnosticNamespace,
561+
DiagnosticProperty.ResourceProviderNamespace,
562+
DiagnosticProperty.EventHubsServiceContext,
563+
_producer.FullyQualifiedNamespace,
564+
_producer.EventHubName);
541565
}
542566

543567
/// <summary>
@@ -575,6 +599,13 @@ private EventHubBufferedProducerClient(string fullyQualifiedNamespace,
575599
AzureNamedKeyCredential keyCred => new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, keyCred, options),
576600
_ => throw new ArgumentException(Resources.UnsupportedCredential, nameof(credential))
577601
};
602+
603+
_clientDiagnostics = new MessagingClientDiagnostics(
604+
DiagnosticProperty.DiagnosticNamespace,
605+
DiagnosticProperty.ResourceProviderNamespace,
606+
DiagnosticProperty.EventHubsServiceContext,
607+
_producer.FullyQualifiedNamespace,
608+
_producer.EventHubName);
578609
}
579610

580611
/// <summary>
@@ -789,6 +820,7 @@ public virtual async Task<int> EnqueueEventAsync(EventData eventData,
789820
var partitionState = _activePartitionStateMap.GetOrAdd(partitionId, partitionId => new PartitionPublishingState(partitionId, _options));
790821
var writer = partitionState.PendingEventsWriter;
791822

823+
_clientDiagnostics.InstrumentMessage(eventData.Properties, DiagnosticProperty.EventActivityName, out _, out _);
792824
await writer.WriteAsync(eventData, cancellationToken).ConfigureAwait(false);
793825

794826
var count = Interlocked.Increment(ref _totalBufferedEventCount);
@@ -968,6 +1000,7 @@ public virtual async Task<int> EnqueueEventsAsync(IEnumerable<EventData> events,
9681000
var activePartitionState = partitionState ?? _activePartitionStateMap.GetOrAdd(eventPartitionId, partitionId => new PartitionPublishingState(eventPartitionId, _options));
9691001
var writer = activePartitionState.PendingEventsWriter;
9701002

1003+
_clientDiagnostics.InstrumentMessage(eventData.Properties, DiagnosticProperty.EventActivityName, out _, out _);
9711004
await writer.WriteAsync(eventData, cancellationToken).ConfigureAwait(false);
9721005

9731006
var count = Interlocked.Increment(ref _totalBufferedEventCount);

sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsTests.cs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,136 @@ await producer.SendAsync(new[]
187187
}
188188
}
189189

190+
/// <summary>
191+
/// Verifies diagnostics functionality of the <see cref="EventHubBufferedProducerClient" />
192+
/// class.
193+
/// </summary>
194+
///
195+
[Test]
196+
public async Task EventHubBufferedProducerAppliesDiagnosticIdToEventsOnEnqueue()
197+
{
198+
using var testListener = new ClientDiagnosticListener(DiagnosticProperty.DiagnosticNamespace);
199+
Activity activity = new Activity("SomeActivity");
200+
#if NET462
201+
activity.SetIdFormat(ActivityIdFormat.W3C);
202+
#endif
203+
activity.Start();
204+
205+
var eventHubName = "SomeName";
206+
var endpoint = "some.endpoint.com";
207+
var fakeConnection = new MockConnection(endpoint, eventHubName);
208+
209+
var writtenEventsData = new List<EventData>();
210+
211+
var mockProducer = new Mock<EventHubProducerClient>(fakeConnection, new EventHubProducerClientOptions());
212+
mockProducer
213+
.Setup(producer => producer.GetPartitionIdsAsync(It.IsAny<CancellationToken>()))
214+
.ReturnsAsync(new[] { "0"});
215+
int times = 0;
216+
mockProducer
217+
.Setup(producer => producer.CreateBatchAsync(It.IsAny<CreateBatchOptions>(), It.IsAny<CancellationToken>()))
218+
.Returns<CreateBatchOptions, CancellationToken>((options, token) =>
219+
{
220+
List<EventData> events = new List<EventData>();
221+
if (times++ == 0)
222+
{
223+
events = writtenEventsData;
224+
}
225+
226+
return new ValueTask<EventDataBatch>(
227+
EventHubsModelFactory.EventDataBatch(long.MaxValue, events, options));
228+
});
229+
230+
mockProducer
231+
.Setup(m => m.SendAsync(It.IsAny<EventDataBatch>(), It.IsAny<CancellationToken>()))
232+
.Returns(Task.CompletedTask);
233+
234+
var bufferedProducer = new EventHubBufferedProducerClient(mockProducer.Object);
235+
236+
bufferedProducer.SendEventBatchFailedAsync += args => Task.CompletedTask;
237+
await bufferedProducer.EnqueueEventsAsync(new[]
238+
{
239+
new EventData(ReadOnlyMemory<byte>.Empty),
240+
new EventData(ReadOnlyMemory<byte>.Empty)
241+
});
242+
243+
// Stop the activity before flushing so that we verify that the events are instrumented at time of enqueueing
244+
activity.Stop();
245+
246+
await bufferedProducer.FlushAsync();
247+
Assert.That(writtenEventsData.Count, Is.EqualTo(2), "All events should have been instrumented.");
248+
249+
foreach (EventData eventData in writtenEventsData)
250+
{
251+
Assert.That(eventData.Properties.TryGetValue(MessagingClientDiagnostics.DiagnosticIdAttribute, out object value), Is.True, "The events should have a diagnostic identifier property.");
252+
ActivityContext.TryParse((string)value, null, out var context);
253+
Assert.That(context.TraceId, Is.EqualTo(activity.TraceId), "The trace identifier should match the activity in the active scope when the events were enqueued.");
254+
}
255+
}
256+
257+
/// <summary>
258+
/// Verifies diagnostics functionality of the <see cref="EventHubBufferedProducerClient" />
259+
/// class when enqueueing a single event.
260+
/// </summary>
261+
///
262+
[Test]
263+
public async Task EventHubBufferedProducerAppliesDiagnosticIdToSingleEventOnEnqueue()
264+
{
265+
using var testListener = new ClientDiagnosticListener(DiagnosticProperty.DiagnosticNamespace);
266+
Activity activity = new Activity("SomeActivity");
267+
#if NET462
268+
activity.SetIdFormat(ActivityIdFormat.W3C);
269+
#endif
270+
activity.Start();
271+
272+
var eventHubName = "SomeName";
273+
var endpoint = "some.endpoint.com";
274+
var fakeConnection = new MockConnection(endpoint, eventHubName);
275+
276+
var writtenEventsData = new List<EventData>();
277+
278+
var mockProducer = new Mock<EventHubProducerClient>(fakeConnection, new EventHubProducerClientOptions());
279+
mockProducer
280+
.Setup(producer => producer.GetPartitionIdsAsync(It.IsAny<CancellationToken>()))
281+
.ReturnsAsync(new[] { "0"});
282+
int times = 0;
283+
mockProducer
284+
.Setup(producer => producer.CreateBatchAsync(It.IsAny<CreateBatchOptions>(), It.IsAny<CancellationToken>()))
285+
.Returns<CreateBatchOptions, CancellationToken>((options, token) =>
286+
{
287+
List<EventData> events = new List<EventData>();
288+
if (times++ == 0)
289+
{
290+
events = writtenEventsData;
291+
}
292+
293+
return new ValueTask<EventDataBatch>(
294+
EventHubsModelFactory.EventDataBatch(long.MaxValue, events, options));
295+
});
296+
297+
mockProducer
298+
.Setup(m => m.SendAsync(It.IsAny<EventDataBatch>(), It.IsAny<CancellationToken>()))
299+
.Returns(Task.CompletedTask);
300+
301+
var bufferedProducer = new EventHubBufferedProducerClient(mockProducer.Object);
302+
303+
bufferedProducer.SendEventBatchFailedAsync += args => Task.CompletedTask;
304+
await bufferedProducer.EnqueueEventAsync(new EventData(ReadOnlyMemory<byte>.Empty));
305+
306+
// Stop the activity before flushing so that we verify that the events are instrumented at time of enqueueing
307+
activity.Stop();
308+
309+
await bufferedProducer.FlushAsync();
310+
Assert.That(writtenEventsData.Count, Is.EqualTo(1), "All events should have been instrumented.");
311+
312+
foreach (EventData eventData in writtenEventsData)
313+
{
314+
Assert.That(eventData.Properties.TryGetValue(MessagingClientDiagnostics.DiagnosticIdAttribute, out object value), Is.True, "The events should have a diagnostic identifier property.");
315+
ActivityContext.TryParse((string)value, null, out var context);
316+
Assert.That(context.TraceId, Is.EqualTo(activity.TraceId), "The trace identifier should match the activity in the active scope when the events were enqueued.");
317+
}
318+
}
319+
190320
/// <summary>
191321
/// Verifies diagnostics functionality of the <see cref="EventHubProducerClient" />
192322
/// class.

0 commit comments

Comments
 (0)