Skip to content

Commit 68cbcab

Browse files
committed
support opt-in explicit stream validation in read model listeners
1 parent 9450431 commit 68cbcab

File tree

6 files changed

+42
-19
lines changed

6 files changed

+42
-19
lines changed

src/ReactiveDomain.Foundation/IListener.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,27 @@ public interface IListener : IDisposable
1616
/// <param name="stream">the exact stream name</param>
1717
/// <param name="checkpoint">start point to listen from</param>
1818
/// <param name="blockUntilLive">wait for the is live event from the catchup subscription before returning</param>
19+
/// <param name="validateStream">ensure the stream exists on start</param>
1920
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true</param>
20-
void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default);
21+
void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default);
2122
/// <summary>
2223
/// Starts listening on an aggregate root stream
2324
/// </summary>
2425
/// <typeparam name="TAggregate">The type of aggregate</typeparam>
2526
/// <param name="id">the aggregate id</param>
2627
/// <param name="checkpoint">start point to listen from</param>
2728
/// <param name="blockUntilLive">wait for the is live event from the catchup subscription before returning</param>
29+
/// <param name="validateStream">ensure the stream exists on start</param>
2830
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true</param>
29-
void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource;
31+
void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource;
3032
/// <summary>
3133
/// Starts listening on a Aggregate Category Stream
3234
/// </summary>
3335
/// <typeparam name="TAggregate">The type of aggregate</typeparam>
3436
/// <param name="checkpoint">start point to listen from</param>
3537
/// <param name="blockUntilLive">wait for the is live event from the catchup subscription before returning</param>
38+
/// <param name="validateStream">ensure the stream exists on start</param>
3639
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true</param>
37-
void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource;
40+
void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource;
3841
}
3942
}

src/ReactiveDomain.Foundation/StreamStore/QueuedStreamListener.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ public void Handle(IMessage @event)
5353
}
5454
}
5555

56-
public override void Start(string streamName, long? checkpoint = null, bool waitUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken))
56+
public override void Start(string streamName, long? checkpoint = null, bool waitUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default(CancellationToken))
5757
{
5858
_isLive.Reset();
5959

6060
SyncQueue?.Start();
61-
base.Start(streamName, checkpoint, waitUntilLive, cancelWaitToken);
61+
base.Start(streamName, checkpoint, waitUntilLive, validateStream, cancelWaitToken);
6262

6363
Interlocked.Exchange(ref _pendingCount, SyncQueue.MessageCount);
6464
if (Interlocked.Read(ref _pendingCount) <= 0 || SyncQueue.Idle)

src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics.SymbolStore;
34
using System.Linq;
45
using System.Threading;
56
using ReactiveDomain.Messaging;
@@ -105,7 +106,7 @@ public List<Tuple<string, long>> GetCheckpoint()
105106
/// <param name="checkpoint">The event to start with.</param>
106107
/// <param name="blockUntilLive">If true, blocks returning from this method until the listener has caught up.</param>
107108
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true.</param>
108-
public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default)
109+
public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default)
109110
{
110111
if (_getReader != null)
111112
{
@@ -115,7 +116,7 @@ public void Start(string stream, long? checkpoint = null, bool blockUntilLive =
115116
checkpoint = reader.Position ?? checkpoint;
116117
}
117118
}
118-
AddNewListener().Start(stream, checkpoint, blockUntilLive, cancelWaitToken);
119+
AddNewListener().Start(stream, checkpoint, blockUntilLive, validateStream, cancelWaitToken);
119120
}
120121

