Skip to content

Commit 0a253a1

Browse files
authored
[Event Hubs] Fixed producer semaphore release (Azure#47965)
* [Event Hubs] Fixed producer semaphore release The focus of these changes is to fox an obscure edge case in the `EventHubBufferedProducer` client where an obscure race condition when flushing/enqueuing events concurrently with disposing the producer could cause a semaphore to be released inappropriately. This error superseded the `TaskCanceledException` that should have been surfaced. Also moved to volatile reads for the total buffer count when making state-related decisions, to ensure that the most recent value is used.
1 parent d2200c6 commit 0a253a1

File tree

7 files changed

+52
-298
lines changed

7 files changed

+52
-298
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Thank you to our developer community members who helped to make the Event Hubs c
1616

1717
- Querying runtime data and other management operations will now correctly guards against the race condition where an AMQP link is in the process of closing as the operation attempts to use it. These errors will now properly be classified as retriable as they are for producer and consumer operations.
1818

19+
- Fixed an obscure edge case in the `EventHubBufferedProducer` client where an obscure race condition when flushing/enqueuing events concurrently with disposing the producer could cause a semaphore to be released inappropriately. This error superseded the `TaskCanceledException` that should have been surfaced.
20+
1921
### Other Changes
2022

2123
- Added annotations to make the package compatible with trimming and native AOT compilation.

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public virtual bool IsClosed
196196
/// The total number of events that are currently buffered and waiting to be published, across all partitions.
197197
/// </summary>
198198
///
199-
public virtual int TotalBufferedEventCount => _totalBufferedEventCount;
199+
public virtual int TotalBufferedEventCount => Volatile.Read(ref _totalBufferedEventCount);
200200

201201
/// <summary>
202202
/// The instance of <see cref="EventHubsEventSource" /> which can be mocked for testing.
@@ -603,7 +603,7 @@ private EventHubBufferedProducerClient(string fullyQualifiedNamespace,
603603
{
604604
TokenCredential tokenCred => new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, tokenCred, options),
605605
AzureSasCredential sasCred => new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, sasCred, options),
606-
AzureNamedKeyCredential keyCred => new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, keyCred, options),
606+
AzureNamedKeyCredential keyCred => new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, keyCred, options),
607607
_ => throw new ArgumentException(Resources.UnsupportedCredential, nameof(credential))
608608
};
609609

