Skip to content

Commit 4df8e84

Browse files
authored
Merge pull request #7 from Cabazure/feature/stateless
Add support for stateless processing of event hub messages
2 parents 7a8812d + 2c5bab9 commit 4df8e84

File tree

28 files changed

+667
-108
lines changed

28 files changed

+667
-108
lines changed
Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,37 @@
11
var builder = DistributedApplication.CreateBuilder(args);
22

3+
var blobs = builder
4+
.AddAzureStorage("storage")
5+
.RunAsEmulator()
6+
.AddBlobs("blobs");
37
var eventHub = builder
48
.AddAzureEventHubs("eh")
5-
.RunAsEmulator(b => b
6-
.WithHostPort(5672))
9+
.RunAsEmulator()
710
.AddHub("eventhub");
11+
eventHub.AddConsumerGroup("consumerGroup1");
12+
eventHub.AddConsumerGroup("consumerGroup2");
813

9-
var blobs = builder
10-
.AddAzureStorage("storage")
11-
.RunAsEmulator(c => c
12-
.WithBlobPort(10000)
13-
.WithQueuePort(10001)
14-
.WithTablePort(10002))
15-
.AddBlobs("blobs");
14+
for (int i = 1; i <= 4; i++)
15+
{
16+
builder.AddProject<Projects.EventHub_Processor>(
17+
name: $"eventhub-processor{i}",
18+
launchProfileName: $"processor{i}")
19+
.WithEnvironment("CONSUMER_GROUP", "consumerGroup1")
20+
.WithReference(eventHub).WaitFor(eventHub)
21+
.WithReference(blobs).WaitFor(blobs);
22+
}
1623

17-
builder.AddProject<Projects.EventHub_Producer>("eventhub-producer")
18-
.WithReference(eventHub);
24+
for (int i = 1; i <= 4; i++)
25+
{
26+
builder.AddProject<Projects.EventHub_Processor>(
27+
name: $"eventhub-processor-stateless{i}",
28+
launchProfileName: $"stateless{i}")
29+
.WithEnvironment("CONSUMER_GROUP", "consumerGroup2")
30+
.WithEnvironment("STATELESS", "true")
31+
.WithReference(eventHub).WaitFor(eventHub);
32+
}
1933

20-
builder.AddProject<Projects.EventHub_Processor>("eventhub-processor")
21-
.WithReference(eventHub)
22-
.WithReference(blobs);
34+
builder.AddProject<Projects.EventHub_Producer>("eventhub-producer")
35+
.WithReference(eventHub).WaitFor(eventHub);
2336

2437
builder.Build().Run();

samples/EventHub/EventHub.Processor/Program.cs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Cabazure.Messaging;
2+
using Microsoft.AspNetCore.Mvc;
23

34
var builder = WebApplication.CreateBuilder(args);
45

@@ -8,31 +9,59 @@
89
.GetConnectionString("eventhub")!;
910
var blobsConnection = builder.Configuration
1011
.GetConnectionString("blobs")!;
12+
var consumerGroup = builder.Configuration
13+
.GetValue<string>("CONSUMER_GROUP")
14+
?? "$default";
15+
var stateless = builder.Configuration
16+
.GetValue<bool>("STATELESS");
1117

12-
builder.Services.AddCabazureEventHub(b => b
13-
.Configure(o => o
14-
.WithConnection(connectionString)
15-
.WithBlobStorage(blobsConnection))
16-
.AddProcessor<MyEvent, MyEventprocessor>("eventhub", "$default", b => b
17-
.WithBlobContainer("container1", createIfNotExist: true)));
18+
builder.Services.AddSingleton<MyEventProcessor>();
19+
20+
if (!stateless)
21+
{
22+
builder.Services.AddCabazureEventHub(b => b
23+
.Configure(o => o
24+
.WithConnection(connectionString)
25+
.WithBlobStorage(blobsConnection))
26+
.AddProcessor<MyEvent, MyEventProcessor>("eventhub", consumerGroup, b => b
27+
.WithBlobContainer("container1", createIfNotExist: true)));
28+
}
29+
else
30+
{
31+
builder.Services.AddCabazureEventHub(b => b
32+
.Configure(o => o
33+
.WithConnection(connectionString)
34+
.WithBlobStorage(blobsConnection))
35+
.AddStatelessProcessor<MyEvent, MyEventProcessor>("eventhub", consumerGroup));
36+
}
1837

1938
var app = builder.Build();
2039

2140
app.MapDefaultEndpoints();
2241
app.MapGet(
2342
"/",
24-
(MyEventprocessor processor) => Results.Ok(processor.ReceivedEvents));
43+
([FromServices] MyEventProcessor processor)
44+
=> processor.ReceivedEvents);
45+
app.MapGet(
46+
"/status",
47+
([FromServices] IMessageProcessorService<MyEvent> service)
48+
=> service.IsRunning);
2549

2650
app.Run();
2751

2852
sealed record MyEvent(
2953
DateTime Date,
3054
string Identifier);
3155

