Skip to content

Commit d970ca1

Browse files
authored
[Event Hubs] Add missing metadata to PartitionContext (#26737)
The focus of these changes is to add metadata about the associated Event Hub instance to the `PartitionContext` used by the consumer and processor clients. These fields are present in Java, Python, and JavaScript but are missing in .NET for some reason.
1 parent fecd566 commit d970ca1

13 files changed

+279
-47
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.slnf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
"Azure.Messaging.EventHubs.Processor\\tests\\Azure.Messaging.EventHubs.Processor.Tests.csproj"
77
]
88
}
9-
}
9+
}

sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44

55
### Features Added
66

7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
7+
- Added `FullyQualifiedNamespace`, `EventHubName`, and `ConsumerGroup` to the partition context associated with events dispatched for processing.
128

139
## 5.7.0-beta.2 (2022-01-13)
1410

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
<PackageReference Include="Azure.Core.Experimental" />
2222
<!-- END TEMP -->
2323

24-
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.7.0-beta.2" /><!-- TEMP: Remove after the 5.7.0 line goes GA -->
24+
<!--
25+
TEMP: Restore to package reference after the next release.
26+
27+
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.7.0-beta.2" />
28+
-->
29+
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
30+
<!-- END TEMP -->
31+
2532
<PackageReference Include="Azure.Storage.Blobs" />
2633
<PackageReference Include="Microsoft.Azure.Amqp" />
2734
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
923923

924924
try
925925
{
926-
context ??= new ProcessorPartitionContext(partition.PartitionId, () => ReadLastEnqueuedEventProperties(partition.PartitionId));
926+
context ??= new ProcessorPartitionContext(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition.PartitionId, () => ReadLastEnqueuedEventProperties(partition.PartitionId));
927927
eventArgs = new ProcessEventArgs(context, eventData, updateToken => UpdateCheckpointAsync(eventData, context, updateToken), cancellationToken);
928928

929929
await _processEventAsync(eventArgs).ConfigureAwait(false);
@@ -945,7 +945,7 @@ protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData>
945945

946946
if (emptyBatch)
947947
{
948-
eventArgs = new ProcessEventArgs(new EmptyPartitionContext(partition.PartitionId), null, EmptyEventUpdateCheckpoint, cancellationToken);
948+
eventArgs = new ProcessEventArgs(new EmptyPartitionContext(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition.PartitionId), null, EmptyEventUpdateCheckpoint, cancellationToken);
949949
await _processEventAsync(eventArgs).ConfigureAwait(false);
950950
}
951951
}
@@ -1367,11 +1367,17 @@ private class ProcessorPartitionContext : PartitionContext
13671367
/// Initializes a new instance of the <see cref="EmptyPartitionContext" /> class.
13681368
/// </summary>
13691369
///
1370-
/// <param name="partitionId">The identifier of the partition that the context represents.</param>
1370+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace this context is associated with.</param>
1371+
/// <param name="eventHubName">The name of the Event Hub partition this context is associated with.</param>
1372+
/// <param name="consumerGroup">The name of the consumer group this context is associated with.</param>
1373+
/// <param name="partitionId">The identifier of the Event Hub partition this context is associated with.</param>
13711374
/// <param name="readLastEnqueuedEventProperties">A function that can be used to read the last enqueued event properties for the partition.</param>
13721375
///
1373-
public ProcessorPartitionContext(string partitionId,
1374-
Func<LastEnqueuedEventProperties> readLastEnqueuedEventProperties) : base(partitionId)
1376+
public ProcessorPartitionContext(string fullyQualifiedNamespace,
1377+
string eventHubName,
1378+
string consumerGroup,
1379+
string partitionId,
1380+
Func<LastEnqueuedEventProperties> readLastEnqueuedEventProperties) : base(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId)
13751381
{
13761382
_readLastEnqueuedEventProperties = readLastEnqueuedEventProperties;
13771383
}
@@ -1399,9 +1405,15 @@ private class EmptyPartitionContext : PartitionContext
13991405
/// Initializes a new instance of the <see cref="EmptyPartitionContext" /> class.
14001406
/// </summary>
14011407
///
1402-
/// <param name="partitionId">The identifier of the partition that the context represents.</param>
1408+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace this context is associated with.</param>
1409+
/// <param name="eventHubName">The name of the Event Hub partition this context is associated with.</param>
1410+
/// <param name="consumerGroup">The name of the consumer group this context is associated with.</param>
1411+
/// <param name="partitionId">The identifier of the Event Hub partition this context is associated with.</param>
14031412
///
1404-
public EmptyPartitionContext(string partitionId) : base(partitionId)
1413+
public EmptyPartitionContext(string fullyQualifiedNamespace,
1414+
string eventHubName,
1415+
string consumerGroup,
1416+
string partitionId) : base(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId)
14051417
{
14061418
}
14071419

sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,9 @@ public async Task ProcessorRaisesProcessEventHandlerWhenEventsAreRead()
939939
for (var index = 0; index < eventBatch.Length; ++index)
940940
{
941941
Assert.That(capturedEventArgs[index].HasEvent, Is.True, $"The event arguments should contain an event at index { index }.");
942+
Assert.That(capturedEventArgs[index].Partition.FullyQualifiedNamespace, Is.EqualTo(processorClient.FullyQualifiedNamespace), "The fully qualified namespace should have been propagated.");
943+
Assert.That(capturedEventArgs[index].Partition.EventHubName, Is.EqualTo(processorClient.EventHubName), "The event hub name should have been propagated.");
944+
Assert.That(capturedEventArgs[index].Partition.ConsumerGroup, Is.EqualTo(processorClient.ConsumerGroup), "The consumer group should have been propagated.");
942945
Assert.That(capturedEventArgs[index].Partition.PartitionId, Is.EqualTo(partitionId), $"The partition identifier should have been propagated at index { index }.");
943946
Assert.That(capturedEventArgs[index].Data.IsEquivalentTo(eventBatch[index]), Is.True, $"The event should have been propagated and order preserved at index { index }.");
944947
Assert.That(capturedEventArgs[index].CancellationToken, Is.EqualTo(cancellationSource.Token), $"The cancellation token should have been propagated at index { index }.");
@@ -984,6 +987,9 @@ public async Task ProcessorRaisesProcessEventHandlerWithAnEmptyContextWhenThereA
984987

985988
Assert.That(capturedEventArgs, Is.Not.Null, "The event handler should have been fired.");
986989
Assert.That(capturedEventArgs.HasEvent, Is.False, "The event arguments should not contain an event.");
990+
Assert.That(capturedEventArgs.Partition.FullyQualifiedNamespace, Is.EqualTo(processorClient.FullyQualifiedNamespace), "The fully qualified namespace should have been propagated.");
991+
Assert.That(capturedEventArgs.Partition.EventHubName, Is.EqualTo(processorClient.EventHubName), "The event hub name should have been propagated.");
992+
Assert.That(capturedEventArgs.Partition.ConsumerGroup, Is.EqualTo(processorClient.ConsumerGroup), "The consumer group should have been propagated.");
987993
Assert.That(capturedEventArgs.Partition.PartitionId, Is.EqualTo(partitionId), "The partition identifier should have been propagated.");
988994
Assert.That(capturedEventArgs.Data, Is.Null, "No event data should have been propagated.");
989995
Assert.That(capturedEventArgs.CancellationToken, Is.EqualTo(cancellationSource.Token), "The cancellation token should have been propagated.");

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44

55
### Features Added
66

7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
7+
- Added `FullyQualifiedNamespace`, `EventHubName`, and `ConsumerGroup` to the partition context associated with events read by the `EventHubConsumerClient`.
128

139
## 5.7.0-beta.2 (2022-01-13)
1410

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ public static partial class EventHubsModelFactory
140140
public static Azure.Messaging.EventHubs.Producer.EventDataBatch EventDataBatch(long batchSizeBytes, System.Collections.Generic.IList<Azure.Messaging.EventHubs.EventData> batchEventStore, Azure.Messaging.EventHubs.Producer.CreateBatchOptions batchOptions = null, System.Func<Azure.Messaging.EventHubs.EventData, bool> tryAddCallback = null) { throw null; }
141141
public static Azure.Messaging.EventHubs.EventHubProperties EventHubProperties(string name, System.DateTimeOffset createdOn, string[] partitionIds) { throw null; }
142142
public static Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties LastEnqueuedEventProperties(long? lastSequenceNumber, long? lastOffset, System.DateTimeOffset? lastEnqueuedTime, System.DateTimeOffset? lastReceivedTime) { throw null; }
143+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
143144
public static Azure.Messaging.EventHubs.Consumer.PartitionContext PartitionContext(string partitionId, Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties lastEnqueuedEventProperties = default(Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties)) { throw null; }
145+
public static Azure.Messaging.EventHubs.Consumer.PartitionContext PartitionContext(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties lastEnqueuedEventProperties = default(Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties)) { throw null; }
144146
public static Azure.Messaging.EventHubs.PartitionProperties PartitionProperties(string eventHubName, string partitionId, bool isEmpty, long beginningSequenceNumber, long lastSequenceNumber, long lastOffset, System.DateTimeOffset lastEnqueuedTime) { throw null; }
145147
}
146148
public enum EventHubsRetryMode
@@ -281,7 +283,12 @@ public partial struct LastEnqueuedEventProperties : System.IEquatable<Azure.Mess
281283
}
282284
public partial class PartitionContext
283285
{
284-
protected internal PartitionContext(string partitionId) { }
286+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
287+
protected PartitionContext(string partitionId) { }
288+
protected internal PartitionContext(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) { }
289+
public string ConsumerGroup { get { throw null; } }
290+
public string EventHubName { get { throw null; } }
291+
public string FullyQualifiedNamespace { get { throw null; } }
285292
public string PartitionId { get { throw null; } }
286293
public virtual Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() { throw null; }
287294
}

sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ public virtual async IAsyncEnumerable<PartitionEvent> ReadEventsFromPartitionAsy
487487
try
488488
{
489489
transportConsumer = Connection.CreateTransportConsumer(ConsumerGroup, partitionId, Identifier, startingPosition, RetryPolicy, readOptions.TrackLastEnqueuedEventProperties, InvalidateConsumerWhenPartitionIsStolen, readOptions.OwnerLevel, (uint)readOptions.PrefetchCount);
490-
partitionContext = new PartitionContext(partitionId, transportConsumer);
490+
partitionContext = new PartitionContext(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, transportConsumer);
491491
emptyPartitionEvent = new PartitionEvent(partitionContext, null);
492492
}
493493
catch (Exception ex)
@@ -987,7 +987,7 @@ void exceptionCallback(Exception ex)
987987
(
988988
transportConsumer,
989989
channel,
990-
new PartitionContext(partitionId, transportConsumer),
990+
new PartitionContext(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, transportConsumer),
991991
receiveBatchSize,
992992
exceptionCallback,
993993
publishingCancellationSource.Token

sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/PartitionContext.cs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using System.ComponentModel;
56
using Azure.Core;
67
using Azure.Messaging.EventHubs.Core;
78

@@ -14,6 +15,24 @@ namespace Azure.Messaging.EventHubs.Consumer
1415
///
1516
public class PartitionContext
1617
{
18+
/// <summary>
19+
/// The fully qualified Event Hubs namespace that this context is associated with.
20+
/// </summary>
21+
///
22+
public string FullyQualifiedNamespace { get; }
23+
24+
/// <summary>
25+
/// The name of the Event Hub that this context is associated with.
26+
/// </summary>
27+
///
28+
public string EventHubName { get; }
29+
30+
/// <summary>
31+
/// The name of the consumer group that this context is associated with.
32+
/// </summary>
33+
///
34+
public string ConsumerGroup { get; }
35+
1736
/// <summary>
1837
/// The identifier of the Event Hub partition this context is associated with.
1938
/// </summary>
@@ -62,6 +81,9 @@ public virtual LastEnqueuedEventProperties ReadLastEnqueuedEventProperties()
6281
/// Initializes a new instance of the <see cref="PartitionContext"/> class.
6382
/// </summary>
6483
///
84+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace this context is associated with.</param>
85+
/// <param name="eventHubName">The name of the Event Hub partition this context is associated with.</param>
86+
/// <param name="consumerGroup">The name of the consumer group this context is associated with.</param>
6587
/// <param name="partitionId">The identifier of the Event Hub partition this context is associated with.</param>
6688
/// <param name="consumer">The <see cref="TransportConsumer" /> for this context to use as the source for information.</param>
6789
///
@@ -71,8 +93,11 @@ public virtual LastEnqueuedEventProperties ReadLastEnqueuedEventProperties()
7193
/// consumer instance.
7294
/// </remarks>
7395
///
74-
internal PartitionContext(string partitionId,
75-
TransportConsumer consumer) : this(partitionId)
96+
internal PartitionContext(string fullyQualifiedNamespace,
97+
string eventHubName,
98+
string consumerGroup,
99+
string partitionId,
100+
TransportConsumer consumer) : this(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId)
76101
{
77102
Argument.AssertNotNull(consumer, nameof(consumer));
78103
SourceConsumer = new WeakReference<TransportConsumer>(consumer);
@@ -82,9 +107,40 @@ internal PartitionContext(string partitionId,
82107
/// Initializes a new instance of the <see cref="PartitionContext"/> class.
83108
/// </summary>
84109
///
110+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace this context is associated with.</param>
111+
/// <param name="eventHubName">The name of the Event Hub partition this context is associated with.</param>
112+
/// <param name="consumerGroup">The name of the consumer group this context is associated with.</param>
85113
/// <param name="partitionId">The identifier of the Event Hub partition this context is associated with.</param>
86114
///
87-
protected internal PartitionContext(string partitionId)
115+
protected internal PartitionContext(string fullyQualifiedNamespace,
116+
string eventHubName,
117+
string consumerGroup,
118+
string partitionId)
119+
{
120+
Argument.AssertNotNullOrEmpty(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
121+
Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
122+
Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
123+
Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
124+
125+
FullyQualifiedNamespace = fullyQualifiedNamespace;
126+
EventHubName = eventHubName;
127+
ConsumerGroup = consumerGroup;
128+
PartitionId = partitionId;
129+
}
130+
131+
/// <summary>
132+
/// Initializes a new instance of the <see cref="PartitionContext"/> class.
133+
/// </summary>
134+
///
135+
/// <param name="partitionId">The identifier of the Event Hub partition this context is associated with.</param>
136+
///
137+
/// <remarks>
138+
/// This overload should no longer be used; it does not set the members of
139+
/// the context not specified.
140+
/// </remarks>
141+
///
142+
[EditorBrowsable(EditorBrowsableState.Never)]
143+
protected PartitionContext(string partitionId)
88144
{
89145
Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
90146
PartitionId = partitionId;

0 commit comments

Comments
 (0)