Skip to content

Commit eb8e9a2

Browse files
authored
[Event Hubs] Detect offset placeholder value (Azure#49319)
* [Event Hubs] Detect offset placeholder value The focus of these changes is to ensure that the placeholder value used to represent missing offsets in v5.11.3 - v5.11.6 is properly detected when reading a checkpoint and not used as a valid offset when reading from a partition. * Update sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
1 parent fb71f4c commit eb8e9a2

File tree

4 files changed

+62
-8
lines changed

4 files changed

+62
-8
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobCheckpointStoreInternal.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ namespace Azure.Messaging.EventHubs.Primitives
2525
///
2626
internal partial class BlobCheckpointStoreInternal : CheckpointStore
2727
{
28+
/// <summary>The text used as a placeholder when no offset metadata exists in some older checkpoint formats.</summary>
29+
internal const string NoOffsetPlaceholderText = "no offset";
30+
2831
#pragma warning disable CA1802 // Use a constant field
2932
/// <summary>A message to use when throwing exception when checkpoint container or blob does not exists.</summary>
3033
private static readonly string BlobsResourceDoesNotExist = "The Azure Storage Blobs container or blob used by the Event Processor Client does not exist.";
@@ -524,7 +527,7 @@ private EventProcessorCheckpoint CreateCheckpoint(string fullyQualifiedNamespace
524527
var sequenceNumber = default(long?);
525528
var clientIdentifier = default(string);
526529

527-
if (metadata.TryGetValue(BlobMetadataKey.Offset, out var offsetStr) && !string.IsNullOrEmpty(offsetStr))
530+
if (metadata.TryGetValue(BlobMetadataKey.Offset, out var offsetStr) && !IsPlaceholderOffset(offsetStr))
528531
{
529532
offset = offsetStr;
530533
startingPosition = EventPosition.FromOffset(offsetStr, false);
@@ -631,6 +634,20 @@ private async Task<EventProcessorCheckpoint> CreateLegacyCheckpoint(string fully
631634
};
632635
}
633636

637+
/// <summary>
638+
/// Determines whether an offset value is a placeholder slug that was written
639+
/// for legacy compatibility purposes.
640+
/// </summary>
641+
///
642+
/// <param name="offset">The offset to test.</param>
643+
///
644+
/// <returns><c>true</c> if <paramref name="offset"/> is the placeholder offset; otherwise, <c>false</c>.</returns>
645+
///
646+
/// <seealso href="https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.EventHubs_5.11.6/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobCheckpointStoreInternal.cs#L446" />
647+
///
648+
private bool IsPlaceholderOffset(string offset) =>
649+
string.IsNullOrEmpty(offset) || string.Equals(offset, NoOffsetPlaceholderText, StringComparison.OrdinalIgnoreCase);
650+
634651
/// <summary>
635652
/// Attempts to read a legacy checkpoint JSON format and extract an offset and a sequence number.
636653
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreInternalTests.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,47 @@ public async Task GetCheckpointLogsStartAndComplete()
358358
mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition, null, It.IsAny<DateTimeOffset>()));
359359
}
360360

361+
/// <summary>
362+
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly.
363+
/// </summary>
364+
///
365+
[Test]
366+
[TestCase(null)]
367+
[TestCase("")]
368+
[TestCase(BlobCheckpointStoreInternal.NoOffsetPlaceholderText)]
369+
public async Task GetCheckpointIgnoresPlaceholderOffsets(string placeholderOffset)
370+
{
371+
var expectedSequence = 133;
372+
var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false);
373+
var partition = Guid.NewGuid().ToString();
374+
375+
var blobList = new List<BlobItem>
376+
{
377+
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partition}",
378+
false,
379+
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
380+
"snapshot",
381+
new Dictionary<string, string>
382+
{
383+
{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()},
384+
{BlobMetadataKey.Offset, placeholderOffset},
385+
{BlobMetadataKey.SequenceNumber, expectedSequence.ToString()}
386+
})
387+
};
388+
389+
var target = new BlobCheckpointStoreInternal(new MockBlobContainerClient() { Blobs = blobList });
390+
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition, CancellationToken.None);
391+
392+
Assert.That(checkpoint, Is.Not.Null, "A checkpoint should have been returned.");
393+
Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition));
394+
395+
Assert.That(checkpoint, Is.InstanceOf<BlobCheckpointStoreInternal.BlobStorageCheckpoint>(), "Checkpoint instance was not the expected type.");
396+
397+
var blobCheckpoint = (BlobCheckpointStoreInternal.BlobStorageCheckpoint)checkpoint;
398+
Assert.That(blobCheckpoint.Offset, Is.Null, $"The offset should not have been populated, as it was not set.");
399+
Assert.That(expectedSequence, Is.EqualTo(blobCheckpoint.SequenceNumber), "Checkpoint sequence number did not have the correct value.");
400+
}
401+
361402
/// <summary>
362403
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly.
363404
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
# Release History
22

3-
## 5.13.0-beta.1 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
3+
## 5.12.1 (2025-04-09)
84

95
### Bugs Fixed
106

11-
### Other Changes
7+
- Fixed a bug which caused the placeholder value used to represent missing offsets in v5.11.3 - v5.11.6 to not be properly detected and incorrectly used as a valid offset when reading from a partition.
128

139
## 5.12.0 (2025-04-08)
1410

sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This client library allows for both publishing and consuming events using Azure Event Hubs. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
4-
<Version>5.13.0-beta.1</Version>
4+
<Version>5.12.1</Version>
55
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually. -->
66
<ApiCompatVersion>5.12.0</ApiCompatVersion>
77
<PackageTags>Azure;Event Hubs;EventHubs;.NET;AMQP;IoT;$(PackageCommonTags)</PackageTags>

0 commit comments

Comments
 (0)