Skip to content

Commit a5f2ba2

Browse files
authored
[Event Hubs] Checkpoint Store Abstraction (Azure#27386)
* [Event Hubs] Checkpoint Store Abstraction The focus of these changes is to add a `CheckpointStore` Abstraction and convert the Blob Storage provider to implement it. The implementation remains `internal` to minimize churn, but will be extended with a public version in forthcoming set of changes.
1 parent 0230af2 commit a5f2ba2

File tree

39 files changed

+1234
-2228
lines changed

39 files changed

+1234
-2228
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Experimental/src/Azure.Messaging.EventHubs.Experimental.csproj

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@
1212
</ItemGroup>
1313

1414
<ItemGroup>
15-
<PackageReference Include="Azure.Messaging.EventHubs" />
15+
<!--
16+
TEMP: Package reference is needed until the CheckpointStore is included in a release.
17+
18+
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.7.0-beta.3" /> Remove override when the v5.7.0 line is released for GA
19+
-->
20+
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
21+
<!-- END TEMP -->
22+
1623
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
1724
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
1825
<PackageReference Include="System.Reflection.TypeExtensions" />

sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
2626
protected override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
2727
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
2828
public override int GetHashCode() { throw null; }
29-
protected override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint>> ListCheckpointsAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
3029
protected override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
3130
protected override System.Threading.Tasks.Task OnInitializingPartitionAsync(Azure.Messaging.EventHubs.Primitives.EventProcessorPartition partition, System.Threading.CancellationToken cancellationToken) { throw null; }
3231
protected override System.Threading.Tasks.Task OnPartitionProcessingStoppedAsync(Azure.Messaging.EventHubs.Primitives.EventProcessorPartition partition, Azure.Messaging.EventHubs.Processor.ProcessingStoppedReason reason, System.Threading.CancellationToken cancellationToken) { throw null; }
@@ -38,6 +37,7 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
3837
public override System.Threading.Tasks.Task StopProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3938
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
4039
public override string ToString() { throw null; }
40+
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
4141
}
4242
public partial class EventProcessorClientOptions
4343
{

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
-->
2222
<!-- END TEMP -->
2323

24-
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.7.0-beta.3" /><!-- Remove override when the v5.7.0 line is released for GA -->
24+
<!--
25+
TEMP: Package reference is needed until the CheckpointStore is included in a release.
26+
27+
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.7.0-beta.3" /> Remove override when the v5.7.0 line is released for GA
28+
-->
29+
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
30+
<!-- END TEMP -->
31+
2532
<PackageReference Include="Azure.Storage.Blobs" />
2633
<PackageReference Include="Microsoft.Azure.Amqp" />
2734
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobsCheckpointStore.Diagnostics.cs renamed to sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobCheckpointStoreInternal.Diagnostics.cs

Lines changed: 5 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@
44
using System;
55
using Azure.Messaging.EventHubs.Processor.Diagnostics;
66

7-
namespace Azure.Messaging.EventHubs.Processor
7+
namespace Azure.Messaging.EventHubs.Primitives
88
{
99
/// <summary>
1010
/// A storage blob service that keeps track of checkpoints and ownership.
1111
/// </summary>
1212
///
13-
internal sealed partial class BlobsCheckpointStore
13+
internal sealed partial class BlobCheckpointStoreInternal
1414
{
1515
/// <summary>
16-
/// Initializes the <see cref="BlobsCheckpointStore"/> type.
16+
/// Initializes the <see cref="BlobCheckpointStoreInternal"/> type.
1717
/// </summary>
1818
#pragma warning disable CA1810 // Initialize static fields inline
19-
static BlobsCheckpointStore()
19+
static BlobCheckpointStoreInternal()
2020
{
2121
BlobsResourceDoesNotExist = Resources.BlobsResourceDoesNotExist;
2222
}
@@ -71,36 +71,6 @@ partial void ListOwnershipStart(string fullyQualifiedNamespace,
7171
string consumerGroup) =>
7272
Logger.ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup);
7373

74-
/// <summary>
75-
/// Indicates that an attempt to retrieve a list of checkpoints has completed.
76-
/// </summary>
77-
///
78-
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
79-
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
80-
/// <param name="consumerGroup">The name of the consumer group the checkpoints are associated with.</param>
81-
/// <param name="checkpointCount">The amount of checkpoints received from the storage service.</param>
82-
///
83-
partial void ListCheckpointsComplete(string fullyQualifiedNamespace,
84-
string eventHubName,
85-
string consumerGroup,
86-
int checkpointCount) =>
87-
Logger.ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount);
88-
89-
/// <summary>
90-
/// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints.
91-
/// </summary>
92-
///
93-
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
94-
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
95-
/// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
96-
/// <param name="exception">The exception that occurred.</param>
97-
///
98-
partial void ListCheckpointsError(string fullyQualifiedNamespace,
99-
string eventHubName,
100-
string consumerGroup,
101-
Exception exception) =>
102-
Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);
103-
10474
/// <summary>
10575
/// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints.
10676
/// </summary>
@@ -116,19 +86,6 @@ partial void InvalidCheckpointFound(string partitionId,
11686
string consumerGroup) =>
11787
Logger.InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup);
11888

