diff --git a/src/ReactiveDomain.Foundation.Tests/Domain/AggregateRootEntityTests.cs b/src/ReactiveDomain.Foundation.Tests/Domain/AggregateRootEntityTests.cs index 6bdf9d45..f0143291 100644 --- a/src/ReactiveDomain.Foundation.Tests/Domain/AggregateRootEntityTests.cs +++ b/src/ReactiveDomain.Foundation.Tests/Domain/AggregateRootEntityTests.cs @@ -1,9 +1,12 @@ -using System; +using ReactiveDomain.Foundation.Tests.Domain; +using System; using Xunit; // ReSharper disable once CheckNamespace -namespace ReactiveDomain.Domain.Tests { - namespace AggregateRootEntityTests { +namespace ReactiveDomain.Domain.Tests +{ + namespace AggregateRootEntityTests + { public class AnyInstance { // IEventSource behavior diff --git a/src/ReactiveDomain.Foundation.Tests/Domain/CapturingRoute.cs b/src/ReactiveDomain.Foundation.Tests/Domain/CapturingRoute.cs index 9983e887..03aa4822 100644 --- a/src/ReactiveDomain.Foundation.Tests/Domain/CapturingRoute.cs +++ b/src/ReactiveDomain.Foundation.Tests/Domain/CapturingRoute.cs @@ -1,4 +1,4 @@ -namespace ReactiveDomain.Domain.Tests +namespace ReactiveDomain.Foundation.Tests.Domain { public class CapturingRoute { diff --git a/src/ReactiveDomain.Foundation.Tests/Domain/CorrelatedAggregate.cs b/src/ReactiveDomain.Foundation.Tests/Domain/CorrelatedAggregate.cs index 043d7259..9e5e646f 100644 --- a/src/ReactiveDomain.Foundation.Tests/Domain/CorrelatedAggregate.cs +++ b/src/ReactiveDomain.Foundation.Tests/Domain/CorrelatedAggregate.cs @@ -3,7 +3,7 @@ using ReactiveDomain.Messaging; // ReSharper disable once CheckNamespace -namespace ReactiveDomain.Foundation.Tests +namespace ReactiveDomain.Foundation.Tests.Domain { public class CorrelatedAggregate : AggregateRoot { diff --git a/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_aggregate.cs b/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_aggregate.cs index be466987..d1d6cc10 100644 --- a/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_aggregate.cs +++ b/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_aggregate.cs @@ -3,7 +3,7 @@ using Xunit; // ReSharper disable once CheckNamespace -namespace ReactiveDomain.Foundation.Tests +namespace ReactiveDomain.Foundation.Tests.Domain { // ReSharper disable once InconsistentNaming public class with_correlated_aggregate @@ -28,7 +28,7 @@ public void can_raise_correlated_events_from_constructor_source() }, e =>{ if( e is CorrelatedAggregate.CorrelatedEvent @event){ - Assert.Equal(_command.CorrelationId, @event.CorrelationId); + Assert.Equal(_command.CorrelationId, @event.CorrelationId); } else{ throw new Exception("wrong event"); diff --git a/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_repository.cs b/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_repository.cs index 57e66897..f926cf7d 100644 --- a/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_repository.cs +++ b/src/ReactiveDomain.Foundation.Tests/Domain/with_correlated_repository.cs @@ -4,7 +4,7 @@ using Xunit; // ReSharper disable once CheckNamespace -namespace ReactiveDomain.Foundation.Tests +namespace ReactiveDomain.Foundation.Tests.Domain { [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] // ReSharper disable once InconsistentNaming @@ -83,36 +83,40 @@ public void correlation_and_causation_are_injected() var raisedEvents = ((IEventSource)agg).TakeEvents(); Assert.Collection(raisedEvents, - @event => { + @event => + { var created = @event as CorrelatedAggregate.Created; Assert.NotNull(created); - Assert.Equal(command1.MsgId,created.CausationId); - Assert.Equal(command1.CorrelationId,created.CorrelationId); + Assert.Equal(command1.MsgId, created.CausationId); + Assert.Equal(command1.CorrelationId, created.CorrelationId); }, - @event => { + @event => + { var corrEvent = @event as CorrelatedAggregate.CorrelatedEvent; Assert.NotNull(corrEvent); - Assert.Equal(command1.MsgId,corrEvent.CausationId); - Assert.Equal(command1.CorrelationId,corrEvent.CorrelationId); + Assert.Equal(command1.MsgId, corrEvent.CausationId); + Assert.Equal(command1.CorrelationId, corrEvent.CorrelationId); }, - @event => { + @event => + { var corrEvent = @event as CorrelatedAggregate.CorrelatedEvent; Assert.NotNull(corrEvent); - Assert.Equal(command1.MsgId,corrEvent.CausationId); - Assert.Equal(command1.CorrelationId,corrEvent.CorrelationId); + Assert.Equal(command1.MsgId, corrEvent.CausationId); + Assert.Equal(command1.CorrelationId, corrEvent.CorrelationId); }); var command2 = MessageBuilder.New(() => new TestCommands.Command2()); - ((ICorrelatedEventSource) agg).Source = command2; - + ((ICorrelatedEventSource)agg).Source = command2; + agg.RaiseCorrelatedEvent(); raisedEvents = ((IEventSource)agg).TakeEvents(); Assert.Collection(raisedEvents, - @event => { - var corrEvent = @event as CorrelatedAggregate.CorrelatedEvent; - Assert.NotNull(corrEvent); - Assert.Equal(command2.MsgId,corrEvent.CausationId); - Assert.Equal(command2.CorrelationId,corrEvent.CorrelationId); - }); + @event => + { + var corrEvent = @event as CorrelatedAggregate.CorrelatedEvent; + Assert.NotNull(corrEvent); + Assert.Equal(command2.MsgId, corrEvent.CausationId); + Assert.Equal(command2.CorrelationId, corrEvent.CorrelationId); + }); } } } diff --git a/src/ReactiveDomain.Foundation.Tests/Domain/with_repository.cs b/src/ReactiveDomain.Foundation.Tests/Domain/with_repository.cs index 680f06bb..8a4e9fa4 100644 --- a/src/ReactiveDomain.Foundation.Tests/Domain/with_repository.cs +++ b/src/ReactiveDomain.Foundation.Tests/Domain/with_repository.cs @@ -7,7 +7,7 @@ using Xunit; // ReSharper disable once CheckNamespace -namespace ReactiveDomain.Foundation.Tests +namespace ReactiveDomain.Foundation.Tests.Domain { [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] // ReSharper disable once InconsistentNaming @@ -25,20 +25,20 @@ public with_repository(StreamStoreConnectionFixture fixture) _streamNameBuilder, fixture.Connection, _serializer, - GetPolicyUserId); + GetPolicyUserId); } private Guid GetPolicyUserId() { return _policyUserId; } private Guid _policyUserId = Guid.NewGuid(); [Fact] public void policy_user_id_is_saved() { - + var id = Guid.NewGuid(); - var agg = new TestAggregate(id); + var agg = new TestAggregate(id); var userId = Guid.NewGuid(); _policyUserId = userId; _repo.Save(agg); - + agg.RaiseBy(10); var nextUser = Guid.NewGuid(); @@ -52,13 +52,13 @@ public void policy_user_id_is_saved() var md = newAggregate.ReadMetadatum(); Assert.NotNull(md); Assert.Equal(userId, md.PolicyUserId); - Assert.True((DateTime.UtcNow - md.EventDateUTC) < TimeSpan.FromSeconds(5)); // just check for a fresh timestamp, not trying to test built in .Now + Assert.True(DateTime.UtcNow - md.EventDateUTC < TimeSpan.FromSeconds(5)); // just check for a fresh timestamp, not trying to test built in .Now var evt2 = slice.Events[1]; var incremented = _serializer.Deserialize(evt2) as TestAggregateMessages.Increment; var md2 = incremented.ReadMetadatum(); Assert.NotNull(md2); Assert.Equal(nextUser, md2.PolicyUserId); - Assert.True((DateTime.UtcNow - md2.EventDateUTC) < TimeSpan.FromSeconds(5)); // just check for a fresh timestamp, not trying to test built in .Now + Assert.True(DateTime.UtcNow - md2.EventDateUTC < TimeSpan.FromSeconds(5)); // just check for a fresh timestamp, not trying to test built in .Now #if NETCOREAPP Assert.NotEqual(md.EventDateUTC, md2.EventDateUTC); #endif diff --git a/src/ReactiveDomain.Foundation.Tests/ReactiveDomain.Foundation.Tests.csproj b/src/ReactiveDomain.Foundation.Tests/ReactiveDomain.Foundation.Tests.csproj index 1b63f6c0..376120fe 100644 --- a/src/ReactiveDomain.Foundation.Tests/ReactiveDomain.Foundation.Tests.csproj +++ b/src/ReactiveDomain.Foundation.Tests/ReactiveDomain.Foundation.Tests.csproj @@ -15,9 +15,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/Common/CommonHelpers.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/Common/CommonHelpers.cs similarity index 90% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/Common/CommonHelpers.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/Common/CommonHelpers.cs index 8b9a0607..0c7ff002 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/Common/CommonHelpers.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/Common/CommonHelpers.cs @@ -2,7 +2,7 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Testing; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests.Common { internal static class CommonHelpers { diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/Common/TestStreamNameBuilder.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/Common/TestStreamNameBuilder.cs similarity index 89% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/Common/TestStreamNameBuilder.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/Common/TestStreamNameBuilder.cs index 073ee7e9..f6b96c18 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/Common/TestStreamNameBuilder.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/Common/TestStreamNameBuilder.cs @@ -1,13 +1,13 @@ using System; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests.Common { /// /// Generate stream names for testing. /// /// /// todo: - /// The use of the extra Guid doesn't match the generation by the + /// The use of the extra Guid doesn't match the generation by the /// and the checks for existing category and event streams fail. /// tests the stream name generation. So switching to the PrefixedCamelCaseStreamNameBuilder. Leaving these here so Chris can /// agree and remove or correct me. diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_aggregate_and_guid.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_aggregate_and_guid.cs similarity index 75% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_aggregate_and_guid.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_aggregate_and_guid.cs index db12d2dc..524cb3b1 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_aggregate_and_guid.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_aggregate_and_guid.cs @@ -3,10 +3,11 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; -using ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common; using Xunit; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests { +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ // ReSharper disable once InconsistentNaming [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] public class when_using_listener_start_with_aggregate_and_guid @@ -14,20 +15,21 @@ public class when_using_listener_start_with_aggregate_and_guid private readonly IStreamNameBuilder _streamNameBuilder = new PrefixedCamelCaseStreamNameBuilder(); private readonly IEventSerializer _eventSerializer = new JsonMessageSerializer(); - public when_using_listener_start_with_aggregate_and_guid(StreamStoreConnectionFixture fixture){ + public when_using_listener_start_with_aggregate_and_guid(StreamStoreConnectionFixture fixture) + { var conn = fixture.Connection; conn.Connect(); - // Build an origin stream to which the the events are appended. We are testing this stream directly + // Build an origin stream to which the events are appended. We are testing this stream directly var originalStreamGuid = Guid.NewGuid(); var originStreamName = _streamNameBuilder.GenerateForAggregate(typeof(AggregateIdTestAggregate), originalStreamGuid); - + // Drop an event into the stream testAggregate-guid var result = conn.AppendToStream( - originStreamName, - ExpectedVersion.NoStream, - null, + originStreamName, + ExpectedVersion.NoStream, + null, _eventSerializer.Serialize(new TestEvent())); Assert.True(result.NextExpectedVersion == 0); @@ -49,16 +51,18 @@ public when_using_listener_start_with_aggregate_and_guid(StreamStoreConnectionFi [Fact] public void can_get_events_from_aggregate_id_stream() { - AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1,3000); + AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 3000); } - private void Handle(IMessage message) { + private void Handle(IMessage message) + { dynamic evt = message; - if (evt is TestEvent) { + if (evt is TestEvent) + { Interlocked.Increment(ref _testEventCount); } } - public class AggregateIdTestAggregate:EventDrivenStateMachine{} + public class AggregateIdTestAggregate : EventDrivenStateMachine { } } } diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_category_aggregate.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_category_aggregate.cs similarity index 76% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_category_aggregate.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_category_aggregate.cs index 6b055696..192486d9 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_category_aggregate.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_category_aggregate.cs @@ -3,10 +3,11 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; -using ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common; using Xunit; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests { +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ // ReSharper disable once InconsistentNaming [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] public class when_using_listener_start_with_category_aggregate @@ -24,38 +25,41 @@ public when_using_listener_start_with_category_aggregate(StreamStoreConnectionFi // Drop an event into the stream testAggregate-guid var result = conn.AppendToStream( - aggStream, - ExpectedVersion.NoStream, - null, + aggStream, + ExpectedVersion.NoStream, + null, _eventSerializer.Serialize(new TestEvent())); Assert.True(result.NextExpectedVersion == 0); //wait for the projection to be written. CommonHelpers.WaitForStream(conn, categoryStream); - + // Now set up the projection listener, and start it. var listener = new QueuedStreamListener( "category listener", conn, streamNameBuilder, new JsonMessageSerializer()); - listener.EventStream.Subscribe(new AdHocHandler(Handle)); + listener.EventStream.Subscribe(new AdHocHandler(Handle)); listener.Start(); } private long _testEventCount; [Fact] - public void can_get_events_from_category_projection() { + public void can_get_events_from_category_projection() + { AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 4000); } - private void Handle(IMessage message) { + private void Handle(IMessage message) + { dynamic evt = message; - if (evt is TestEvent) { + if (evt is TestEvent) + { Interlocked.Increment(ref _testEventCount); } } - public class AggregateCategoryTestAggregate:EventDrivenStateMachine{} + public class AggregateCategoryTestAggregate : EventDrivenStateMachine { } } } diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_not_synched.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_not_synched.cs similarity index 88% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_not_synched.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_not_synched.cs index 1154a6f2..e93502c3 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_not_synched.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_not_synched.cs @@ -3,10 +3,11 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; -using ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common; using Xunit; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests { +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ // ReSharper disable once InconsistentNaming [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] public class when_using_listener_start_with_custom_stream_not_synched @@ -23,11 +24,11 @@ public when_using_listener_start_with_custom_stream_not_synched(StreamStoreConne var originStreamName = $"testStream-{Guid.NewGuid():N}"; // Generate event and save it to the custom stream - + var result = fixture.Connection.AppendToStream( - originStreamName, - ExpectedVersion.NoStream, - null, + originStreamName, + ExpectedVersion.NoStream, + null, _eventSerializer.Serialize(new TestEvent())); Assert.True(result.NextExpectedVersion == 0); @@ -46,7 +47,7 @@ public when_using_listener_start_with_custom_stream_not_synched(StreamStoreConne private long _testEventCount; [Fact] - public void can_get_events_from_custom_stream() + public void can_get_events_from_custom_stream() { AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 3000); } diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_synched.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_synched.cs similarity index 87% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_synched.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_synched.cs index cdd85ede..a355fdb3 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_synched.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_synched.cs @@ -3,10 +3,11 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; -using ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common; using Xunit; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests { +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ // ReSharper disable once InconsistentNaming [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] public class when_using_listener_start_with_custom_stream_synched @@ -22,11 +23,11 @@ public when_using_listener_start_with_custom_stream_synched(StreamStoreConnectio // Build an origin stream from strings to which the the events are appended var originStreamName = $"testStream-{Guid.NewGuid():N}"; - + var result = fixture.Connection.AppendToStream( - originStreamName, - ExpectedVersion.NoStream, - null, + originStreamName, + ExpectedVersion.NoStream, + null, _eventSerializer.Serialize(new TestEvent())); Assert.True(result.NextExpectedVersion == 0); @@ -45,7 +46,7 @@ public when_using_listener_start_with_custom_stream_synched(StreamStoreConnectio private long _testEventCount; [Fact] - public void can_get_events_from_custom_stream() + public void can_get_events_from_custom_stream() { AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 3000); } diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_synched_bus.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_synched_bus.cs similarity index 86% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_synched_bus.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_synched_bus.cs index 137b52f4..02beeb61 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_custom_stream_synched_bus.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_custom_stream_synched_bus.cs @@ -3,11 +3,12 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; -using ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common; using Xunit; using ReactiveDomain.Util; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests { +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ // ReSharper disable once InconsistentNaming [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] public class when_using_listener_start_with_custom_stream_synched_bus @@ -27,9 +28,9 @@ public when_using_listener_start_with_custom_stream_synched_bus(StreamStoreConne var originStreamName = $"testStream-{Guid.NewGuid():N}"; var result = fixture.Connection.AppendToStream( - originStreamName, - ExpectedVersion.NoStream, - null, + originStreamName, + ExpectedVersion.NoStream, + null, _eventSerializer.Serialize(new TestEvent())); Assert.True(result.NextExpectedVersion == 0); @@ -47,19 +48,20 @@ public when_using_listener_start_with_custom_stream_synched_bus(StreamStoreConne listener.Start(originStreamName); } - + private long _testEventCount; private long _gotLiveStarted; - - private void LiveProcessingStarted(Unit _) { + + private void LiveProcessingStarted(Unit _) + { Interlocked.Increment(ref _gotLiveStarted); - } + } [Fact] - public void can_get_events_from_custom_stream() + public void can_get_events_from_custom_stream() { AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 3000); - AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _gotLiveStarted) == 1); + AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _gotLiveStarted) == 1); } private void Handle(IMessage message) diff --git a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_event_type.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_event_type.cs similarity index 76% rename from src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_event_type.cs rename to src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_event_type.cs index 4d0f0fbf..531cb32f 100644 --- a/src/ReactiveDomain.Foundation.Tests/SynchronizedStreamListenerTests/when_using_listener_start_with_event_type.cs +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_event_type.cs @@ -3,34 +3,36 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; -using ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests.Common; using Xunit; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; -namespace ReactiveDomain.Foundation.Tests.SynchronizedStreamListenerTests { +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ // ReSharper disable once InconsistentNaming [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] public class when_using_listener_start_with_event_type { private readonly IEventSerializer _eventSerializer = new JsonMessageSerializer(); - public when_using_listener_start_with_event_type(StreamStoreConnectionFixture fixture) { + public when_using_listener_start_with_event_type(StreamStoreConnectionFixture fixture) + { var streamNameBuilder = new PrefixedCamelCaseStreamNameBuilder(); var conn = fixture.Connection; conn.Connect(); - var originalAggregateStream = + var originalAggregateStream = streamNameBuilder.GenerateForAggregate( - typeof(TestAggregate), + typeof(TestAggregate), Guid.NewGuid()); var evt = new EventProjectionTestEvent(); //drop the event into the stream var result = conn.AppendToStream( - originalAggregateStream, - ExpectedVersion.NoStream, - null, + originalAggregateStream, + ExpectedVersion.NoStream, + null, _eventSerializer.Serialize(evt)); Assert.True(result.NextExpectedVersion == 0); - + //wait for the projection to be written CommonHelpers.WaitForStream(conn, streamNameBuilder.GenerateForEventType(nameof(EventProjectionTestEvent))); @@ -47,13 +49,16 @@ public when_using_listener_start_with_event_type(StreamStoreConnectionFixture fi private long _testEventCount; [Fact] - public void can_get_events_from_event_type_stream() { - AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 4000,"Event Not Received"); + public void can_get_events_from_event_type_stream() + { + AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 4000, "Event Not Received"); } - private void Handle(IMessage message) { + private void Handle(IMessage message) + { dynamic evt = message; - if (evt is EventProjectionTestEvent) { + if (evt is EventProjectionTestEvent) + { Interlocked.Increment(ref _testEventCount); } diff --git a/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_future_stream.cs b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_future_stream.cs new file mode 100644 index 00000000..e92a24c4 --- /dev/null +++ b/src/ReactiveDomain.Foundation.Tests/StreamListenerTests/when_using_listener_start_with_future_stream.cs @@ -0,0 +1,88 @@ +using System; +using System.Threading; +using ReactiveDomain.Messaging; +using ReactiveDomain.Messaging.Bus; +using ReactiveDomain.Testing; +using Xunit; +using ReactiveDomain.Foundation.Tests.StreamListenerTests.Common; + +namespace ReactiveDomain.Foundation.Tests.StreamListenerTests +{ + // ReSharper disable once InconsistentNaming + [Collection(nameof(EmbeddedStreamStoreConnectionCollection))] + public class when_using_listener_start_with_future_stream + { + private readonly IStreamNameBuilder _streamNameBuilder = new PrefixedCamelCaseStreamNameBuilder(); + private readonly IEventSerializer _eventSerializer = new JsonMessageSerializer(); + private readonly string _originStreamName = $"testStream-{Guid.NewGuid():N}"; + private readonly IStreamStoreConnection _connection; + public when_using_listener_start_with_future_stream(StreamStoreConnectionFixture fixture) + { + _connection = fixture.Connection; + + + StreamListener listener = new QueuedStreamListener( + _originStreamName, + fixture.Connection, + new PrefixedCamelCaseStreamNameBuilder(), + _eventSerializer); + listener.EventStream.Subscribe(new AdHocHandler(Handle)); + listener.Start(_originStreamName); + } + + private long _testEventCount; + + [Fact] + public void validation_throws_on_mising_stream() + { + var missingStream = _originStreamName + "missing"; + StreamListener listener = new QueuedStreamListener( + missingStream, + _connection, + new PrefixedCamelCaseStreamNameBuilder(), + _eventSerializer); + listener.EventStream.Subscribe(new AdHocHandler(Handle)); + Assert.Throws(() => listener.Start(missingStream, validateStream: true)); + listener.Dispose(); + } + [Fact] + public void can_subscribe_to_missing_stream() + { + var missingStream = _originStreamName + "missing"; + StreamListener listener = new QueuedStreamListener( + missingStream, + _connection, + new PrefixedCamelCaseStreamNameBuilder(), + _eventSerializer); + listener.EventStream.Subscribe(new AdHocHandler(Handle)); + listener.Start(missingStream, validateStream: false); + Assert.True(listener.IsLive); + listener.Dispose(); + } + + [Fact] + public void can_get_events_from_future_stream() + { + _connection.Connect(); + var result = _connection.AppendToStream( + _originStreamName, + ExpectedVersion.NoStream, + null, + _eventSerializer.Serialize(new TestEvent())); + Assert.True(result.NextExpectedVersion == 0); + + // Wait for the stream to be written + CommonHelpers.WaitForStream(_connection, _originStreamName); + AssertEx.IsOrBecomesTrue(() => Interlocked.Read(ref _testEventCount) == 1, 3000); + } + + private void Handle(IMessage message) + { + dynamic evt = message; + if (evt is TestEvent) + { + Interlocked.Increment(ref _testEventCount); + } + } + } +} diff --git a/src/ReactiveDomain.Foundation/IListener.cs b/src/ReactiveDomain.Foundation/IListener.cs index 9de59bb3..fceb078e 100644 --- a/src/ReactiveDomain.Foundation/IListener.cs +++ b/src/ReactiveDomain.Foundation/IListener.cs @@ -16,8 +16,9 @@ public interface IListener : IDisposable /// the exact stream name /// start point to listen from /// wait for the is live event from the catchup subscription before returning + /// ensure the stream exists on start /// Cancellation token to cancel waiting if blockUntilLive is true - void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default); + void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default); /// /// Starts listening on an aggregate root stream /// @@ -25,15 +26,17 @@ public interface IListener : IDisposable /// the aggregate id /// start point to listen from /// wait for the is live event from the catchup subscription before returning + /// ensure the stream exists on start /// Cancellation token to cancel waiting if blockUntilLive is true - void Start(Guid id, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource; + void Start(Guid id, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource; /// /// Starts listening on a Aggregate Category Stream /// /// The type of aggregate /// start point to listen from /// wait for the is live event from the catchup subscription before returning + /// ensure the stream exists on start /// Cancellation token to cancel waiting if blockUntilLive is true - void Start(long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource; + void Start(long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource; } } diff --git a/src/ReactiveDomain.Foundation/StreamStore/QueuedStreamListener.cs b/src/ReactiveDomain.Foundation/StreamStore/QueuedStreamListener.cs index 64fa06c9..94b84241 100644 --- a/src/ReactiveDomain.Foundation/StreamStore/QueuedStreamListener.cs +++ b/src/ReactiveDomain.Foundation/StreamStore/QueuedStreamListener.cs @@ -53,12 +53,12 @@ public void Handle(IMessage @event) } } - public override void Start(string streamName, long? checkpoint = null, bool waitUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) + public override void Start(string streamName, long? checkpoint = null, bool waitUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default(CancellationToken)) { _isLive.Reset(); SyncQueue?.Start(); - base.Start(streamName, checkpoint, waitUntilLive, cancelWaitToken); + base.Start(streamName, checkpoint, waitUntilLive, validateStream, cancelWaitToken); Interlocked.Exchange(ref _pendingCount, SyncQueue.MessageCount); if (Interlocked.Read(ref _pendingCount) <= 0 || SyncQueue.Idle) diff --git a/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs b/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs index a5e4d82e..e2650b89 100644 --- a/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs +++ b/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics.SymbolStore; using System.Linq; using System.Threading; using ReactiveDomain.Messaging; @@ -105,7 +106,7 @@ public List> GetCheckpoint() /// The event to start with. /// If true, blocks returning from this method until the listener has caught up. /// Cancellation token to cancel waiting if blockUntilLive is true. - public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) + public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) { if (_getReader != null) { @@ -115,7 +116,7 @@ public void Start(string stream, long? checkpoint = null, bool blockUntilLive = checkpoint = reader.Position ?? checkpoint; } } - AddNewListener().Start(stream, checkpoint, blockUntilLive, cancelWaitToken); + AddNewListener().Start(stream, checkpoint, blockUntilLive, validateStream, cancelWaitToken); } /// @@ -126,7 +127,7 @@ public void Start(string stream, long? checkpoint = null, bool blockUntilLive = /// The event to start with. /// If true, blocks returning from this method until the listener has caught up. /// Cancellation token to cancel waiting if blockUntilLive is true. - public void Start(Guid id, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + public void Start(Guid id, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource { if (_getReader != null) { @@ -136,7 +137,7 @@ public void Start(Guid id, long? checkpoint = null, bool blockUntilL checkpoint = reader.Position; } } - AddNewListener().Start(id, checkpoint, blockUntilLive, cancelWaitToken); + AddNewListener().Start(id, checkpoint, blockUntilLive, validateStream, cancelWaitToken); } /// @@ -146,7 +147,7 @@ public void Start(Guid id, long? checkpoint = null, bool blockUntilL /// The event to start with. /// If true, blocks returning from this method until the listener has caught up. /// Cancellation token to cancel waiting if blockUntilLive is true. - public void Start(long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + public void Start(long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource { if (_getReader != null) { @@ -156,7 +157,7 @@ public void Start(long? checkpoint = null, bool blockUntilLive = fal checkpoint = reader.Position; } } - AddNewListener().Start(checkpoint, blockUntilLive, cancelWaitToken); + AddNewListener().Start(checkpoint, blockUntilLive, validateStream, cancelWaitToken); } /// diff --git a/src/ReactiveDomain.Foundation/StreamStore/SnapshotReadModel.cs b/src/ReactiveDomain.Foundation/StreamStore/SnapshotReadModel.cs index 3fc326cb..7635bc68 100644 --- a/src/ReactiveDomain.Foundation/StreamStore/SnapshotReadModel.cs +++ b/src/ReactiveDomain.Foundation/StreamStore/SnapshotReadModel.cs @@ -17,6 +17,7 @@ protected virtual void Restore( ReadModelState snapshot, bool startListeners = true, bool block = false, + bool validateStreams = false, CancellationToken cancelWaitToken = default(CancellationToken)) { if(StartingState != null) { throw new InvalidOperationException("ReadModel has already been restored."); @@ -27,7 +28,7 @@ protected virtual void Restore( if (!startListeners || StartingState.Checkpoints == null) return; foreach (var stream in StartingState.Checkpoints) { - Start(stream.Item1,stream.Item2,block, cancelWaitToken); + Start(stream.Item1,stream.Item2, block, validateStreams, cancelWaitToken); } } diff --git a/src/ReactiveDomain.Foundation/StreamStore/StreamListener.cs b/src/ReactiveDomain.Foundation/StreamStore/StreamListener.cs index 307efc53..b611826d 100644 --- a/src/ReactiveDomain.Foundation/StreamStore/StreamListener.cs +++ b/src/ReactiveDomain.Foundation/StreamStore/StreamListener.cs @@ -79,6 +79,7 @@ public void Start( Type tMessage, long? checkpoint = null, bool blockUntilLive = false, + bool validateStream = false, CancellationToken cancelWaitToken = default(CancellationToken)) { if (!tMessage.IsSubclassOf(typeof(Event))) @@ -89,6 +90,7 @@ public void Start( _streamNameBuilder.GenerateForEventType(tMessage.Name), checkpoint, blockUntilLive, + validateStream, cancelWaitToken); } /// @@ -102,6 +104,7 @@ public void Start( public void Start( long? checkpoint = null, bool blockUntilLive = false, + bool validateStream = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource { @@ -109,6 +112,7 @@ public void Start( _streamNameBuilder.GenerateForCategory(typeof(TAggregate)), checkpoint, blockUntilLive, + validateStream, cancelWaitToken); } @@ -125,12 +129,14 @@ public void Start( Guid id, long? checkpoint = null, bool blockUntilLive = false, + bool validateStream = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource { Start( _streamNameBuilder.GenerateForAggregate(typeof(TAggregate), id), checkpoint, blockUntilLive, + validateStream, cancelWaitToken); } @@ -146,6 +152,7 @@ public virtual void Start( string streamName, long? checkpoint = null, bool blockUntilLive = false, + bool validateStream = false, CancellationToken cancelWaitToken = default(CancellationToken)) { _liveLock.Reset(); @@ -153,7 +160,7 @@ public virtual void Start( { if (_started) throw new InvalidOperationException("Listener already started."); - if (!ValidateStreamName(streamName)) + if (validateStream && !ValidateStreamName(streamName)) throw new ArgumentException("Stream not found.", streamName); StreamName = streamName; _subscription = @@ -204,8 +211,17 @@ public IDisposable SubscribeToStreamFrom( public bool ValidateStreamName(string streamName) { - var isValid = _streamStoreConnection.ReadStreamForward(streamName, 0, 1) != null; - return isValid; + try + { + var result = _streamStoreConnection.ReadStreamForward(streamName, 0, 1); + + return result.GetType() == typeof(StreamEventsSlice); + } + catch (Exception) + { + return false; + } + } protected virtual void GotEvent(RecordedEvent recordedEvent) { diff --git a/src/ReactiveDomain.IdentityStorage.Tests/ReactiveDomain.IdentityStorage.Tests.csproj b/src/ReactiveDomain.IdentityStorage.Tests/ReactiveDomain.IdentityStorage.Tests.csproj index 89ece5a3..f584fea7 100644 --- a/src/ReactiveDomain.IdentityStorage.Tests/ReactiveDomain.IdentityStorage.Tests.csproj +++ b/src/ReactiveDomain.IdentityStorage.Tests/ReactiveDomain.IdentityStorage.Tests.csproj @@ -6,9 +6,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/ReactiveDomain.Messaging.Tests/ReactiveDomain.Messaging.Tests.csproj b/src/ReactiveDomain.Messaging.Tests/ReactiveDomain.Messaging.Tests.csproj index 2927d72e..13d7a15f 100644 --- a/src/ReactiveDomain.Messaging.Tests/ReactiveDomain.Messaging.Tests.csproj +++ b/src/ReactiveDomain.Messaging.Tests/ReactiveDomain.Messaging.Tests.csproj @@ -10,9 +10,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/ReactiveDomain.Policy.Tests/ReactiveDomain.Policy.Tests.csproj b/src/ReactiveDomain.Policy.Tests/ReactiveDomain.Policy.Tests.csproj index b44ae16c..38d4c103 100644 --- a/src/ReactiveDomain.Policy.Tests/ReactiveDomain.Policy.Tests.csproj +++ b/src/ReactiveDomain.Policy.Tests/ReactiveDomain.Policy.Tests.csproj @@ -10,9 +10,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/ReactiveDomain.PolicyStorage.Tests/ReactiveDomain.PolicyStorage.Tests.csproj b/src/ReactiveDomain.PolicyStorage.Tests/ReactiveDomain.PolicyStorage.Tests.csproj index 23587c2b..fe9e312f 100644 --- a/src/ReactiveDomain.PolicyStorage.Tests/ReactiveDomain.PolicyStorage.Tests.csproj +++ b/src/ReactiveDomain.PolicyStorage.Tests/ReactiveDomain.PolicyStorage.Tests.csproj @@ -15,9 +15,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/ReactiveDomain.Testing/ReactiveDomain.Testing.csproj b/src/ReactiveDomain.Testing/ReactiveDomain.Testing.csproj index c53a8898..8ce6fd94 100644 --- a/src/ReactiveDomain.Testing/ReactiveDomain.Testing.csproj +++ b/src/ReactiveDomain.Testing/ReactiveDomain.Testing.csproj @@ -3,6 +3,7 @@ $(LibTargetFrameworks) True + true @@ -10,9 +11,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/ReactiveDomain.Testing/Specifications/NullListener.cs b/src/ReactiveDomain.Testing/Specifications/NullListener.cs index 772b6649..ad073e0a 100644 --- a/src/ReactiveDomain.Testing/Specifications/NullListener.cs +++ b/src/ReactiveDomain.Testing/Specifications/NullListener.cs @@ -11,6 +11,7 @@ namespace ReactiveDomain.Testing /// public class NullListener : IListener, ISubscriber { +#pragma warning disable CS1066 // The default value specified will have no effect because it applies to a member that is used in contexts that do not allow optional arguments private string _stream; private long _position; @@ -34,7 +35,7 @@ public class NullListener : IListener, ISubscriber /// /// This parameter is ignored. public NullListener(string name = "") - { + { } /// @@ -52,7 +53,7 @@ public void Dispose() /// The position at which the listener should start. /// This parameter is ignored. /// This parameter is ignored. - public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) + public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) { _stream = stream; _position = checkpoint ?? 0; @@ -87,7 +88,7 @@ bool ISubscriber.HasSubscriberFor(bool includeDerived) /// This parameter is ignored. /// This parameter is ignored. /// This parameter is ignored. - void IListener.Start(Guid id, long? checkpoint, bool blockUntilLive, CancellationToken cancelWaitToken) + void IListener.Start(Guid id, long? checkpoint, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) { _stream = new PrefixedCamelCaseStreamNameBuilder().GenerateForAggregate(typeof(TAggregate), id); } @@ -99,7 +100,7 @@ void IListener.Start(Guid id, long? checkpoint, bool blockUntilLive, /// This parameter is ignored. /// This parameter is ignored. /// This parameter is ignored. - void IListener.Start(long? checkpoint, bool blockUntilLive, CancellationToken cancelWaitToken) + void IListener.Start(long? checkpoint, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) { _stream = nameof(TAggregate); } @@ -124,5 +125,6 @@ IDisposable ISubscriber.Subscribe(IHandle handler, bool includeDerived) void ISubscriber.Unsubscribe(IHandle handler) { } +#pragma warning restore CS1066 // The default value specified will have no effect because it applies to a member that is used in contexts that do not allow optional arguments } } diff --git a/src/ReactiveDomain.Transport.Tests/ReactiveDomain.Transport.Tests.csproj b/src/ReactiveDomain.Transport.Tests/ReactiveDomain.Transport.Tests.csproj index b667d7f6..a76cbf88 100644 --- a/src/ReactiveDomain.Transport.Tests/ReactiveDomain.Transport.Tests.csproj +++ b/src/ReactiveDomain.Transport.Tests/ReactiveDomain.Transport.Tests.csproj @@ -5,9 +5,9 @@ true - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive