Skip to content

Commit b429f79

Browse files
authored
[Event Hubs] Set GeoDR on management link (Azure#48204)
The focus of these changes is to set the desired capability for GeoDR on the management link, so that requests for Event Hub and partition metadata can take it into account for offset formatting. Also included are tweaks to the tests for `EventPosition.Latest` to reduce flakininess due to timing issues during nightly runs.
1 parent f36d192 commit b429f79

File tree

3 files changed

+128
-30
lines changed

3 files changed

+128
-30
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,9 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
603603
var linkSettings = new AmqpLinkSettings { OperationTimeout = operationTimeout };
604604
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)linkTimeout.TotalMilliseconds);
605605

606+
linkSettings.DesiredCapabilities ??= new Multiple<AmqpSymbol>();
607+
linkSettings.DesiredCapabilities.Add(AmqpProperty.GeoReplication);
608+
606609
link = new RequestResponseAmqpLink(AmqpManagement.LinkType, session, AmqpManagement.Address, linkSettings.Properties);
607610

608611
// Track the link before returning it, so that it can be managed with the scope.

sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs

Lines changed: 93 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -821,16 +821,44 @@ public async Task ConsumerCanReadFromLatest()
821821

822822
// Give the receiver a moment to ensure that it is established and then send events for it to read.
823823

824-
await Task.Delay(250);
825-
await SendEventsAsync(scope.EventHubName, EventGenerator.CreateEvents(50), new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token);
824+
var sendTask = Task.Run(async () =>
825+
{
826+
try
827+
{
828+
while (!cancellationSource.IsCancellationRequested)
829+
{
830+
await Task.Delay(150);
831+
await SendEventsAsync(scope.EventHubName, EventGenerator.CreateEvents(5), new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token);
832+
}
833+
}
834+
catch (TaskCanceledException)
835+
{
836+
// Expected
837+
}
838+
});
839+
840+
while ((eventsRead < 1) && (!cancellationSource.IsCancellationRequested))
841+
{
842+
try
843+
{
844+
await Task.Delay(250, cancellationSource.Token);
845+
}
846+
catch (TaskCanceledException)
847+
{
848+
// Expected
849+
}
850+
}
826851

827852
// Await reading of the events and validate that we were able to read at least one event.
828853

829854
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
830855
Assert.That(eventsRead, Is.GreaterThanOrEqualTo(1), "At least one event should have been read.");
831-
}
832856

833-
cancellationSource.Cancel();
857+
cancellationSource.Cancel();
858+
859+
await readTask;
860+
await sendTask;
861+
}
834862
}
835863
}
836864

@@ -2402,44 +2430,83 @@ public async Task ConsumerCanReadFromAllPartitionsStartingWithLatest()
24022430
using var cancellationSource = new CancellationTokenSource();
24032431
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
24042432