121122
/// <summary>
@@ -126,7 +127,7 @@ public void Start(string stream, long? checkpoint = null, bool blockUntilLive =
126127
/// <param name="checkpoint">The event to start with.</param>
127128
/// <param name="blockUntilLive">If true, blocks returning from this method until the listener has caught up.</param>
128129
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true.</param>
129-
public void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource
130+
public void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource
130131
{
131132
if (_getReader != null)
132133
{
@@ -136,7 +137,7 @@ public void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilL
136137
checkpoint = reader.Position;
137138
}
138139
}
139-
AddNewListener().Start<TAggregate>(id, checkpoint, blockUntilLive, cancelWaitToken);
140+
AddNewListener().Start<TAggregate>(id, checkpoint, blockUntilLive, validateStream, cancelWaitToken);
140141
}
141142

142143
/// <summary>
@@ -146,7 +147,7 @@ public void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilL
146147
/// <param name="checkpoint">The event to start with.</param>
147148
/// <param name="blockUntilLive">If true, blocks returning from this method until the listener has caught up.</param>
148149
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true.</param>
149-
public void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource
150+
public void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource
150151
{
151152
if (_getReader != null)
152153
{
@@ -156,7 +157,7 @@ public void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = fal
156157
checkpoint = reader.Position;
157158
}
158159
}
159-
AddNewListener().Start<TAggregate>(checkpoint, blockUntilLive, cancelWaitToken);
160+
AddNewListener().Start<TAggregate>(checkpoint, blockUntilLive, validateStream, cancelWaitToken);
160161
}
161162

162163
/// <summary>