@@ -772,13 +772,13 @@ public virtual async Task<int> EnqueueEventAsync(EventData eventData,
772772

773773
if ((!IsPublishing) || (_producerManagementTask?.IsCompleted ?? false))
774774
{
775-
try
775+
if (!_stateGuard.Wait(0, cancellationToken))
776776
{
777-
if (!_stateGuard.Wait(0, cancellationToken))
778-
{
779-
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
780-
}
777+
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
778+
}
781779

780+
try
781+
{
782782
Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient));
783783

784784
// StartPublishingAsync will verify that publishing is not already taking
@@ -935,13 +935,13 @@ public virtual async Task<int> EnqueueEventsAsync(IEnumerable<EventData> events,
935935

936936
if ((!IsPublishing) || (_producerManagementTask?.IsCompleted ?? false))
937937
{
938-
try
938+
if (!_stateGuard.Wait(0, cancellationToken))
939939
{
940-
if (!_stateGuard.Wait(0, cancellationToken))
941-
{
942-
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
943-
}
940+
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
941+
}
944942

943+
try
944+
{
945945
Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient));
946946

947947
// StartPublishingAsync will verify that publishing is not already taking
@@ -1059,6 +1059,15 @@ public virtual async Task FlushAsync(CancellationToken cancellationToken = defau
10591059

10601060
await StopPublishingAsync(cancelActiveSendOperations: false, cancellationToken).ConfigureAwait(false);
10611061
await FlushInternalAsync(cancellationToken).ConfigureAwait(false);
1062+
1063+
// There's an unlikely race condition where it is possible that an event batch was being enqueued before publishing
1064+
// stopped and was not written to the partition buffer until after the flush for that partition completed. To guard
1065+
// against this, restart publishing if any events are pending.
1066+
1067+
if (Volatile.Read(ref _totalBufferedEventCount) > 0)
1068+
{
1069+
await StartPublishingAsync(cancellationToken).ConfigureAwait(false);
1070+
}
10621071
}
10631072
finally
10641073
{
@@ -1092,15 +1101,13 @@ public virtual async Task CloseAsync(bool flush = true,
10921101

10931102
var capturedExceptions = default(List<Exception>);
10941103

1095-
try
1104+
if (!_stateGuard.Wait(0, cancellationToken))
10961105
{
1097-
if (!_stateGuard.Wait(0, cancellationToken))
1098-
{
1099-
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
1100-
}
1101-
1102-
// If we've reached this point without an exception, the guard is held.
1106+
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
1107+
}
11031108

1109+
try
1110+
{
11041111
if (_isClosed)
11051112
{
11061113
return;
@@ -1675,7 +1682,7 @@ internal virtual async Task DrainAndPublishPartitionEvents(PartitionPublishingSt
16751682
}
16761683

16771684
var partitionStateDelta = (partitionState.BufferedEventCount * -1);
1678-
var totalDeltaZero = (_totalBufferedEventCount * -1);
1685+
var totalDeltaZero = (Volatile.Read(ref _totalBufferedEventCount) * -1);
16791686

16801687
Interlocked.Add(ref _totalBufferedEventCount, Math.Max(totalDeltaZero, partitionStateDelta));
16811688
Interlocked.Exchange(ref partitionState.BufferedEventCount, 0);
@@ -1840,7 +1847,7 @@ private async Task StartPublishingAsync(CancellationToken cancellationToken)
18401847
// If there is already a task running for the background management process,
18411848
// then no further initialization is needed.
18421849

1843-
if ((IsPublishing) && (!_producerManagementTask.IsCompleted))
1850+
if ((IsPublishing) && (!_producerManagementTask.IsCompleted) || (_isClosed))
18441851
{
18451852
return;
18461853
}
@@ -1954,13 +1961,12 @@ private async Task StopPublishingAsync(bool cancelActiveSendOperations,
19541961

19551962
_producerManagementTask = null;
19561963
}
1964+
catch (OperationCanceledException ex) when (ex is not TaskCanceledException)
1965+
{
1966+
throw new TaskCanceledException(ex.Message, ex);
1967+
}
19571968
catch (Exception ex)
19581969
{
1959-
if (ex is OperationCanceledException opEx)
1960-
{
1961-
throw new TaskCanceledException(opEx.Message, opEx);
1962-
}
1963-
19641970
Logger.BufferedProducerBackgroundProcessingStopError(Identifier, EventHubName, ex.Message);
19651971
(capturedExceptions ??= new List<Exception>()).Add(ex);
19661972
}
@@ -2247,7 +2253,7 @@ private Task RunPublishingAsync(CancellationToken cancellationToken) =>
22472253
if ((!cancellationToken.IsCancellationRequested)
22482254
&& (_activePartitionStateMap.TryGetValue(partition, out var partitionState))
22492255
&& (partitionState.BufferedEventCount > 0)
2250-
&& ((partitionState.PartitionGuard.Wait(0, cancellationToken)) || (await partitionState.PartitionGuard.WaitAsync(PartitionPublishingGuardAcquireLimitMilliseconds, cancellationToken).ConfigureAwait((false)))))
2256+
&& ((partitionState.PartitionGuard.Wait(0, cancellationToken)) || (await partitionState.PartitionGuard.WaitAsync(PartitionPublishingGuardAcquireLimitMilliseconds, cancellationToken).ConfigureAwait(false))))
22512257
{
22522258
// Responsibility for releasing the guard semaphore is passed to the task.
22532259

@@ -2257,7 +2263,7 @@ private Task RunPublishingAsync(CancellationToken cancellationToken) =>
22572263
// If there are no events in the buffer, avoid a tight loop by blocking to wait for events to be enqueued
22582264
// after a small delay.
22592265

2260-
if (_totalBufferedEventCount == 0)
2266+
if (Volatile.Read(ref _totalBufferedEventCount) == 0)
22612267
{
22622268
// If completion source doesn't exist or was already set, then swap in a new completion source to be
22632269
// set when an event is enqueued. Allow the publishing loop to tick for one additional check of the
@@ -2281,10 +2287,15 @@ private Task RunPublishingAsync(CancellationToken cancellationToken) =>
22812287

22822288
var idleWatch = ValueStopwatch.StartNew();
22832289

2284-
await _eventEnqueuedCompletionSource.Task.AwaitWithCancellation(cancellationToken);
2285-
_eventEnqueuedCompletionSource = null;
2286-
2287-
Logger.BufferedProducerIdleComplete(Identifier, EventHubName, operationId, idleWatch.GetElapsedTime().TotalSeconds);
2290+
try
2291+
{
2292+
await _eventEnqueuedCompletionSource.Task.AwaitWithCancellation(cancellationToken);
2293+
_eventEnqueuedCompletionSource = null;
2294+
}
2295+
finally
2296+
{
2297+
Logger.BufferedProducerIdleComplete(Identifier, EventHubName, operationId, idleWatch.GetElapsedTime().TotalSeconds);
2298+
}
22882299
}
22892300
}
22902301
}
@@ -2332,7 +2343,7 @@ private Task RunPublishingAsync(CancellationToken cancellationToken) =>
23322343
/// set of Event Hub partitions has changed since they were last queried.
23332344
/// </summary>
23342345
///
2335-
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
2346+
/// v
23362347
///
23372348
/// <remarks>
23382349
/// This method will potentially modify class state, overwriting the tracked set of partitions.
@@ -2468,12 +2479,12 @@ internal sealed class PartitionPublishingState : IDisposable
24682479
/// <summary>The writer to use for enqueuing events to be published.</summary>
24692480
public ChannelWriter<EventData> PendingEventsWriter => _pendingEvents.Writer;
24702481

2471-
/// <summary>The identifier of the partition that is being published.</summary>
2472-
public readonly string PartitionId;
2473-
24742482
/// <summary>The primitive for synchronizing access for publishing to the partition.</summary>
24752483
public readonly SemaphoreSlim PartitionGuard;
24762484

2485+
/// <summary>The identifier of the partition that is being published.</summary>
2486+
public readonly string PartitionId;
2487+
24772488
/// <summary>The number of events that are currently buffered and waiting to be published for this partition.</summary>
24782489
public int BufferedEventCount;
24792490

sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,41 +1333,6 @@ public async Task ConsumerCannotReadFromInvalidConsumerGroup()
13331333
}
13341334
}
13351335

1336-
/// <summary>
1337-
/// Verifies that the <see cref="EventHubConsumerClient" /> is able to
1338-
/// connect to the Event Hubs service and perform operations.
1339-
/// </summary>
1340-
///
1341-
[Test]
1342-
public async Task ConsumerCannotReadWithInvalidProxy()
1343-
{
1344-
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
1345-
{
1346-
using var cancellationSource = new CancellationTokenSource();
1347-
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
1348-
1349-
var clientOptions = new EventHubConsumerClientOptions();
1350-
clientOptions.RetryOptions.MaximumRetries = 0;
1351-
clientOptions.RetryOptions.MaximumDelay = TimeSpan.FromMilliseconds(5);
1352-
clientOptions.RetryOptions.TryTimeout = TimeSpan.FromSeconds(45);
1353-
clientOptions.ConnectionOptions.Proxy = new WebProxy("http://1.2.3.4:9999");
1354-
clientOptions.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets;
1355-
1356-
await using (var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, scope.EventHubName, EventHubsTestEnvironment.Instance.Credential))
1357-
await using (var invalidProxyConsumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, scope.EventHubName, EventHubsTestEnvironment.Instance.Credential, clientOptions))
1358-
{
1359-
var partition = (await producer.GetPartitionIdsAsync(cancellationSource.Token)).First();
1360-
1361-
// The sockets implementation in .NET Core on some platforms, such as Linux, does not trigger a specific socket exception and
1362-
// will, instead, hang indefinitely. The try timeout is intentionally set to a value smaller than the cancellation token to
1363-
// invoke a timeout exception in these cases.
1364-
1365-
Assert.That(async () => await ReadNothingAsync(invalidProxyConsumer, partition, cancellationSource.Token, iterationCount: 25), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
1366-
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
1367-
}
1368-
}
1369-
}
1370-
13711336
/// <summary>
13721337
/// Verifies that the <see cref="EventHubConsumerClient" /> is able to
13731338
/// connect to the Event Hubs service and perform operations.
@@ -2200,53 +2165,6 @@ public async Task ConsumerCannotRetrievePartitionPropertiesWithInvalidPartitionI
22002165
}
22012166
}
22022167