2405-
var sourceEvents = EventGenerator.CreateEvents(50).ToList();
2406-
2407-
await using (var consumer = new EventHubConsumerClient(
2433+
await using var consumer = new EventHubConsumerClient(
24082434
EventHubConsumerClient.DefaultConsumerGroupName,
24092435
EventHubsTestEnvironment.Instance.FullyQualifiedNamespace,
24102436
scope.EventHubName,
2411-
EventHubsTestEnvironment.Instance.Credential))
2412-
{
2413-
// Send a set of seed events, which should not be read.
2437+
EventHubsTestEnvironment.Instance.Credential);
24142438

2415-
var partitions = (await consumer.GetPartitionIdsAsync(cancellationSource.Token)).ToArray();
2416-
await SendEventsToAllPartitionsAsync(scope.EventHubName, EventGenerator.CreateEvents(50), partitions, cancellationSource.Token);
2439+
// Send a set of seed events to the partition, which should not be read.
24172440

2418-
// Begin reading though no events have been published. This is necessary to open the connection and
2419-
// ensure that the consumer is watching the partition.
2441+
var partitions = await consumer.GetPartitionIdsAsync(cancellationSource.Token);
2442+
await SendEventsToAllPartitionsAsync(scope.EventHubName, EventGenerator.CreateEvents(50), partitions, cancellationSource.Token);
24202443

2421-
var readTask = ReadEventsFromAllPartitionsAsync(consumer, sourceEvents.Select(evt => evt.MessageId), cancellationSource.Token, startFromEarliest: false);
2444+
// Begin reading in the background, though no events should be read until the next publish. This is necessary to open the connection and
2445+
// ensure that the receiver is watching the partition.
24222446

2423-
// Give the consumer a moment to ensure that it is established and then send events for it to read.
2447+
var eventsRead = 0;
24242448

2425-
await Task.Delay(1500);
2426-
await SendEventsToAllPartitionsAsync(scope.EventHubName, sourceEvents, partitions, cancellationSource.Token);
2449+
var readTask = Task.Run(async () =>
2450+
{
2451+
await Task.Yield();
24272452

2428-
// Read the events and validate the resulting state.
2453+
try
2454+
{
2455+
await foreach (var item in consumer.ReadEventsAsync(startReadingAtEarliestEvent: false, cancellationToken: cancellationSource.Token))
2456+
{
2457+
// If more than one event was read, no need to keep going.
24292458

2430-
var readState = await readTask;
2431-
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
2432-
Assert.That(readState.Events.Count, Is.EqualTo(sourceEvents.Count), "Only the source events should have been read.");
2459+
if (++eventsRead > 1)
2460+
{
2461+
break;
2462+
}
2463+
}
2464+
}
2465+
catch (TaskCanceledException)
2466+
{
2467+
// Expected
2468+
}
2469+
});
24332470

2434-
foreach (var sourceEvent in sourceEvents)
2471+
// Give the receiver a moment to ensure that it is established and then send events for it to read.
2472+
2473+
var sendTask = Task.Run(async () =>
2474+
{
2475+
try
24352476
{
2436-
var sourceId = sourceEvent.MessageId;
2437-
Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
2438-
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
2477+
while (!cancellationSource.IsCancellationRequested)
2478+
{
2479+
await Task.Delay(150);
2480+
await SendEventsToAllPartitionsAsync(scope.EventHubName, EventGenerator.CreateEvents(5), partitions, cancellationSource.Token);
2481+
}
2482+
}
2483+
catch (TaskCanceledException)
2484+
{
2485+
// Expected
2486+
}
2487+
});
2488+
2489+
while ((eventsRead < 1) && (!cancellationSource.IsCancellationRequested))
2490+
{
2491+
try
2492+
{
2493+
await Task.Delay(250, cancellationSource.Token);
2494+
}
2495+
catch (TaskCanceledException)
2496+
{
2497+
// Expected
24392498
}
24402499
}
24412500

2501+
// Await reading of the events and validate that we were able to read at least one event.
2502+
2503+
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
2504+
Assert.That(eventsRead, Is.GreaterThanOrEqualTo(1), "At least one event should have been read.");
2505+
24422506
cancellationSource.Cancel();
2507+
2508+
await readTask;
2509+
await sendTask;
24432510
}
24442511
}
24452512

sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverLiveTests.cs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,16 +622,44 @@ public async Task ReceiverCanReadFromLatest()
622622

623623
// Give the receiver a moment to ensure that it is established and then send events for it to read.
624624

625-
await Task.Delay(250);
626-
await SendEventsAsync(scope.EventHubName, EventGenerator.CreateEvents(50), new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token);
625+
var sendTask = Task.Run(async () =>
626+
{
627+
try
628+
{
629+
while (!cancellationSource.IsCancellationRequested)
630+
{
631+
await Task.Delay(150);
632+
await SendEventsAsync(scope.EventHubName, EventGenerator.CreateEvents(5), new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token);
633+
}
634+
}
635+
catch (TaskCanceledException)
636+
{
637+
// Expected
638+
}
639+
});
640+
641+
while ((eventsRead < 1) && (!cancellationSource.IsCancellationRequested))
642+
{
643+
try
644+
{
645+
await Task.Delay(250, cancellationSource.Token);
646+
}
647+
catch (TaskCanceledException)
648+
{
649+
// Expected
650+
}
651+
}
627652

628653
// Await reading of the events and validate that we were able to read at least one event.
629654

630655
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
631656
Assert.That(eventsRead, Is.GreaterThanOrEqualTo(1), "At least one event should have been read.");
632-
}
633657

634-
cancellationSource.Cancel();
658+
cancellationSource.Cancel();
659+
660+
await readTask;
661+
await sendTask;
662+
}
635663
}
636664
}
637665

0 commit comments

Comments
 (0)