Skip to content

Commit 5e7b964

Browse files
authored
Merge pull request #8 from Cabazure/hotfix/checkpoint
Ensure EventHubProcessor updates partition checkpoints
2 parents 47e626e + 0b48aa3 commit 5e7b964

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

src/Cabazure.Messaging.EventHub/Internal/EventHubBatchHandler.cs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public interface IEventHubBatchHandler<TMessage, TProcessor>
99
{
1010
public TProcessor Processor { get; }
1111

12-
Task ProcessBatchAsync(
12+
Task<EventData?> ProcessBatchAsync(
1313
IEnumerable<EventData> events,
1414
EventProcessorPartition partition,
1515
CancellationToken cancellationToken);
@@ -29,29 +29,47 @@ public class EventHubBatchHandler<TMessage, TProcessor>(
2929
{
3030
public TProcessor Processor => processor;
3131

32-
public async Task ProcessBatchAsync(
32+
public async Task<EventData?> ProcessBatchAsync(
3333
IEnumerable<EventData> events,
3434
EventProcessorPartition partition,
3535
CancellationToken cancellationToken)
3636
{
37+
EventData? lastEvent = null;
3738
foreach (var evt in events)
3839
{
39-
if (!filters.TrueForAll(f => f.Invoke(evt.Properties)))
40+
lastEvent = evt;
41+
42+
try
4043
{
41-
continue;
42-
}
44+
if (!filters.TrueForAll(f => f.Invoke(evt.Properties)))
45+
{
46+
continue;
47+
}
4348

44-
var message = evt.EventBody
45-
.ToObjectFromJson<TMessage>(serializerOptions);
49+
var message = evt.EventBody
50+
.ToObjectFromJson<TMessage>(serializerOptions);
4651

47-
var metadata = EventHubMetadata
48-
.Create(evt, partition.PartitionId);
52+
if (message != null)
53+
{
54+
var metadata = EventHubMetadata
55+
.Create(evt, partition.PartitionId);
4956

50-
await processor.ProcessAsync(
51-
message!,
52-
metadata,
53-
cancellationToken);
57+
await processor
58+
.ProcessAsync(
59+
message!,
60+
metadata,
61+
cancellationToken)
62+
.ConfigureAwait(false);
63+
}
64+
}
65+
catch (Exception ex)
66+
{
67+
await ProcessErrorAsync(ex, cancellationToken)
68+
.ConfigureAwait(false);
69+
}
5470
}
71+
72+
return lastEvent;
5573
}
5674

5775
public async Task ProcessErrorAsync(

src/Cabazure.Messaging.EventHub/Internal/EventHubProcessor.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Azure.Core;
22
using Azure.Messaging.EventHubs;
33
using Azure.Messaging.EventHubs.Primitives;
4+
using Azure.Messaging.EventHubs.Processor;
45

56
namespace Cabazure.Messaging.EventHub.Internal;
67

@@ -53,13 +54,24 @@ protected override async Task OnProcessingEventBatchAsync(
5354
IEnumerable<EventData> events,
5455
EventProcessorPartition partition,
5556
CancellationToken cancellationToken)
56-
=> await batchHandler
57+
{
58+
var lastEvent = await batchHandler
5759
.ProcessBatchAsync(
5860
events,
5961
partition,
6062
cancellationToken)
6163
.ConfigureAwait(false);
6264

65+
if (lastEvent != null)
66+
{
67+
await UpdateCheckpointAsync(
68+
partition.PartitionId,
69+
CheckpointPosition.FromEvent(lastEvent),
70+
cancellationToken)
71+
.ConfigureAwait(false);
72+
}
73+
}
74+
6375
protected override async Task OnProcessingErrorAsync(
6476
Exception exception,
6577
EventProcessorPartition partition,

0 commit comments

Comments
 (0)