119-
/// <summary>
120-
/// Indicates that an attempt to retrieve a list of checkpoints has started.
121-
/// </summary>
122-
///
123-
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
124-
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
125-
/// <param name="consumerGroup">The name of the consumer group the checkpoints are associated with.</param>
126-
///
127-
partial void ListCheckpointsStart(string fullyQualifiedNamespace,
128-
string eventHubName,
129-
string consumerGroup) =>
130-
Logger.ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup);
131-
13289
/// <summary>
13390
/// Indicates that an unhandled exception was encountered while updating a checkpoint.
13491
/// </summary>
@@ -266,7 +223,7 @@ partial void ClaimOwnershipStart(string partitionId,
266223
Logger.ClaimOwnershipStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier);
267224

268225
/// <summary>
269-
/// Indicates that a <see cref="BlobsCheckpointStore" /> was created.
226+
/// Indicates that a <see cref="BlobCheckpointStoreInternal" /> was created.
270227
/// </summary>
271228
///
272229
/// <param name="typeName">The type name for the checkpoint store.</param>

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs

Lines changed: 2 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Diagnostics.Tracing;
66
using System.Runtime.CompilerServices;
77
using Azure.Core.Diagnostics;
8+
using Azure.Messaging.EventHubs.Primitives;
89

910
namespace Azure.Messaging.EventHubs.Processor.Diagnostics
1011
{
@@ -43,7 +44,7 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
4344
}
4445

4546
/// <summary>
46-
/// Indicates that a <see cref="BlobsCheckpointStore" /> was created.
47+
/// Indicates that a <see cref="BlobCheckpointStoreInternal" /> was created.
4748
/// </summary>
4849
///
4950
/// <param name="typeName">The type name for the checkpoint store.</param>
@@ -241,67 +242,6 @@ public virtual void OwnershipClaimed(string partitionId,
241242
}
242243
}
243244

244-
/// <summary>
245-
/// Indicates that an attempt to retrieve a list of checkpoints has started.
246-
/// </summary>
247-
///
248-
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
249-
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
250-
/// <param name="consumerGroup">The name of the consumer group the checkpoints are associated with.</param>
251-
///
252-
[Event(29, Level = EventLevel.Verbose, Message = "Starting to list checkpoints for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'.")]
253-
public virtual void ListCheckpointsStart(string fullyQualifiedNamespace,
254-
string eventHubName,
255-
string consumerGroup)
256-
{
257-
if (IsEnabled())
258-
{
259-
WriteEvent(29, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
260-
}
261-
}
262-
263-
/// <summary>
264-
/// Indicates that an attempt to retrieve a list of checkpoints has completed.
265-
/// </summary>
266-
///
267-
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
268-
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
269-
/// <param name="consumerGroup">The name of the consumer group the checkpoints are associated with.</param>
270-
/// <param name="checkpointCount">The amount of checkpoints received from the storage service.</param>
271-
///
272-
[Event(30, Level = EventLevel.Verbose, Message = "Completed listing checkpoints for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'. There were '{3}' checkpoints found.")]
273-
public virtual void ListCheckpointsComplete(string fullyQualifiedNamespace,
274-
string eventHubName,
275-
string consumerGroup,
276-
int checkpointCount)
277-
{
278-
if (IsEnabled())
279-
{
280-
WriteEvent(30, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, checkpointCount);
281-
}
282-
}
283-
284-
/// <summary>
285-
/// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints.
286-
/// </summary>
287-
///
288-
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
289-
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
290-
/// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
291-
/// <param name="errorMessage">The message for the exception that occurred.</param>
292-
///
293-
[Event(31, Level = EventLevel.Error, Message = "An exception occurred when listing checkpoints for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; ErrorMessage: '{3}'.")]
294-
public virtual void ListCheckpointsError(string fullyQualifiedNamespace,
295-
string eventHubName,
296-
string consumerGroup,
297-
string errorMessage)
298-
{
299-
if (IsEnabled())
300-
{
301-
WriteEvent(31, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty);
302-
}
303-
}
304-
305245
/// <summary>
306246
/// Indicates that an attempt to create/update a checkpoint has started.
307247
/// </summary>

0 commit comments

Comments
 (0)