32-
sealed class MyEventprocessor : IMessageProcessor<MyEvent>
56+
sealed class MyEventProcessor : IMessageProcessor<MyEvent>
3357
{
3458
public List<MyEvent> ReceivedEvents { get; } = [];
3559

60+
public MyEventProcessor()
61+
{
62+
63+
}
64+
3665
public Task ProcessAsync(
3766
MyEvent message,
3867
MessageMetadata metadata,

samples/EventHub/EventHub.Processor/Properties/launchSettings.json

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,78 @@
1818
"environmentVariables": {
1919
"ASPNETCORE_ENVIRONMENT": "Development"
2020
}
21+
},
22+
"processor1": {
23+
"commandName": "Project",
24+
"dotnetRunMessages": true,
25+
"launchBrowser": true,
26+
"applicationUrl": "https://localhost:7243;http://localhost:5146",
27+
"environmentVariables": {
28+
"ASPNETCORE_ENVIRONMENT": "Development"
29+
}
30+
},
31+
"processor2": {
32+
"commandName": "Project",
33+
"dotnetRunMessages": true,
34+
"launchBrowser": true,
35+
"applicationUrl": "https://localhost:7244;http://localhost:5147",
36+
"environmentVariables": {
37+
"ASPNETCORE_ENVIRONMENT": "Development"
38+
}
39+
},
40+
"processor3": {
41+
"commandName": "Project",
42+
"dotnetRunMessages": true,
43+
"launchBrowser": true,
44+
"applicationUrl": "https://localhost:7245;http://localhost:5148",
45+
"environmentVariables": {
46+
"ASPNETCORE_ENVIRONMENT": "Development"
47+
}
48+
},
49+
"processor4": {
50+
"commandName": "Project",
51+
"dotnetRunMessages": true,
52+
"launchBrowser": true,
53+
"applicationUrl": "https://localhost:7246;http://localhost:5149",
54+
"environmentVariables": {
55+
"ASPNETCORE_ENVIRONMENT": "Development"
56+
}
57+
},
58+
"stateless1": {
59+
"commandName": "Project",
60+
"dotnetRunMessages": true,
61+
"launchBrowser": true,
62+
"applicationUrl": "https://localhost:7247;http://localhost:5150",
63+
"environmentVariables": {
64+
"ASPNETCORE_ENVIRONMENT": "Development"
65+
}
66+
},
67+
"stateless2": {
68+
"commandName": "Project",
69+
"dotnetRunMessages": true,
70+
"launchBrowser": true,
71+
"applicationUrl": "https://localhost:7248;http://localhost:5151",
72+
"environmentVariables": {
73+
"ASPNETCORE_ENVIRONMENT": "Development"
74+
}
75+
},
76+
"stateless3": {
77+
"commandName": "Project",
78+
"dotnetRunMessages": true,
79+
"launchBrowser": true,
80+
"applicationUrl": "https://localhost:7249;http://localhost:5152",
81+
"environmentVariables": {
82+
"ASPNETCORE_ENVIRONMENT": "Development"
83+
}
84+
},
85+
"stateless4": {
86+
"commandName": "Project",
87+
"dotnetRunMessages": true,
88+
"launchBrowser": true,
89+
"applicationUrl": "https://localhost:7250;http://localhost:5153",
90+
"environmentVariables": {
91+
"ASPNETCORE_ENVIRONMENT": "Development"
92+
}
2193
}
2294
}
2395
}

samples/EventHub/EventHub.Producer/Program.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Cabazure.Messaging;
2+
using Cabazure.Messaging.EventHub;
23

34
var builder = WebApplication.CreateBuilder(args);
45

@@ -28,6 +29,10 @@
2829

2930
await publisher.PublishAsync(
3031
evt,
32+
new EventHubPublishingOptions
33+
{
34+
PartitionKey = evt.Identifier,
35+
},
3136
cancellationToken);
3237

3338
return Results.Ok(evt);

samples/EventHub/EventHub.ServiceDefaults/EventHub.ServiceDefaults.csproj

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
<ItemGroup>
1111
<FrameworkReference Include="Microsoft.AspNetCore.App" />
1212

13-
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.2.0" />
13+
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.3.0" />
1414
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="9.1.0" />
15-
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.1" />
16-
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.1" />
17-
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.0" />
18-
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
19-
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.0" />
15+
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.2" />
16+
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.2" />
17+
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.1" />
18+
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.1" />
19+
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.1" />
2020
</ItemGroup>
2121

2222
</Project>

samples/ServiceBus/ServiceBus.ServiceDefaults/ServiceBus.ServiceDefaults.csproj

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
<ItemGroup>
1111
<FrameworkReference Include="Microsoft.AspNetCore.App" />
1212

13-
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.2.0" />
13+
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.3.0" />
1414
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="9.1.0" />
15-
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.1" />
16-
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.1" />
17-
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.0" />
18-
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
19-
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.0" />
15+
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.2" />
16+
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.2" />
17+
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.1" />
18+
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.1" />
19+
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.1" />
2020
</ItemGroup>
2121

