Skip to content

Commit 1f0e3ce

Browse files
aloivaPranava Vedagnya Gaddam
andauthored
Add concurrency for external API tasks in eventhub GetMetricsAsync (Azure#52448)
* batching for checkpoint tasks * concurrency = 50 * add cancellation token. * fix UT setup * improve logging * add timeout * extend ut to check logs * add more log details in ut * add comments * add ut to chech task cancellation * add delay to ensure semaphore assignment * add threshold for partition task * set concurrency to processorCount*2 * edit timeouts --------- Co-authored-by: Pranava Vedagnya Gaddam <[email protected]>
1 parent c757d75 commit 1f0e3ce

File tree

2 files changed

+185
-17
lines changed

2 files changed

+185
-17
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubMetricsProvider.cs

Lines changed: 82 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,95 @@ public async Task<EventHubsTriggerMetrics> GetMetricsAsync()
5656

5757
// Get the PartitionRuntimeInformation for all partitions
5858
_logger.LogInformation($"Querying partition information for {partitions.Length} partitions.");
59-
var partitionPropertiesTasks = new Task<PartitionProperties>[partitions.Length];
60-
var checkpointTasks = new Task<EventProcessorCheckpoint>[partitionPropertiesTasks.Length];
6159

62-
for (int i = 0; i < partitions.Length; i++)
63-
{
64-
partitionPropertiesTasks[i] = _client.GetPartitionPropertiesAsync(partitions[i]);
60+
// Limit number of concurrent requests to eventhub client
61+
int ConcurrencyLimit = Environment.ProcessorCount * 2;
62+
const int CheckpointWaitTimeoutMs = 10000;
63+
const int PartitionPropertiesWaitTimeoutMs = 30000;
64+
using var semaphore = new SemaphoreSlim(ConcurrencyLimit, ConcurrencyLimit);
65+
using var cts = new CancellationTokenSource();
6566

66-
checkpointTasks[i] = _checkpointStore.GetCheckpointAsync(
67-
_client.FullyQualifiedNamespace,
68-
_client.EventHubName,
69-
_client.ConsumerGroup,
70-
partitions[i],
71-
CancellationToken.None);
67+
// Get partition properties
68+
var partitionPropertiesTasks = new Task<PartitionProperties>[partitions.Length];
69+
try
70+
{
71+
partitionPropertiesTasks = partitions.Select(async partition =>
72+
{
73+
bool acquired = false;
74+
try
75+
{
76+
acquired = await semaphore.WaitAsync(PartitionPropertiesWaitTimeoutMs, cts.Token).ConfigureAwait(false);
77+
if (!acquired)
78+
{
79+
throw new TimeoutException(
80+
$"Failed to acquire EH client concurrency slot within {PartitionPropertiesWaitTimeoutMs}ms for Event Hub '{_client.EventHubName}', partition '{partition}'.");
81+
}
82+
return await _client.GetPartitionPropertiesAsync(partition).ConfigureAwait(false);
83+
}
84+
catch (Exception e)
85+
{
86+
if (!cts.Token.IsCancellationRequested)
87+
{
88+
_logger.LogDebug($"Requesting cancellation of other partition info tasks. Error while getting partition info for eventhub '{_client.EventHubName}', partition '{partition}': {e.Message}");
89+
cts.Cancel();
90+
}
91+
throw;
92+
}
93+
finally
94+
{
95+
if (acquired)
96+
{
97+
semaphore.Release();
98+
}
99+
}
100+
}).ToArray();
101+
await Task.WhenAll(partitionPropertiesTasks).ConfigureAwait(false);
102+
}
103+
catch (Exception e)
104+
{
105+
_logger.LogWarning($"Encountered an exception while getting partition information for Event Hub '{_client.EventHubName}' used for scaling. Error: {e.Message}");
72106
}
73107

74-
await Task.WhenAll(partitionPropertiesTasks).ConfigureAwait(false);
108+
// Get checkpoints
75109
EventProcessorCheckpoint[] checkpoints = null;
76-
77110
try
78111
{
112+
var checkpointTasks = partitions.Select(async partition =>
113+
{
114+
bool acquired = false;
115+
try
116+
{
117+
acquired = await semaphore.WaitAsync(CheckpointWaitTimeoutMs, cts.Token).ConfigureAwait(false);
118+
if (!acquired)
119+
{
120+
throw new TimeoutException(
121+
$"Failed to acquire checkpoint concurrency slot within {CheckpointWaitTimeoutMs}ms for Event Hub '{_client.EventHubName}', partition '{partition}'.");
122+
}
123+
124+
return await _checkpointStore.GetCheckpointAsync(
125+
_client.FullyQualifiedNamespace,
126+
_client.EventHubName,
127+
_client.ConsumerGroup,
128+
partition,
129+
cts.Token).ConfigureAwait(false);
130+
}
131+
catch (Exception e)
132+
{
133+
if (!cts.Token.IsCancellationRequested)
134+
{
135+
_logger.LogDebug($"Requesting cancellation of other checkpoint tasks. Error while getting checkpoint for eventhub '{_client.EventHubName}', partition '{partition}': {e.Message}");
136+
cts.Cancel();
137+
}
138+
throw;
139+
}
140+
finally
141+
{
142+
if (acquired)
143+
{
144+
semaphore.Release();
145+
}
146+
}
147+
});
79148
checkpoints = await Task.WhenAll(checkpointTasks).ConfigureAwait(false);
80149
}
81150
catch (Exception e)

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsMetricsProviderTests.cs

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class EventHubsMetricsProviderTests
3434
private IEnumerable<PartitionProperties> _partitions;
3535
private IEnumerable<EventProcessorCheckpoint> _checkpoints;
3636

37+
private readonly string _errorMessage = "Uh oh";
38+
3739
[SetUp]
3840
public void SetUp()
3941
{
@@ -52,8 +54,26 @@ public void SetUp()
5254

5355
this._mockCheckpointStore = new Mock<BlobCheckpointStoreInternal>(MockBehavior.Strict);
5456

55-
_mockCheckpointStore.Setup(s => s.GetCheckpointAsync(_namespace, _eventHubName, _consumerGroup, It.IsAny<string>(), default))
56-
.Returns<string, string, string, string, CancellationToken>((ns, hub, cg, partitionId, ct) => Task.FromResult(_checkpoints == null ? null : _checkpoints.SingleOrDefault(cp => cp.PartitionId == partitionId)));
57+
_mockCheckpointStore
58+
.Setup(s => s.GetCheckpointAsync(
59+
_namespace,
60+
_eventHubName,
61+
_consumerGroup,
62+
It.IsAny<string>(),
63+
It.IsAny<CancellationToken>()))
64+
.Returns<string, string, string, string, CancellationToken>((ns, hub, cg, partitionId, ct) =>
65+
{
66+
if (ct.IsCancellationRequested)
67+
{
68+
throw new TaskCanceledException();
69+
}
70+
71+
var checkpoint = _checkpoints == null
72+
? null
73+
: _checkpoints.SingleOrDefault(cp => cp.PartitionId == partitionId);
74+
75+
return Task.FromResult(checkpoint);
76+
});
5777

5878
_metricsProvider = new EventHubMetricsProvider(
5979
_functionId,
@@ -196,7 +216,9 @@ public async Task CreateTriggerMetrics_HandlesExceptions()
196216
// StorageException
197217
_mockCheckpointStore
198218
.Setup(c => c.GetCheckpointAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
199-
.ThrowsAsync(new RequestFailedException(404, "Uh oh"));
219+
.ThrowsAsync(new RequestFailedException(404, _errorMessage));
220+
// Clear previous logs
221+
_loggerProvider.ClearAllLogMessages();
200222

201223
_partitions = new List<PartitionProperties>
202224
{
@@ -208,11 +230,14 @@ public async Task CreateTriggerMetrics_HandlesExceptions()
208230
Assert.AreEqual(1, metrics.PartitionCount);
209231
Assert.AreEqual(1, metrics.EventCount);
210232
Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
233+
AssertGetCheckpointAsyncErrorLogs(_partitions.First().Id, _errorMessage);
211234

212235
// Generic Exception
213236
_mockCheckpointStore
214237
.Setup(c => c.GetCheckpointAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
215-
.ThrowsAsync(new Exception("Uh oh"));
238+
.ThrowsAsync(new Exception(_errorMessage));
239+
// Clear previous logs
240+
_loggerProvider.ClearAllLogMessages();
216241

217242
_partitions = new List<PartitionProperties>
218243
{
@@ -224,10 +249,22 @@ public async Task CreateTriggerMetrics_HandlesExceptions()
224249
Assert.AreEqual(1, metrics.PartitionCount);
225250
Assert.AreEqual(1, metrics.EventCount);
226251
Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
252+
AssertGetCheckpointAsyncErrorLogs(_partitions.First().Id, _errorMessage);
227253

228254
_loggerProvider.ClearAllLogMessages();
229255
}
230256

257+
private void AssertGetCheckpointAsyncErrorLogs(string partitionId, string message)
258+
{
259+
var logs = _loggerProvider.GetAllLogMessages().ToList();
260+
Assert.That(logs.Any(l =>
261+
l.Level == LogLevel.Debug),
262+
$"Requesting cancellation of other checkpoint tasks. Error while getting checkpoint for eventhub '{_eventHubName}', partition '{partitionId}': {message}");
263+
Assert.That(logs.Any(l =>
264+
l.Level == LogLevel.Warning),
265+
$"Encountered an exception while getting checkpoints for Event Hub '{_eventHubName}' used for scaling. Error: {message}");
266+
}
267+
231268
[TestCase(false, 0, -1, -1, 0)] // Microsoft.Azure.Functions.Worker.Extensions.EventHubs < 5.0.0, auto created checkpoint, no events sent
232269
[TestCase(true, 0, -1, -1, 0)] // Microsoft.Azure.Functions.Worker.Extensions.EventHubs >= 5.0.0, no checkpoint, no events sent
233270
[TestCase(false, 0, 0, 0, 0)] // Microsoft.Azure.Functions.Worker.Extensions.EventHubs < 5.0.0, auto created checkpoint, 1 event sent. Know issue: we are not scalling out in this case.
@@ -267,5 +304,67 @@ public async Task CreateTriggerMetrics_DifferentCheckpointFormats_ReturnsExpecte
267304
var metrics = await _metricsProvider.GetMetricsAsync();
268305
Assert.AreEqual(expectedUnprocessedMessageCount, metrics.EventCount);
269306
}
307+
308+
private Task<EventProcessorCheckpoint> WaitTillCancelled(CancellationToken ct, string partition)
309+
{
310+
var tcs = new TaskCompletionSource<EventProcessorCheckpoint>(TaskCreationOptions.RunContinuationsAsynchronously);
311+
312+
if (ct.IsCancellationRequested)
313+
{
314+
_loggerProvider.CreatedLoggers.First().LogDebug($"Cancellation requested for partition {partition}");
315+
throw new TaskCanceledException();
316+
}
317+
318+
ct.Register(state => ((TaskCompletionSource<EventProcessorCheckpoint>)state).TrySetCanceled(),
319+
tcs);
320+
321+
return tcs.Task;
322+
}
323+
324+
[Test]
325+
public async Task CreateTriggerMetric_CancellationCascades_AfterFirstFailure()
326+
{
327+
_partitions = new List<PartitionProperties>
328+
{
329+
new TestPartitionProperties(partitionId: "0"),
330+
new TestPartitionProperties(partitionId: "1"),
331+
new TestPartitionProperties(partitionId: "2")
332+
};
333+
334+
_checkpoints = Array.Empty<EventProcessorCheckpoint>();
335+
336+
_loggerProvider.ClearAllLogMessages();
337+
338+
// First partition triggers token cancellation
339+
_mockCheckpointStore
340+
.Setup(s => s.GetCheckpointAsync(_namespace, _eventHubName, _consumerGroup, "0", It.IsAny<CancellationToken>()))
341+
.Returns(async (string ns, string hub, string cg, string partitionId, CancellationToken ct) =>
342+
{
343+
await Task.Delay(500);
344+
throw new Exception(_errorMessage);
345+
});
346+
347+
// Other partitions wait till cancellation is requested
348+
_mockCheckpointStore
349+
.Setup(s => s.GetCheckpointAsync(
350+
_namespace, _eventHubName, _consumerGroup,
351+
It.Is<string>(p => p == "1" || p == "2"),
352+
It.IsAny<CancellationToken>()))
353+
.Returns<string, string, string, string, CancellationToken>((ns, hub, cg, pid, ct) =>
354+
WaitTillCancelled(ct, pid));
355+
356+
var metrics = await _metricsProvider.GetMetricsAsync();
357+
358+
Assert.AreEqual(3, metrics.PartitionCount);
359+
var logs = _loggerProvider.GetAllLogMessages().ToList();
360+
361+
AssertGetCheckpointAsyncErrorLogs("0", _errorMessage);
362+
Assert.That(logs.Any(), "Cancellation requested for partition 1");
363+
Assert.That(logs.Any(), "Cancellation requested for partition 2");
364+
365+
_mockCheckpointStore.Verify(s =>
366+
s.GetCheckpointAsync(_namespace, _eventHubName, _consumerGroup, It.IsAny<string>(), It.IsAny<CancellationToken>()),
367+
Times.Exactly(3));
368+
}
270369
}
271370
}

0 commit comments

Comments
 (0)