2203-
/// <summary>
2204-
/// Verifies that the <see cref="EventHubConsumerClient" /> is able to
2205-
/// connect to the Event Hubs service.
2206-
/// </summary>
2207-
///
2208-
[Test]
2209-
public async Task ConsumerCannotRetrieveMetadataWithInvalidProxy()
2210-
{
2211-
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
2212-
{
2213-
using var cancellationSource = new CancellationTokenSource();
2214-
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
2215-
2216-
var invalidProxyOptions = new EventHubConsumerClientOptions();
2217-
invalidProxyOptions.RetryOptions.MaximumRetries = 0;
2218-
invalidProxyOptions.RetryOptions.MaximumDelay = TimeSpan.FromMilliseconds(5);
2219-
invalidProxyOptions.RetryOptions.TryTimeout = TimeSpan.FromSeconds(45);
2220-
invalidProxyOptions.ConnectionOptions.Proxy = new WebProxy("http://1.2.3.4:9999");
2221-
invalidProxyOptions.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets;
2222-
2223-
await using (var consumer = new EventHubConsumerClient(
2224-
EventHubConsumerClient.DefaultConsumerGroupName,
2225-
EventHubsTestEnvironment.Instance.FullyQualifiedNamespace,
2226-
scope.EventHubName,
2227-
EventHubsTestEnvironment.Instance.Credential))
2228-
2229-
await using (var invalidProxyConsumer = new EventHubConsumerClient(
2230-
EventHubConsumerClient.DefaultConsumerGroupName,
2231-
EventHubsTestEnvironment.Instance.FullyQualifiedNamespace,
2232-
scope.EventHubName,
2233-
EventHubsTestEnvironment.Instance.Credential,
2234-
invalidProxyOptions))
2235-
{
2236-
var partition = (await consumer.GetPartitionIdsAsync(cancellationSource.Token)).First();
2237-
2238-
// The sockets implementation in .NET Core on some platforms, such as Linux, does not trigger a specific socket exception and
2239-
// will, instead, hang indefinitely. The try timeout is intentionally set to a value smaller than the cancellation token to
2240-
// invoke a timeout exception in these cases.
2241-
2242-
Assert.That(async () => await invalidProxyConsumer.GetPartitionIdsAsync(cancellationSource.Token), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
2243-
Assert.That(async () => await invalidProxyConsumer.GetEventHubPropertiesAsync(cancellationSource.Token), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
2244-
Assert.That(async () => await invalidProxyConsumer.GetPartitionPropertiesAsync(partition, cancellationSource.Token), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
2245-
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
2246-
}
2247-
}
2248-
}
2249-
22502168
/// <summary>
22512169
/// Verifies that the <see cref="EventHubConsumerClient" /> is able to
22522170
/// connect to the Event Hubs service and perform operations.

0 commit comments

Comments
 (0)