Skip to content

Commit e86fd75

Browse files
Fixing system store (#5294)
1 parent 29851cb commit e86fd75

File tree

8 files changed

+230
-488
lines changed

8 files changed

+230
-488
lines changed

src/Connectors/KurrentDB.Connectors/Infrastructure/Eventuous/SystemEventStore.cs

Lines changed: 0 additions & 200 deletions
This file was deleted.

src/Connectors/KurrentDB.Connectors/Planes/Management/ManagementPlaneWireUp.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
using KurrentDB.Connectors.Planes.Management.Projectors;
2727
using KurrentDB.Connectors.Planes.Management.Queries;
2828
using KurrentDB.Core;
29+
using KurrentDB.Surge.Eventuous;
2930
using KurrentDB.Surge.Producers;
3031
using KurrentDB.Surge.Readers;
3132
using Microsoft.AspNetCore.Builder;
@@ -124,7 +125,7 @@ public static IServiceCollection AddConnectorsManagementPlane(this IServiceColle
124125
.ProducerId("EventuousProducer")
125126
.Create();
126127

127-
return new SystemEventStore(reader, producer);
128+
return new(reader, producer);
128129
})
129130
.AddCommandService<ConnectorsCommandApplication, ConnectorEntity>();
130131

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55

66
using Eventuous;
77
using Kurrent.Surge;
8-
using KurrentDB.Connectors.Infrastructure.Eventuous;
8+
using KurrentDB.Surge.Eventuous;
99
using KurrentDB.Surge.Testing.Fixtures;
1010
using Shouldly;
1111

12-
namespace KurrentDB.Connectors.Tests.Eventuous;
12+
namespace KurrentDB.Surge.Tests.Eventuous;
1313

1414
[Trait("Category", "Integration")]
15-
public class SystemEventStoreTests(ITestOutputHelper output, ConnectorsAssemblyFixture fixture) : ConnectorsIntegrationTests<ConnectorsAssemblyFixture>(output, fixture) {
16-
SystemEventStore NewSystemEventStore() => new SystemEventStore(Fixture.NewReader().Create(), Fixture.NewProducer().Create());
15+
public class SystemEventStoreTests(ITestOutputHelper output, SystemComponentsAssemblyFixture fixture) : SystemComponentsIntegrationTests(output, fixture) {
16+
SystemEventStore NewSystemEventStore() => new(Fixture.NewReader().Create(), Fixture.NewProducer().Create());
1717

1818
[Fact]
1919
public async Task stream_does_not_exists() {
@@ -63,7 +63,7 @@ public async Task appends_single() {
6363
var appendResult = await eventStore.AppendEvents(
6464
stream,
6565
ExpectedStreamVersion.Any,
66-
new[] { streamEvent },
66+
[streamEvent],
6767
cancellator.Token
6868
);
6969

@@ -225,7 +225,7 @@ public async Task append_with_custom_and_default_headers_are_correctly_parsed()
225225
var result = await eventStore.AppendEvents(
226226
stream,
227227
ExpectedStreamVersion.Any,
228-
new[] { eventMetadata },
228+
[eventMetadata],
229229
cancellator.Token
230230
);
231231

@@ -262,28 +262,12 @@ public async Task read_some_events_forward_from_non_existent_stream() {
262262
var stream = Fixture.NewStreamName();
263263

264264
// Act & Assert
265-
var ex = await eventStore
266-
.ReadEvents(stream, StreamReadPosition.Start, 10, cancellator.Token)
267-
.ShouldThrowAsync<StreamNotFound>();
265+
await eventStore
266+
.ReadEvents(stream, StreamReadPosition.Start, 10, cancellator.Token)
267+
.ShouldThrowAsync<StreamNotFound>();
268268
}
269269

270-
// makes no sense except to validate the argument
271-
// [Fact]
272-
// public async Task read_zero_events_forward_from_non_existent_stream() {
273-
// // Arrange
274-
// var eventStore = NewSystemEventStore();
275-
//
276-
// using var cancellator = new CancellationTokenSource(TimeSpan.FromSeconds(30));
277-
//
278-
// var stream = Fixture.NewStreamName();
279-
//
280-
// // Assert
281-
// await eventStore
282-
// .ReadEvents(stream, StreamReadPosition.Start, 0, cancellator.Token)
283-
// .ShouldThrowAsync<StreamNotFoundError>();
284-
// }
285-
286-
[Fact(Skip = "Must investigate cause it returns empty stream array instead of SteamNotFound")]
270+
[Fact]
287271
public async Task read_some_events_backwards_from_nonexistent_stream() {
288272
// Arrange
289273
var eventStore = NewSystemEventStore();
@@ -298,6 +282,30 @@ await eventStore
298282
.ShouldThrowAsync<StreamNotFound>();
299283
}
300284

285+
[Fact]
286+
public async Task read_events_forwards() {
287+
// Arrange
288+
var eventStore = NewSystemEventStore();
289+
290+
using var cancellator = new CancellationTokenSource(TimeSpan.FromSeconds(10));
291+
292+
var stream = Fixture.NewStreamName();
293+
294+
// Act
295+
var events = Fixture.CreateStreamEvents(800).ToArray();
296+
297+
var appendResult = await eventStore.AppendEvents(stream, ExpectedStreamVersion.Any, events, cancellator.Token);
298+
299+
// Assert
300+
appendResult.GlobalPosition.ShouldBeGreaterThan<ulong>(0);
301+
302+
var readResults = await eventStore.ReadEvents(stream, StreamReadPosition.Start, int.MaxValue, cancellator.Token);
303+
304+
readResults.Length.ShouldBe(events.Length);
305+
events.First().Id.ShouldBe(readResults.First().Id);
306+
events.Last().Id.ShouldBe(readResults.Last().Id);
307+
}
308+
301309
[Fact]
302310
public async Task read_events_backwards() {
303311
// Arrange
@@ -308,16 +316,16 @@ public async Task read_events_backwards() {
308316
var stream = Fixture.NewStreamName();
309317

310318
// Act
311-
var events = Fixture.CreateStreamEvents(2).ToArray();
319+
var events = Fixture.CreateStreamEvents(800).ToArray();
312320

313321
var appendResult = await eventStore.AppendEvents(stream, ExpectedStreamVersion.Any, events, cancellator.Token);
314322

315323
// Assert
316324
appendResult.GlobalPosition.ShouldBeGreaterThan<ulong>(0);
317325

318-
var readResults = await eventStore.ReadEventsBackwards(stream, new(long.MaxValue), 2, cancellator.Token);
326+
var readResults = await eventStore.ReadEventsBackwards(stream, new(long.MaxValue), int.MaxValue, cancellator.Token);
319327

320-
readResults.Length.ShouldBe(2);
328+
readResults.Length.ShouldBe(events.Length);
321329
events.First().Id.ShouldBe(readResults.Last().Id);
322330
events.Last().Id.ShouldBe(readResults.First().Id);
323331
}

0 commit comments

Comments
 (0)