Skip to content

Commit 1d41111

Browse files
jsquirem-redding
andauthored
[Event Hubs] GeoDR - OffsetString (Azure#47133)
* [Event Hubs] GeoDR - OffsetString The focus of these changes is to introduce members for the string-based offset format and mark the numeric-based offset members as obsolete. This is part of the Event Hubs GeoDR feature design for the service, where replication-based offsets will be composite strings and no longer parsable to numeric values. Not included in these changes is the error handling for the new error for an invalid reader position after failover. The error details are still being finalized by the service and will be included in a future change set. * Fixing AOT warning * Fixing build error from rebase and regenerating net8.0 API listings. * Fixing comments --------- Co-authored-by: Madalyn Redding <[email protected]>
1 parent 7efe3d1 commit 1d41111

File tree

96 files changed

+1644
-881
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+1644
-881
lines changed

eng/ApiListing.exclude-attributes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ T:System.Runtime.CompilerServices.AsyncStateMachineAttribute
33
T:System.Runtime.CompilerServices.CompilerGeneratedAttribute
44
T:System.Runtime.CompilerServices.NullableContextAttribute
55
T:System.Runtime.CompilerServices.NullableAttribute
6+
T:System.Runtime.CompilerServices.IsReadOnlyAttribute
67
T:Azure.Core.CodeGenSuppressAttribute
78
T:Azure.Core.CodeGenModelAttribute
89
T:Azure.Core.CodeGenMemberAttribute

sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.net8.0.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
3838
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
3939
public override string ToString() { throw null; }
4040
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
41+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
42+
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
4143
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
4244
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
4345
}
@@ -71,6 +73,8 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
7173
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
7274
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
7375
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
76+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
77+
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
7478
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
7579
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
7680
}

sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
3838
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
3939
public override string ToString() { throw null; }
4040
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
41+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
42+
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
4143
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
4244
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
4345
}
@@ -71,6 +73,8 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
7173
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
7274
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
7375
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
76+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
77+
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
7478
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
7579
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
7680
}

sdk/eventhub/Azure.Messaging.EventHubs.Processor/samples/Sample08_MockingClientTypes.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ EventData eventData = EventHubsModelFactory.EventData(
4949
systemProperties: new Dictionary<string, object>(), //arbitrary value
5050
partitionKey: "sample-key",
5151
sequenceNumber: 1000,
52-
offset: 1500,
52+
offsetString: "1500:1:3344.1",
5353
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));
5454

5555
// This creates a new instance of ProcessEventArgs to pass into the handler directly.
@@ -110,7 +110,7 @@ TimerCallback dispatchEvent = async _ =>
110110
systemProperties: new Dictionary<string, object>(), //arbitrary value
111111
partitionKey: "sample-key",
112112
sequenceNumber: 1000,
113-
offset: 1500,
113+
offsetString: "1500:1:1111",
114114
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));
115115

116116
ProcessEventArgs eventArgs = new(

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<PropertyGroup>
33
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
44
<Version>5.12.0-beta.2</Version>
5-
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
5+
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually. -->
66
<ApiCompatVersion>5.11.5</ApiCompatVersion>
77
<PackageTags>Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags)</PackageTags>
88
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
@@ -16,7 +16,9 @@
1616
</ItemGroup>
1717

1818
<ItemGroup>
19-
<PackageReference Include="Azure.Messaging.EventHubs" />
19+
<!-- TEMP TEMP -->
20+
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
21+
<!-- PackageReference Include="Azure.Messaging.EventHubs" /-->
2022
<PackageReference Include="Azure.Storage.Blobs" />
2123
<PackageReference Include="Microsoft.Azure.Amqp" />
2224
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobCheckpointStoreInternal.Diagnostics.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ partial void InvalidCheckpointFound(string partitionId,
9797
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
9898
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
9999
/// <param name="sequenceNumber">The sequence number associated with the checkpoint.</param>
100-
/// <param name="replicationSegment">The replication segment associated with the checkpoint.</param>
101100
/// <param name="offset">The offset associated with the checkpoint.</param>
102101
/// <param name="exception">The exception that occurred.</param>
103102
///
@@ -107,10 +106,9 @@ partial void UpdateCheckpointError(string partitionId,
107106
string consumerGroup,
108107
string clientIdentifier,
109108
string sequenceNumber,
110-
string replicationSegment,
111109
string offset,
112110
Exception exception) =>
113-
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, replicationSegment, offset);
111+
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, offset);
114112

115113
/// <summary>
116114
/// Indicates that an attempt to update a checkpoint has completed.
@@ -122,7 +120,6 @@ partial void UpdateCheckpointError(string partitionId,
122120
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
123121
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
124122
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
125-
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
126123
/// <param name="offset">The offset associated with this checkpoint.</param>
127124
///
128125
partial void UpdateCheckpointComplete(string partitionId,
@@ -131,9 +128,8 @@ partial void UpdateCheckpointComplete(string partitionId,
131128
string consumerGroup,
132129
string clientIdentifier,
133130
string sequenceNumber,
134-
string replicationSegment,
135131
string offset) =>
136-
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
132+
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);
137133

138134
/// <summary>
139135
/// Indicates that an attempt to create/update a checkpoint has started.
@@ -145,7 +141,6 @@ partial void UpdateCheckpointComplete(string partitionId,
145141
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
146142
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
147143
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
148-
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
149144
/// <param name="offset">The offset associated with this checkpoint.</param>
150145
///
151146
partial void UpdateCheckpointStart(string partitionId,
@@ -154,9 +149,8 @@ partial void UpdateCheckpointStart(string partitionId,
154149
string consumerGroup,
155150
string clientIdentifier,
156151
string sequenceNumber,
157-
string replicationSegment,
158152
string offset) =>
159-
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
153+
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);
160154

161155
/// <summary>
162156
/// Indicates that an attempt to retrieve claim partition ownership has completed.

0 commit comments

Comments
 (0)