-
-
Notifications
You must be signed in to change notification settings - Fork 545
Expand file tree
/
Copy pathConfig.cs
More file actions
113 lines (95 loc) · 4.06 KB
/
Config.cs
File metadata and controls
113 lines (95 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
using Core.BackgroundWorkers;
using Core.Configuration;
using Core.Events;
using Core.EventStoreDB.Subscriptions;
using Core.EventStoreDB.Subscriptions.Batch;
using Core.EventStoreDB.Subscriptions.Checkpoints;
using Core.OpenTelemetry;
using EventStore.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Core.EventStoreDB;
public class EventStoreDBConfig
{
public string ConnectionString { get; set; } = null!;
}
public record EventStoreDBOptions(
bool UseInternalCheckpointing = true
);
public static class EventStoreDBConfigExtensions
{
private const string DefaultConfigKey = "EventStore";
public static IServiceCollection AddEventStoreDB(
this IServiceCollection services,
IConfiguration config,
EventStoreDBOptions? options = null
) =>
services.AddEventStoreDB(
config.GetRequiredConfig<EventStoreDBConfig>(DefaultConfigKey),
options
);
public static IServiceCollection AddEventStoreDB(
this IServiceCollection services,
EventStoreDBConfig eventStoreDBConfig,
EventStoreDBOptions? options = null
)
{
services
.AddSingleton(EventTypeMapper.Instance)
.AddSingleton(new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)))
.AddScoped<EventsBatchProcessor, EventsBatchProcessor>()
.AddScoped<IEventsBatchCheckpointer, EventsBatchCheckpointer>()
.AddSingleton<ISubscriptionStoreSetup, NulloSubscriptionStoreSetup>();
if (options?.UseInternalCheckpointing != false)
{
services
.AddTransient<ISubscriptionCheckpointRepository, EventStoreDBSubscriptionCheckpointRepository>();
}
return services.AddHostedService(serviceProvider =>
{
var logger =
serviceProvider.GetRequiredService<ILogger<BackgroundWorker>>();
var coordinator = serviceProvider.GetRequiredService<EventStoreDBSubscriptionsToAllCoordinator>();
TelemetryPropagator.UseDefaultCompositeTextMapPropagator();
return new BackgroundWorker<EventStoreDBSubscriptionsToAllCoordinator>(
coordinator,
logger,
(c, ct) => c.SubscribeToAll(ct)
);
}
);
}
public static IServiceCollection AddEventStoreDBSubscriptionToAll<THandler>(
this IServiceCollection services,
string subscriptionId
) where THandler : IEventBatchHandler =>
services.AddEventStoreDBSubscriptionToAll(
new EventStoreDBSubscriptionToAllOptions { SubscriptionId = subscriptionId },
sp => [sp.GetRequiredService<THandler>()]
);
public static IServiceCollection AddEventStoreDBSubscriptionToAll<THandler>(
this IServiceCollection services,
EventStoreDBSubscriptionToAllOptions subscriptionOptions
) where THandler : IEventBatchHandler =>
services.AddEventStoreDBSubscriptionToAll(subscriptionOptions, sp => [sp.GetRequiredService<THandler>()]);
public static IServiceCollection AddEventStoreDBSubscriptionToAll(
this IServiceCollection services,
EventStoreDBSubscriptionToAllOptions subscriptionOptions,
Func<IServiceProvider, IEventBatchHandler[]> handlers
)
{
services.AddSingleton<EventStoreDBSubscriptionsToAllCoordinator>();
return services.AddKeyedSingleton<EventStoreDBSubscriptionToAll>(
subscriptionOptions.SubscriptionId,
(sp, _) =>
{
var subscription = new EventStoreDBSubscriptionToAll(
sp.GetRequiredService<EventStoreClient>(),
sp.GetRequiredService<IServiceScopeFactory>(),
sp.GetRequiredService<ILogger<EventStoreDBSubscriptionToAll>>()
) { Options = subscriptionOptions, GetHandlers = handlers };
return subscription;
});
}
}