src/ReactiveDomain.Foundation/StreamStore/SnapshotReadModel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ protected virtual void Restore(
1717
ReadModelState snapshot,
1818
bool startListeners = true,
1919
bool block = false,
20+
bool validateStreams = false,
2021
CancellationToken cancelWaitToken = default(CancellationToken)) {
2122
if(StartingState != null) {
2223
throw new InvalidOperationException("ReadModel has already been restored.");
@@ -27,7 +28,7 @@ protected virtual void Restore(
2728
if (!startListeners || StartingState.Checkpoints == null) return;
2829

2930
foreach (var stream in StartingState.Checkpoints) {
30-
Start(stream.Item1,stream.Item2,block, cancelWaitToken);
31+
Start(stream.Item1,stream.Item2, block, validateStreams, cancelWaitToken);
3132
}
3233
}
3334

src/ReactiveDomain.Foundation/StreamStore/StreamListener.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public void Start(
7979
Type tMessage,
8080
long? checkpoint = null,
8181
bool blockUntilLive = false,
82+
bool validateStream = false,
8283
CancellationToken cancelWaitToken = default(CancellationToken))
8384
{
8485
if (!tMessage.IsSubclassOf(typeof(Event)))
@@ -89,6 +90,7 @@ public void Start(
8990
_streamNameBuilder.GenerateForEventType(tMessage.Name),
9091
checkpoint,
9192
blockUntilLive,
93+
validateStream,
9294
cancelWaitToken);
9395
}
9496
/// <summary>
@@ -102,13 +104,15 @@ public void Start(
102104
public void Start<TAggregate>(
103105
long? checkpoint = null,
104106
bool blockUntilLive = false,
107+
bool validateStream = false,
105108
CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource
106109
{
107110

108111
Start(
109112
_streamNameBuilder.GenerateForCategory(typeof(TAggregate)),
110113
checkpoint,
111114
blockUntilLive,
115+
validateStream,
112116
cancelWaitToken);
113117
}
114118

@@ -125,12 +129,14 @@ public void Start<TAggregate>(
125129
Guid id,
126130
long? checkpoint = null,
127131
bool blockUntilLive = false,
132+
bool validateStream = false,
128133
CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource
129134
{
130135
Start(
131136
_streamNameBuilder.GenerateForAggregate(typeof(TAggregate), id),
132137
checkpoint,
133138
blockUntilLive,
139+
validateStream,
134140
cancelWaitToken);
135141
}
136142

@@ -146,14 +152,15 @@ public virtual void Start(
146152
string streamName,
147153
long? checkpoint = null,
148154
bool blockUntilLive = false,
155+
bool validateStream = false,
149156
CancellationToken cancelWaitToken = default(CancellationToken))
150157
{
151158
_liveLock.Reset();
152159
lock (_startLock)
153160
{
154161
if (_started)
155162
throw new InvalidOperationException("Listener already started.");
156-
if (!ValidateStreamName(streamName))
163+
if (validateStream && !ValidateStreamName(streamName))
157164
throw new ArgumentException("Stream not found.", streamName);
158165
StreamName = streamName;
159166
_subscription =
@@ -204,8 +211,17 @@ public IDisposable SubscribeToStreamFrom(
204211

205212
public bool ValidateStreamName(string streamName)
206213
{
207-
var isValid = _streamStoreConnection.ReadStreamForward(streamName, 0, 1) != null;
208-
return isValid;
214+
try
215+
{
216+
var result = _streamStoreConnection.ReadStreamForward(streamName, 0, 1);
217+
218+
return result.GetType() == typeof(StreamEventsSlice);
219+
}
220+
catch (Exception)
221+
{
222+
return false;
223+
}
224+
209225
}
210226
protected virtual void GotEvent(RecordedEvent recordedEvent)
211227
{

src/ReactiveDomain.Testing/Specifications/NullListener.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace ReactiveDomain.Testing
1111
/// </summary>
1212
public class NullListener : IListener, ISubscriber
1313
{
14+
#pragma warning disable CS1066 // The default value specified will have no effect because it applies to a member that is used in contexts that do not allow optional arguments
1415

1516
private string _stream;
1617
private long _position;
@@ -34,7 +35,7 @@ public class NullListener : IListener, ISubscriber
3435
/// </summary>
3536
/// <param name="name">This parameter is ignored.</param>
3637
public NullListener(string name = "")
37-
{
38+
{
3839
}
3940

4041
/// <summary>
@@ -52,7 +53,7 @@ public void Dispose()
5253
/// <param name="checkpoint">The position at which the listener should start.</param>
5354
/// <param name="blockUntilLive">This parameter is ignored.</param>
5455
/// <param name="cancelWaitToken">This parameter is ignored.</param>
55-
public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default)
56+
public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default)
5657
{
5758
_stream = stream;
5859
_position = checkpoint ?? 0;
@@ -87,7 +88,7 @@ bool ISubscriber.HasSubscriberFor<T>(bool includeDerived)
8788
/// <param name="checkpoint">This parameter is ignored.</param>
8889
/// <param name="blockUntilLive">This parameter is ignored.</param>
8990
/// <param name="cancelWaitToken">This parameter is ignored.</param>
90-
void IListener.Start<TAggregate>(Guid id, long? checkpoint, bool blockUntilLive, CancellationToken cancelWaitToken)
91+
void IListener.Start<TAggregate>(Guid id, long? checkpoint, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default)
9192
{
9293
_stream = new PrefixedCamelCaseStreamNameBuilder().GenerateForAggregate(typeof(TAggregate), id);
9394
}
@@ -99,7 +100,7 @@ void IListener.Start<TAggregate>(Guid id, long? checkpoint, bool blockUntilLive,
99100
/// <param name="checkpoint">This parameter is ignored.</param>
100101
/// <param name="blockUntilLive">This parameter is ignored.</param>
101102
/// <param name="cancelWaitToken">This parameter is ignored.</param>
102-
void IListener.Start<TAggregate>(long? checkpoint, bool blockUntilLive, CancellationToken cancelWaitToken)
103+
void IListener.Start<TAggregate>(long? checkpoint, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default)
103104
{
104105
_stream = nameof(TAggregate);
105106
}
@@ -124,5 +125,6 @@ IDisposable ISubscriber.Subscribe<T>(IHandle<T> handler, bool includeDerived)
124125
void ISubscriber.Unsubscribe<T>(IHandle<T> handler)
125126
{
126127
}
128+
#pragma warning restore CS1066 // The default value specified will have no effect because it applies to a member that is used in contexts that do not allow optional arguments
127129
}
128130
}

0 commit comments

Comments
 (0)