Skip to content

Commit f0a9a8d

Browse files
authored
add eventhub name into load balancer logs (Azure#47271)
1 parent 42a6b1f commit f0a9a8d

File tree

3 files changed

+66
-52
lines changed

3 files changed

+66
-52
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/PartitionLoadBalancerEventSource.cs

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ protected PartitionLoadBalancerEventSource() : base(EventSourceName)
4343
/// </summary>
4444
///
4545
/// <param name="count"> Minimum partitions per event processor.</param>
46+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
4647
///
47-
[Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}'.")]
48-
public virtual void MinimumPartitionsPerEventProcessor(int count)
48+
[Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}' for Event Hub '{1}'.")]
49+
public virtual void MinimumPartitionsPerEventProcessor(int count, string eventHubName)
4950
{
5051
if (IsEnabled())
5152
{
52-
WriteEvent(1, count);
53+
WriteEvent(1, count, eventHubName);
5354
}
5455
}
5556

@@ -59,14 +60,16 @@ public virtual void MinimumPartitionsPerEventProcessor(int count)
5960
///
6061
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
6162
/// <param name="count"> Current ownership count.</param>
63+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
6264
///
63-
[Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0}. (Identifier: '{1}')")]
65+
[Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0} for Event Hub '{2}'. (Identifier: '{1}')")]
6466
public virtual void CurrentOwnershipCount(int count,
65-
string identifier)
67+
string identifier,
68+
string eventHubName)
6669
{
6770
if (IsEnabled())
6871
{
69-
WriteEvent(2, count, identifier ?? string.Empty);
72+
WriteEvent(2, count, identifier ?? string.Empty, eventHubName);
7073
}
7174
}
7275

@@ -75,13 +78,14 @@ public virtual void CurrentOwnershipCount(int count,
7578
/// </summary>
7679
///
7780
/// <param name="unclaimedPartitions">List of unclaimed partitions.</param>
81+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
7882
///
79-
[Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}'.")]
80-
public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions)
83+
[Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}' for Event Hub '{1}'.")]
84+
public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions, string eventHubName)
8185
{
8286
if (IsEnabled())
8387
{
84-
WriteEvent(3, string.Join(", ", unclaimedPartitions));
88+
WriteEvent(3, string.Join(", ", unclaimedPartitions), eventHubName);
8589
}
8690
}
8791

@@ -90,13 +94,14 @@ public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions)
9094
/// </summary>
9195
///
9296
/// <param name="partitionId">The identifier of the Event Hub partition whose ownership claim attempt is starting.</param>
97+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
9398
///
94-
[Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}'.")]
95-
public virtual void ClaimOwnershipStart(string partitionId)
99+
[Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}' for Event Hub '{1}'.")]
100+
public virtual void ClaimOwnershipStart(string partitionId, string eventHubName)
96101
{
97102
if (IsEnabled())
98103
{
99-
WriteEvent(4, partitionId ?? string.Empty);
104+
WriteEvent(4, partitionId ?? string.Empty, eventHubName);
100105
}
101106
}
102107

@@ -106,14 +111,16 @@ public virtual void ClaimOwnershipStart(string partitionId)
106111
///
107112
/// <param name="partitionId">The identifier of the Event Hub partition.</param>
108113
/// <param name="errorMessage">The message for the exception that occurred.</param>
114+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
109115
///
110-
[Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}'. (ErrorMessage: '{1}')")]
116+
[Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}' for Event Hub '{2}'. (ErrorMessage: '{1}')")]
111117
public virtual void ClaimOwnershipError(string partitionId,
112-
string errorMessage)
118+
string errorMessage,
119+
string eventHubName)
113120
{
114121
if (IsEnabled())
115122
{
116-
WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty);
123+
WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
117124
}
118125
}
119126