2222
</Project>

samples/StorageQueue/StorageQueue.ServiceDefaults/StorageQueue.ServiceDefaults.csproj

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
<ItemGroup>
1111
<FrameworkReference Include="Microsoft.AspNetCore.App" />
1212

13-
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.2.0" />
13+
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.3.0" />
1414
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="9.1.0" />
15-
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.1" />
16-
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.1" />
17-
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.0" />
18-
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
19-
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.0" />
15+
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.2" />
16+
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.2" />
17+
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.1" />
18+
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.1" />
19+
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.1" />
2020
</ItemGroup>
2121

2222
</Project>

src/Cabazure.Messaging.EventHub/Cabazure.Messaging.EventHub.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
<ItemGroup>
2828
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.6" />
2929
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.11.6" />
30-
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.2" />
31-
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.2" />
32-
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="9.0.2" />
33-
<PackageReference Include="System.Text.Json" Version="9.0.2" />
30+
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.3" />
31+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.3" />
32+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="9.0.3" />
33+
<PackageReference Include="System.Text.Json" Version="9.0.3" />
3434
</ItemGroup>
3535

3636
<ItemGroup>

src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubBuilder.cs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public EventHubBuilder AddProcessor<TMessage, TProcessor>(
6565

6666
Services.AddLogging();
6767
Services.TryAddSingleton<IBlobStorageClientProvider, BlobStorageClientProvider>();
68-
Services.TryAddSingleton<IEventHubBatchProcessorFactory, EventHubBatchProcessorFactory>();
68+
Services.TryAddSingleton<IEventHubProcessorFactory, EventHubProcessorFactory>();
6969
Services.TryAddSingleton<TProcessor>();
7070

7171
Services.AddSingleton(s =>
@@ -81,7 +81,7 @@ public EventHubBuilder AddProcessor<TMessage, TProcessor>(
8181
processorBuilder.Filters);
8282

8383
var batchProcessor = s
84-
.GetRequiredService<IEventHubBatchProcessorFactory>()
84+
.GetRequiredService<IEventHubProcessorFactory>()
8585
.Create(
8686
batchHandler,
8787
ConnectionName,
@@ -99,4 +99,45 @@ public EventHubBuilder AddProcessor<TMessage, TProcessor>(
9999

100100
return this;
101101
}
102+
103+
public EventHubBuilder AddStatelessProcessor<TMessage, TProcessor>(
104+
string eventHubName,
105+
string consumerGroup = "$default",
106+
Action<EventHubStatelessProcessorBuilder>? builder = null)
107+
where TProcessor : class, IMessageProcessor<TMessage>
108+
{
109+
var processorBuilder = new EventHubStatelessProcessorBuilder();
110+
builder?.Invoke(processorBuilder);
111+
112+
Services.AddLogging();
113+
Services.TryAddSingleton<IEventHubConsumerClientFactory, EventHubConsumerClientFactory>();
114+
Services.TryAddSingleton<TProcessor>();
115+
116+
Services.AddSingleton(s =>
117+
{
118+
var config = s
119+
.GetRequiredService<IOptionsMonitor<CabazureEventHubOptions>>()
120+
.Get(ConnectionName);
121+
122+
var client = s
123+
.GetRequiredService<IEventHubConsumerClientFactory>()
124+
.Create(ConnectionName, eventHubName, consumerGroup);
125+
126+
var processor = new EventHubStatelessProcessor<TMessage, TProcessor>(
127+
client,
128+
s.GetRequiredService<ILogger<TProcessor>>(),
129+
s.GetRequiredService<TProcessor>(),
130+
config.SerializerOptions,
131+
processorBuilder.ReadOptions,
132+
processorBuilder.Filters);
133+
134+
return new EventHubProcessorService<TMessage, TProcessor>(processor);
135+
});
136+
Services.AddSingleton<IMessageProcessorService<TProcessor>>(s
137+
=> s.GetRequiredService<EventHubProcessorService<TMessage, TProcessor>>());
138+
Services.AddHostedService(s
139+
=> s.GetRequiredService<EventHubProcessorService<TMessage, TProcessor>>());
140+
141+
return this;
142+
}
102143
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using Azure.Messaging.EventHubs.Consumer;
2+
3+
namespace Cabazure.Messaging.EventHub.DependencyInjection;
4+
5+
public class EventHubStatelessProcessorBuilder
6+
{
7+
public List<Func<IDictionary<string, object>, bool>> Filters { get; } = [];
8+
9+
public ReadEventOptions? ReadOptions { get; private set; }
10+
11+
public EventHubStatelessProcessorBuilder WithFilter(
12+
Func<IDictionary<string, object>, bool> predicate)
13+
{
14+
Filters.Add(predicate);
15+
return this;
16+
}
17+
18+
public EventHubStatelessProcessorBuilder WithReadEventOptions(
19+
ReadEventOptions options)
20+
{
21+
ReadOptions = options;
22+
return this;
23+
}
24+
}

0 commit comments

Comments
 (0)