-
-
Notifications
You must be signed in to change notification settings - Fork 545
Expand file tree
/
Copy pathEventStoreDBAsyncCommandBusTests.cs
More file actions
111 lines (93 loc) · 3.63 KB
/
EventStoreDBAsyncCommandBusTests.cs
File metadata and controls
111 lines (93 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
using System.Diagnostics.Metrics;
using Core;
using Core.Commands;
using Core.Events;
using Core.EventStoreDB;
using Core.EventStoreDB.Commands;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.Subscriptions;
using Core.OpenTelemetry;
using Core.Testing;
using EventStore.Client;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Hosting.Internal;
using Microsoft.Extensions.Time.Testing;
using Polly;
using Xunit;
namespace EventStoreDB.Integration.Tests.Commands;
/// <summary>
/// Integration test checking that a command scheduled through
/// <see cref="EventStoreDBAsyncCommandBus"/> is appended to the commands stream
/// and then handled by the registered <see cref="AddUserCommandHandler"/>.
/// Requires an EventStoreDB instance reachable at <c>esdb://localhost:2113</c>.
/// </summary>
public class EventStoreDBAsyncCommandBusTests
{
    // Upper bound for the whole scenario. Without it a command that is never
    // forwarded would make WaitForProcessing block the test run indefinitely.
    private static readonly TimeSpan TestTimeout = TimeSpan.FromMinutes(1);

    // Upper bound for stopping the subscription during cleanup.
    private static readonly TimeSpan StopTimeout = TimeSpan.FromSeconds(10);

    private readonly EventStoreDBAsyncCommandBus asyncCommandBus;
    private readonly List<Guid> userIds = [];
    private readonly EventListener eventListener = new();
    private readonly ServiceProvider serviceProvider;
    private readonly EventStoreClient eventStoreClient;
    private readonly IHostedService subscriptionToAll;

    /// <summary>
    /// Builds the service container used by the test and resolves the
    /// EventStoreDB client, the hosted subscription and the command bus under test.
    /// </summary>
    public EventStoreDBAsyncCommandBusTests()
    {
        var services = new ServiceCollection();

        services
            .AddLogging()
            .AddSingleton<IHostEnvironment, HostingEnvironment>(
                _ => new HostingEnvironment { EnvironmentName = Environments.Development }
            )
            // Wrap the event bus so the test can observe processed messages via eventListener.
            .AddSingleton<IEventBus>(sp =>
                new EventCatcher(
                    eventListener,
                    sp.GetRequiredService<EventBus>()
                )
            )
            .AddCoreServices()
            .AddEventStoreDB(new EventStoreDBConfig { ConnectionString = "esdb://localhost:2113?tls=false" })
            .AddEventStoreDBSubscriptionToAll<EventBusBatchHandler>("AsyncCommandBusTest")
            // The handler records handled user ids in the list owned by this test class.
            .AddCommandHandler<AddUser, AddUserCommandHandler>(
                _ => new AddUserCommandHandler(userIds)
            )
            .AddScoped(sp => new InMemoryCommandBus(
                sp,
                new CommandHandlerActivity(new CommandHandlerMetrics(new DummyMeterFactory(), new FakeTimeProvider())),
                new ActivityScope(),
                Policy.NoOpAsync()
            ))
            .AddSingleton(eventListener)
            .AddCommandForwarder();

        // Kept as a field so the test can dispose it (and the services it created).
        serviceProvider = services.BuildServiceProvider();

        eventStoreClient = serviceProvider.GetRequiredService<EventStoreClient>();
        subscriptionToAll = serviceProvider.GetRequiredService<IHostedService>();
        asyncCommandBus = new EventStoreDBAsyncCommandBus(eventStoreClient);
    }

    /// <summary>
    /// Schedules an <see cref="AddUser"/> command and verifies that it is stored exactly once
    /// in the commands stream and that the command handler received it.
    /// </summary>
    [Fact]
    public async Task CommandIsStoredInEventStoreDBAndForwardedToCommandHandler()
    {
        // Cancels every awaited operation once TestTimeout elapses, turning a hang into a failure.
        using var timeout = new CancellationTokenSource(TestTimeout);
        var ct = timeout.Token;

        // Given
        var userId = Guid.CreateVersion7();
        var command = new AddUser(userId);

        try
        {
            // When
            await subscriptionToAll.StartAsync(ct);
            await asyncCommandBus.Schedule(command, ct);

            // Then
            await eventListener.WaitForProcessing(command, ct);

            var commands = await eventStoreClient.ReadStream(EventStoreDBAsyncCommandBus.CommandsStreamId, ct);
            commands.Should().HaveCountGreaterThanOrEqualTo(1);
            commands.OfType<AddUser>()
                .Count(e => e.UserId == userId)
                .Should().Be(1);

            userIds.Should().Contain(userId);
        }
        finally
        {
            try
            {
                // Use a fresh token: the test token may already be cancelled at this point.
                using var stopTimeout = new CancellationTokenSource(StopTimeout);
                await subscriptionToAll.StopAsync(stopTimeout.Token);
            }
            finally
            {
                // Releases the EventStoreDB client and other container-owned services.
                await serviceProvider.DisposeAsync();
            }
        }
    }
}
/// <summary>
/// Test command carrying the id of the user to add.
/// </summary>
/// <param name="UserId">Identifier of the user the command refers to.</param>
/// <param name="Sth">Optional extra value; <c>null</c> when not supplied.</param>
public record AddUser(
    Guid UserId,
    string? Sth = null
);
/// <summary>
/// Command handler used by the test: it appends the user id of every handled
/// <see cref="AddUser"/> command to the list supplied by the caller.
/// </summary>
internal class AddUserCommandHandler(List<Guid> userIds): ICommandHandler<AddUser>
{
    // Same list instance the caller passed in; the caller inspects it afterwards.
    private readonly List<Guid> handledUserIds = userIds;

    /// <summary>
    /// Records <paramref name="command"/>'s user id and completes synchronously.
    /// </summary>
    /// <param name="command">Command whose <c>UserId</c> is recorded.</param>
    /// <param name="ct">Cancellation token; not used by this handler.</param>
    /// <returns>An already completed task.</returns>
    public Task Handle(AddUser command, CancellationToken ct)
    {
        var handledId = command.UserId;
        handledUserIds.Add(handledId);

        return Task.CompletedTask;
    }
}
/// <summary>
/// Minimal <see cref="IMeterFactory"/> for tests: every call creates a new
/// <see cref="Meter"/> from the given options and the factory keeps no reference to it.
/// </summary>
internal sealed class DummyMeterFactory: IMeterFactory
{
    /// <summary>
    /// Creates a new meter configured with <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Options passed straight to the <see cref="Meter"/> constructor.</param>
    /// <returns>The newly created meter.</returns>
    public Meter Create(MeterOptions options)
    {
        var meter = new Meter(options);
        return meter;
    }

    /// <summary>
    /// Does nothing; the factory holds no state to release.
    /// </summary>
    public void Dispose()
    {
    }
}