@@ -122,13 +129,14 @@ public virtual void ClaimOwnershipError(string partitionId,
122129
/// </summary>
123130
///
124131
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
132+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
125133
///
126-
[Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition. (Identifier: '{0}')")]
127-
public virtual void ShouldStealPartition(string identifier)
134+
[Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition for Event Hub '{1}'. (Identifier: '{0}')")]
135+
public virtual void ShouldStealPartition(string identifier, string eventHubName)
128136
{
129137
if (IsEnabled())
130138
{
131-
WriteEvent(6, identifier ?? string.Empty);
139+
WriteEvent(6, identifier ?? string.Empty, eventHubName);
132140
}
133141
}
134142

@@ -139,15 +147,17 @@ public virtual void ShouldStealPartition(string identifier)
139147
/// <param name="partitionId">The identifier of the partition that was selected to be stolen.</param>
140148
/// <param name="stolenFrom">The identifier of the event processor that is being stolen from.</param>
141149
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
150+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
142151
///
143-
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}'. (Identifier: '{2}')")]
152+
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}' for Event Hub '{3}'. (Identifier: '{2}').")]
144153
public virtual void StealPartition(string partitionId,
145154
string stolenFrom,
146-
string identifier)
155+
string identifier,
156+
string eventHubName)
147157
{
148158
if (IsEnabled())
149159
{
150-
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty);
160+
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty, eventHubName);
151161
}
152162
}
153163

@@ -156,13 +166,14 @@ public virtual void StealPartition(string partitionId,
156166
/// </summary>
157167
///
158168
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
169+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
159170
///
160-
[Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership. (Identifier: '{0}')")]
161-
public virtual void RenewOwnershipStart(string identifier)
171+
[Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership for Event Hub '{1}'. (Identifier: '{0}')")]
172+
public virtual void RenewOwnershipStart(string identifier, string eventHubName)
162173
{
163174
if (IsEnabled())
164175
{
165-
WriteEvent(8, identifier ?? string.Empty);
176+
WriteEvent(8, identifier ?? string.Empty, eventHubName);
166177
}
167178
}
168179

@@ -172,14 +183,16 @@ public virtual void RenewOwnershipStart(string identifier)
172183
///
173184
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
174185
/// <param name="errorMessage">The message for the exception that occurred.</param>
186+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
175187
///
176-
[Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership. (Identifier: '{0}'; ErrorMessage: '{0}')")]
188+
[Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership for Event Hub '{2}'. (Identifier: '{0}'; ErrorMessage: '{1}')")]
177189
public virtual void RenewOwnershipError(string identifier,
178-
string errorMessage)
190+
string errorMessage,
191+
string eventHubName)
179192
{
180193
if (IsEnabled())
181194
{
182-
WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty);
195+
WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
183196
}
184197
}
185198

@@ -188,13 +201,14 @@ public virtual void RenewOwnershipError(string identifier,
188201
/// </summary>
189202
///
190203
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
204+
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
191205
///
192-
[Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed. (Identifier: '{0}')")]
193-
public virtual void RenewOwnershipComplete(string identifier)
206+
[Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed for Event Hub '{1}'. (Identifier: '{0}')")]
207+
public virtual void RenewOwnershipComplete(string identifier, string eventHubName)
194208
{
195209
if (IsEnabled())
196210
{
197-
WriteEvent(10, identifier ?? string.Empty);
211+
WriteEvent(10, identifier ?? string.Empty, eventHubName);
198212
}
199213
}
200214
}

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -350,10 +350,10 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
350350

351351
var unevenPartitionDistribution = (partitionCount % ActiveOwnershipWithDistribution.Keys.Count) > 0;
352352
var minimumOwnedPartitionsCount = partitionCount / ActiveOwnershipWithDistribution.Keys.Count;
353-
Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount);
353+
Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount, EventHubName);
354354

355355
var ownedPartitionsCount = ActiveOwnershipWithDistribution[OwnerIdentifier].Count;
356-
Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier);
356+
Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier, EventHubName);
357357

358358
// There are two possible situations in which we may need to claim a partition ownership:
359359
//
@@ -374,7 +374,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
374374
{
375375
// Look for unclaimed partitions. If any, randomly pick one of them to claim.
376376

377-
Logger.UnclaimedPartitions(unclaimedPartitions);
377+
Logger.UnclaimedPartitions(unclaimedPartitions, EventHubName);
378378

379379
if (unclaimedPartitions.Count > 0)
380380
{
@@ -423,7 +423,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
423423
if ((ownedPartitionsCount < minimumOwnedPartitionsCount)
424424
|| (ownedPartitionsCount < maximumOwnedPartitionsCount && partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0))
425425
{
426-
Logger.ShouldStealPartition(OwnerIdentifier);
426+
Logger.ShouldStealPartition(OwnerIdentifier, EventHubName);
427427

428428
// Prefer stealing from a processor that owns more than the maximum number of partitions.
429429

@@ -444,7 +444,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
444444
}
445445
}
446446

447-
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
447+
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);
448448

449449
var returnTask = ClaimOwnershipAsync(
450450
partitionToSteal,
@@ -474,7 +474,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
474474
}
475475
}
476476

477-
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
477+
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);
478478

479479
var returnTask = ClaimOwnershipAsync(
480480
partitionToSteal,
@@ -501,7 +501,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
501501
{
502502
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
503503

504-
Logger.RenewOwnershipStart(OwnerIdentifier);
504+
Logger.RenewOwnershipStart(OwnerIdentifier, EventHubName);
505505

506506
var utcNow = GetDateTimeOffsetNow();
507507

@@ -543,7 +543,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
543543
// If ownership renewal fails just give up and try again in the next cycle. The processor may
544544
// end up losing some of its ownership.
545545

546-
Logger.RenewOwnershipError(OwnerIdentifier, ex.Message);
546+
Logger.RenewOwnershipError(OwnerIdentifier, ex.Message, EventHubName);
547547

548548
// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
549549
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
@@ -553,7 +553,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
553553
}
554554
finally
555555
{
556-
Logger.RenewOwnershipComplete(OwnerIdentifier);
556+
Logger.RenewOwnershipComplete(OwnerIdentifier, EventHubName);
557557
}
558558
}
559559

@@ -572,7 +572,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
572572
CancellationToken cancellationToken)
573573
{
574574
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
575-
Logger.ClaimOwnershipStart(partitionId);
575+
Logger.ClaimOwnershipStart(partitionId, EventHubName);
576576

577577
// We need the eTag from the most recent ownership of this partition, even if it's expired. We want to keep the offset and
578578
// the sequence number as well.
@@ -602,7 +602,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
602602

603603
// If ownership claim fails, just treat it as a usual ownership claim failure.
604604

605-
Logger.ClaimOwnershipError(partitionId, ex.Message);
605+
Logger.ClaimOwnershipError(partitionId, ex.Message, EventHubName);
606606

607607
// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
608608
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append

sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealFromItself()
540540

541541
// Verify that no attempts to steal were logged.
542542

543-
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
543+
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
544544
}
545545

546546
/// <summary>
@@ -597,7 +597,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenLessPartitionsThanProcess
597597

598598
// Verify that no attempts to steal were logged.
599599

600-
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
600+
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
601601
}
602602

603603
/// <summary>
@@ -678,7 +678,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenTheLoadIsBalanced(int[] a
678678

679679
// Verify that no attempts to steal were logged.
680680

681-
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
681+
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
682682
}
683683

684684
/// <summary>
@@ -902,14 +902,14 @@ public async Task VerifiesEventProcessorLogs()
902902

903903
await loadbalancer.RelinquishOwnershipAsync(CancellationToken.None);
904904

905-
mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier));
906-
mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier));
907-
mockLog.Verify(m => m.ClaimOwnershipStart(It.Is<string>(p => partitionIds.Contains(p))));
908-
mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount));
909-
mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier));
910-
mockLog.Verify(m => m.StealPartition(It.IsAny<string>(), It.IsAny<string>(), loadbalancer.OwnerIdentifier));
911-
mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier));
912-
mockLog.Verify(m => m.UnclaimedPartitions(It.Is<HashSet<string>>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item)))));
905+
mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
906+
mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
907+
mockLog.Verify(m => m.ClaimOwnershipStart(It.Is<string>(p => partitionIds.Contains(p)), loadbalancer.EventHubName));
908+
mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount, loadbalancer.EventHubName));
909+
mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
910+
mockLog.Verify(m => m.StealPartition(It.IsAny<string>(), It.IsAny<string>(), loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
911+
mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
912+
mockLog.Verify(m => m.UnclaimedPartitions(It.Is<HashSet<string>>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item))), loadbalancer.EventHubName));
913913
}
914914

915915
/// <summary>

0 commit comments

Comments
 (0)